Flink操練(五十九)之資料寫入到kafka

關鍵程式碼

Properties properties = new Properties(); properties。put(“bootstrap。servers”, “localhost:9092”); DataStreamSource source = env。readTextFile(“G:/bigData_learn/Flink_learn/src/main/resources/UserBehavior。csv”); source。addSink(new FlinkKafkaProducer( “user-behavior” ,new SimpleStringSchema() ,properties ));

完整程式碼

package day07;import org。apache。flink。api。common。serialization。SimpleStringSchema;import org。apache。flink。streaming。api。datastream。DataStreamSource;import org。apache。flink。streaming。api。environment。StreamExecutionEnvironment;import org。apache。flink。streaming。connectors。kafka。FlinkKafkaProducer;import java。util。Properties;/** * @program: bigData_learn * @description: 寫入到Kafka * @author: Mr。逗 * @create: 2021-09-26 09:52 **/public class WriteToKafka { public static void main(String[] args) { StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); env。setParallelism(1); Properties properties = new Properties(); properties。put(“bootstrap。servers”, “localhost:9092”); DataStreamSource source = env。readTextFile(“G:/bigData_learn/Flink_learn/src/main/resources/UserBehavior。csv”); source。addSink(new FlinkKafkaProducer( “user-behavior” ,new SimpleStringSchema() ,properties )); String name = WriteToKafka。class。getName(); try { env。execute(name); }catch (Exception e) { e。printStackTrace(); } }}