Flink實戰之會話視窗WindowsAPI使用示例

介紹

Fink的視窗(Window)可以分成兩類:

1、CountWindow:按照指定的資料條數生成一個 Window,與時間無關。

2、TimeWindow:按照時間生成 Window。

TimeWindow,可以根據視窗實現原理的不同分成三類:滾動視窗(Tumbling

Window)、滑動視窗(Sliding Window)和會話視窗(Session Window)。

本文介紹會話視窗(Session Window),並透過例子說明如何使用這些視窗。

接下來文章介紹滾動視窗(TumblingWindow)、滑動視窗(Sliding Window)、CountWindow視窗,

歡迎關注。

會話視窗使用例子

類似於web應用的session,也就是一段時間沒有接收到新的資料就會生成新的視窗,由一系列事件組合一個指定時間長度的timeout間隙組成。

特點:時間無對齊。

Flink實戰之會話視窗WindowsAPI使用示例

示例說明:

兩次的時間間隔超過5秒的基礎上再加延遲1秒後,沒有新的活動事件這個視窗就會關閉,然後處理這個視窗區間內所產生的活動資料計算。

示例程式碼如下:

// 建立流處理的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment(); //宣告使用eventTime env。setStreamTimeCharacteristic(TimeCharacteristic。EventTime); //2。使用StreamExecutionEnvironment建立DataStream //Source(可以有多個Source) //Socket 監聽本地埠8888 // 接收一個socket文字流 DataStreamSource lines = env。socketTextStream(“localhost”, 8888); // 輸入的活動資料轉換 DataStream> windowCount = lines。map(new MapFunction>() { @Override public Tuple3 map(String line) throws Exception { String[] words = line。split(“ ”); return new Tuple3(words[0], Long。valueOf(words[1]), 1); } }); // 描述 flink如何獲取資料中的event時間戳進行判斷 // 描述延遲的watermark1秒 DataStream> textWithEventTimeDStream = windowCount。assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor> (Time。milliseconds(1000)) { @Override public long extractTimestamp(Tuple3 stringLongIntegerTuple3) { return stringLongIntegerTuple3。f1; } })。setParallelism(1); // 按key分組,keyBy之後是分到各個分割槽再window去處理 KeyedStream, Tuple> textKeyStream = textWithEventTimeDStream。keyBy(0); textKeyStream。print(“textKey: ”); // 設定5秒的(會話視窗)活動時間間隔 SingleOutputStreamOperator> windowStream = textKeyStream。window(EventTimeSessionWindows。withGap(Time。milliseconds(5000L)))。sum(2); //3。呼叫Sink (Sink必須呼叫) windowStream。print(“windows: ”)。setParallelism(1); //啟動(這個異常不建議try。。。catch。。。 捕獲,因為它會拋給上層flink,flink根據異常來做相應的重啟策略等處理) env。execute(“StreamWordCount”);

在終端透過命令nc -lk 8888 輸入一些資料

Sensor1 1000Sensor1 7000Sensor1 10000Sensor1 15000Sensor1 17000Sensor1 24000

檢視一下效果

輸入第1個引數表示感測器id,空格後第2個引數表示時間,進行前一次輸入與當前輸入時間對比是否超過時間間隔。

第一個會話視窗 7000-1000=6秒 超過活動時間間隔5秒+延遲的watermark1秒,觸發計算

第一個會話視窗 24000-17000=7秒 超過活動時間間隔5秒+延遲的watermark1秒,觸發計算

Flink實戰之會話視窗WindowsAPI使用示例

如果覺得文章能幫到您,歡迎關注wx公眾號:“大資料技術天涯” ,共同進步!

持續分享java微服務技術,大資料、人工智慧等科技類原創文章。