Flink 流處理WordCount 示例

繼上一篇是Flink批處理WordCount 示例

Windows 開啟埠監聽

Windows預設不支援 nc 埠監聽,透過下載

netcat

並配置

netcat下載地址:

netcat 1。11 for Win32/Win64

下載後,解壓目錄下除了。txt 檔案,複製到

C:\Windows\System32

目錄下,

Flink 流處理WordCount 示例

然後開啟

cmd視窗

使用 使用命令:

nc -lp 8888

即可開啟監聽 8888 埠號。

如下圖:

Flink 流處理WordCount 示例

編寫程式碼

public static void main(String[] args) throws Exception { // 建立流處理的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); //2。使用StreamExecutionEnvironment建立DataStream //Source(可以有多個Source) //Socket 監聽本地埠8888 // 接收一個socket文字流 DataStreamSource lines = env。socketTextStream(“localhost”, 8888); // 3。進行轉化處理統計 //Transformation(s)對資料進行處理操作 SingleOutputStreamOperator> wordAndOne = lines。flatMap(new FlatMapFunction>() { @Override public void flatMap(String line, Collector> out) throws Exception { //切分 String[] words = line。split(“ ”); //迴圈, for (String word : words) { //將每個單詞與 1 組合,形成一個元組 Tuple2 tp = Tuple2。of(word, 1); //將組成的Tuple放入到 Collector 集合,並輸出 out。collect(tp); } } }); //進行分組聚合(keyBy:將key相同的分到一個組中) SingleOutputStreamOperator> resultDataStream = wordAndOne。keyBy(0)。sum(1); //Transformation 結束 //4。呼叫Sink (Sink必須呼叫) resultDataStream。print()。setParallelism(1); //5。 啟動任務執行 env。execute(“stream word count”); }

執行程式碼

在命令列,輸入內容,在控制檯可看到實時計算結果,效果如下圖所示:

Flink 流處理WordCount 示例