繼上一篇是Flink批處理WordCount 示例
Windows 開啟埠監聽
Windows預設不支援 nc 埠監聽,透過下載
netcat
並配置
netcat下載地址:
netcat 1。11 for Win32/Win64
下載後,解壓目錄下除了。txt 檔案,複製到
C:\Windows\System32
目錄下,
然後開啟
cmd視窗
使用 使用命令:
nc -lp 8888
即可開啟監聽 8888 埠號。
如下圖:
編寫程式碼
public static void main(String[] args) throws Exception { // 建立流處理的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); //2。使用StreamExecutionEnvironment建立DataStream //Source(可以有多個Source) //Socket 監聽本地埠8888 // 接收一個socket文字流 DataStreamSource lines = env。socketTextStream(“localhost”, 8888); // 3。進行轉化處理統計 //Transformation(s)對資料進行處理操作 SingleOutputStreamOperator> wordAndOne = lines。flatMap(new FlatMapFunction>() { @Override public void flatMap(String line, Collector> out) throws Exception { //切分 String[] words = line。split(“ ”); //迴圈, for (String word : words) { //將每個單詞與 1 組合,形成一個元組 Tuple2 tp = Tuple2。of(word, 1); //將組成的Tuple放入到 Collector 集合,並輸出 out。collect(tp); } } }); //進行分組聚合(keyBy:將key相同的分到一個組中) SingleOutputStreamOperator> resultDataStream = wordAndOne。keyBy(0)。sum(1); //Transformation 結束 //4。呼叫Sink (Sink必須呼叫) resultDataStream。print()。setParallelism(1); //5。 啟動任務執行 env。execute(“stream word count”); }
執行程式碼
在命令列,輸入內容,在控制檯可看到實時計算結果,效果如下圖所示: