Flink Collector Output 介面原始碼解析

Flink Collector Output 介面原始碼解析

在 Flink 中 Collector 介面主要用於 operator 傳送(輸出)元素,Output 介面是對 Collector 介面的擴充套件,增加了傳送 WaterMark 的功能,在 Flink 裡面只要涉及到資料的傳遞都必須實現這兩個介面,下面就來梳理一下這些介面的原始碼。

Output Collector UML 圖

Flink Collector Output 介面原始碼解析

WatermarkGaugeExposingOutput

Collector 介面只有 2 個方法:

collect(T record) 用於正常流輸出資料。

close() 關閉 Output ,如果任何資料被緩衝,則該資料將被重新整理。

Output 介面有 4 個方法:

emitWatermark(Watermark mark) 從 operator 發出 Watermark。此水印將廣播到所有下游所有 operator。

emitWatermarkStatus(WatermarkStatus watermarkStatus) 傳送水印狀態。

collect(OutputTag outputTag, StreamRecord record) 傳送資料,這個方法和 Collector 介面中的 collect 方法作用是一樣的,但是這個 collect 方法多了一個 OutputTag 引數,也就是說這個方法主要用在側流輸出場景下。

emitLatencyMarker(LatencyMarker latencyMarker) 傳送 LatencyMarker 它是一種特殊的資料,用來測量資料的延遲。

WatermarkGaugeExposingOutput 介面只有 1 個方法:

getWatermarkGauge() 用來獲取 WatermarkGauge,它是測量最後發出的水印。

我們今天主要說的是 collect 方法,也就是傳送真實資料的方法,Output 介面的實現類是非常多的,因為只要你想傳送資料就必須實現這個介面,那在眾多的實現類裡有幾個是比較重要的,下面我會挑出 7 個常見的實現類進行介紹,我們先來看下面的 Output 實現類的 UML 類圖。

Output 實現類 UML 圖

Flink Collector Output 介面原始碼解析

WatermarkGaugeExposingOutput

可以看到 TimestampedCollector 和 CountingOutput 是直接實現了 Output 介面的,ChainingOutput,RecordWriterOutput,BroadcastingOutputCollector 這三個類是實現了 WatermarkGaugeExposingOutput 介面,主要是為了顯示當前輸出的 Watermark 值,WatermarkGaugeExposingOutput 又繼承了 Output 介面。

根據其使用場景的不同,我們可以把這些 Output 分成五大類:

「同 operatorChain」

ChainingOutput

CopyingChainingOutput

「跨 operatorChain」

RecordWriterOutput

「統計 Metrics」

CountingOutput

「廣播」

BroadcastingOutputCollector

CopyingBroadcastingOutputCollector

「時間戳」

TimestampedCollector

OperatorChain 圖

Flink Collector Output 介面原始碼解析

image-20220914185806184

這是一張 OperatorChain 和 Output 的關係圖,其中虛線代表的是同一個 operatorChain 之間的資料傳遞,使用的是 ChainingOutput,實線代表的是跨 operatorChain 之間資料傳遞,使用的是 RecordWriterOutput。

為了更好的展示每一個 Output 的使用場景,以及把整個資料傳遞流程串聯起來,下面來看一個簡單的 Demo。

Demo

Flink Collector Output 介面原始碼解析

上圖中的 Kafka Source 和 Map 運算元 chain 在一起形成了一個 operatorChain,其中 Kafka Source 又叫做 Head Operator,Map 運算元又叫做 Chain Operator,後面的 Process,兩個 Print 運算元 chain 在一起形成了另外一個 operatorChain,其中 Process 運算元又叫做 Head Operator,Print 運算元又叫做 Chain Operator。

那 Kafka Source -> Map 之間的資料傳遞用的則是 ChainingOutput,對應著上圖中的虛線部分,Map -> Process 之間的資料傳遞使用的是 RecordWriterOutput,對應著上圖中的實線部分。

另外從上面的分類可以看出來,很多 Output 都有一個對應的 CopyingXXXOutput,比如同一個 operatorChain 內資料傳遞是有 ChainingOutput 和 CopyingChainingOutput 兩個實現類的,那這兩者之間又有什麼區別和聯絡呢?我們接著往下面看。

同一個 operatorChain 之間(KafkaSource -> Map)

AsyncDataOutputToOutput#emitRecord

@Override public void emitRecord(StreamRecord streamRecord) { // 更新 metric numRecordsOut。inc(); metricGroup。recordEmitted(streamRecord。getTimestamp()); // 這裡是 CopyingChainingOutput output。collect(streamRecord); }

在 Kafka Source Operator 中傳送資料的物件是 AsyncDataOutputToOutput,它會持有一個 Output ,這裡的 Output 實際上是 CopyingChainingOutput 而不是 ChainingOutput,透過呼叫 collect 傳送資料。

CopyingChainingOutput#collect

@Override public void collect(StreamRecord record) { // 如果是正常的流 outputTag 是空的所以會直接走下面的邏輯 if (this。outputTag != null) { // we are not responsible for emitting to the main output。 return; } pushToOperator(record); }

正常的流是沒有 outputTag 的(只有側流輸出才有)所以會直接走 pushToOperator 方法。

CopyingChainingOutput#pushToOperator

@Override protected void pushToOperator(StreamRecord record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator (and Serializer) expects。 @SuppressWarnings(“unchecked”) // 淺複製 StreamRecord castRecord = (StreamRecord) record; // 更新 metric numRecordsIn。inc(); // 對 record 做深複製 StreamRecord copy = castRecord。copy(serializer。copy(castRecord。getValue())); input。setKeyContextElement(copy); // 呼叫下游的 processElement 方法 input。processElement(copy); } catch (ClassCastException e) { if (outputTag != null) { // Enrich error message ClassCastException replace = new ClassCastException( String。format( “%s。 Failed to push OutputTag with id ‘%s’ to operator。 ” + “This can occur when multiple OutputTags with different types ” + “but identical names are being used。”, e。getMessage(), outputTag。getId())); throw new ExceptionInChainedOperatorException(replace); } else { throw new ExceptionInChainedOperatorException(e); } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } }

pushToOperator 方法的邏輯很簡單:

對 record 淺複製。

更新 metrics。

對 record 做深複製。

設定 Key 的上下文。

呼叫 chain operator 的 processElement 方法處理資料。

可以看到在深複製的時候是需要對資料進行序列化的,這跟我們廣義上理解的 Flink 在 JobGraph 階段主要優化了 operatorChain,從而減少資料在網路傳輸中序列化和反序列的開銷是不太一致的,難道這句話是錯的嗎?

那我們就來看下 ChainingOutput 的 pushToOperator 方法和 CopyingChainingOutput 的 pushToOperator 有什麼區別呢?

ChainingOutput#pushToOperator

protected void pushToOperator(StreamRecord record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator expects。 @SuppressWarnings(“unchecked”) // 淺複製 StreamRecord castRecord = (StreamRecord) record; // 更新 metric numRecordsIn。inc(); // 設定 key 上下文 input。setKeyContextElement(castRecord); // 呼叫下一個運算元處理資料 input。processElement(castRecord); } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } }

你會發現 ChainingOutput 的 pushToOperator 方法和 CopyingChainingOutput 的幾乎一致,唯一的區別就是這裡沒有對 record 做深複製,僅做了一個淺複製,顯然,這種淺複製的方式效能是更高的,那是由什麼決定使用 ChainingOutput 還是 CopyingChainingOutput 呢?其實是透過 env。getConfig()。enableObjectReuse() 這個配置決定的,預設情況下 objectReuse 是 false 也就是會使用 CopyingChainingOutput 如果開啟了 objectReuse 則會使用 ChainingOutput,也就是說如果不開啟 objectReuse 是不能完全發揮 operatorChain 最佳化效果的。

那既然 ChainingOutput 的效能更高,為什麼預設不使用 ChainingOutput 呢?因為在某些場景下,開啟 objectReuse 可能會帶來安全性問題,所以就選擇了 CopyingChainingOutput 作為預設的 Output。

當前 operator 的輸出是下一個 operator 的輸入,所以這裡的 input 是 StreamMap 物件,也就相當於是直接呼叫 StreamMap。processElement 方法來傳輸資料。

StreamMap#processElement

@Override public void processElement(StreamRecord element) throws Exception { // 先呼叫我們自己的 map 邏輯 output。collect(element。replace(userFunction。map(element。getValue()))); }

在 processElement 方法裡面會先呼叫 userFunction 的 map 方法,這裡的 userFunction 其實就是我們自定義的 map 運算元的程式碼邏輯,然後把返回的結果透過 collect 方法傳送到下游運算元,在傳送之前需要先更新相關的 Metric,所以這裡的 output 其實是 CountingOutput 。

CountingOutput#collect

@Override public void collect(StreamRecord record) { // 更新 metric numRecordsOut。inc(); // 傳送資料 output。collect(record); }

CountingOutput 物件主要的作用是更新 Metric,然後再發送資料,因為 Map 是 operatorChain 的最後一個 operator,所以它持有的 Output 是 RecordWriterOutput 物件,也就是上圖所說的實線傳輸資料。

不同 operatorChain 之間(Map -> Process)

RecordWriterOutput#collect

@Override public void collect(StreamRecord record) { if (this。outputTag != null) { // we are not responsible for emitting to the main output。 return; } pushToRecordWriter(record); }

在 RecordWriterOutput 的 collect 方法裡又呼叫了 pushToRecordWriter 方法。

RecordWriterOutput#pushToRecordWriter

private void pushToRecordWriter(StreamRecord record) { serializationDelegate。setInstance(record); try { recordWriter。emit(serializationDelegate); } catch (IOException e) { throw new UncheckedIOException(e。getMessage(), e); } }

透過 recordWriter 的 emit 方法傳送資料,因為是跨 operatorChain 的資料傳輸,並不像 operatorChain 之間資料傳輸那麼簡單,直接呼叫 Chain Operator 的 processElement 處理資料,而是上游先把資料寫到 ResultPartition 裡,然後下游運算元透過 InputChannel 消費資料,這個過程就不在展開了,因為不是我們今天討論的重點。

OneInputStreamTask#StreamTaskNetworkOutput#emitRecord

@Override public void emitRecord(StreamRecord record) throws Exception { // 更新 metric numRecordsIn。inc(); operator。setKeyContextElement(record); operator。processElement(record); }

下游消費到資料後透過 StreamTaskNetworkOutput 的 emitRecord 方法來發送資料,首先還是更新 Metrics,同樣的道理,這裡的 operator 表示的是下游運算元 ProcessOperator,先要設定上下文 key,最後呼叫其 processElement 方法傳遞資料。

同一個 operatorChain 之間(Process -> Print)

從這裡開始,其實還是同一個 operatorChain 內的資料傳遞,整體上的邏輯和上面同一個 operatorChain 之間資料傳遞的邏輯是一樣的,所以下面有些地方就一筆帶過了。

ProcessOperator#processElement

@Override public void processElement(StreamRecord element) throws Exception { // 設定時間戳 collector。setTimestamp(element); // 把 element 賦值給 context。element context。element = element; // 呼叫使用者的程式碼邏輯 userFunction。processElement(element。getValue(), context, collector); // 把 context。element 賦值為空 context。element = null; }

在 processElement 方法裡主要做了下面幾件事情:

給 StreamRecord 物件設定時間戳屬性。

把 element 賦值給 context。element。

執行使用者自定義的 ProcessFunction 的 processElement 方法。

把 context。element 設定為空。

其中步驟 3 是最重要的,所以我們再來看下在 3 裡面是如何傳遞資料的?

ProcessFunction#processElement

new ProcessFunction() { @Override public void processElement( JasonLeePOJO value, ProcessFunction。Context ctx, Collector out) throws Exception { if (value。getName()。equals(“flink”)) { // 正常流 out。collect(value); } else if (value。getName()。equals(“spark”)) { // 側流 ctx。output(test, value); } } })

ProcessFunction 是我們自定義的程式碼邏輯,主要實現了 processElement 方法,在這裡會有兩種不同的 Output,一種是正常的流輸出,一種是側流輸出,正常的流輸出用的是 TimestampedCollector,側流輸出用的是 ContextImpl 物件,它實現了 Context 抽象類的 output 方法。

TimestampedCollector#collect

@Override public void collect(T record) { output。collect(reuse。replace(record)); }

在 TimestampedCollector 的 collect 方法裡沒做任何處理,直接呼叫 CountingOutput 的 collect 方法傳遞資料。

ContextImpl#output

@Override public void output(OutputTag outputTag, X value) { if (outputTag == null) { throw new IllegalArgumentException(“OutputTag must not be null。”); } output。collect(outputTag, new StreamRecord<>(value, element。getTimestamp())); }

側流和正常流稍微不同的是它並沒有實現 Output 介面,而是實現了 Context 物件,但是 output 方法裡的 output 同樣也是 CountingOutput。

CountingOutput#collect

@Override public void collect(StreamRecord record) { numRecordsOut。inc(); output。collect(record); }

CountingOutput 的 collect 裡先是更新 Metrics,因為需要像下游廣播資料,所以這裡的 output 是 BroadcastingOutputCollector。

BroadcastingOutputCollector#collect

@Override public void collect(StreamRecord record) { for (Output> output : outputs) { output。collect(record); } }

因為這是在同一個 operatorChain 內傳遞資料,所以這裡的 output 是 CopyingChainingOutput。與 BroadcastingOutputCollector 對應的還有一個 CopyingBroadcastingOutputCollector,這裡也順便看一下。

CopyingBroadcastingOutputCollector#collect

@Override public void collect(OutputTag outputTag, StreamRecord record) { for (int i = 0; i < outputs。length - 1; i++) { Output> output = outputs[i]; StreamRecord shallowCopy = record。copy(record。getValue()); output。collect(outputTag, shallowCopy); } if (outputs。length > 0) { // don‘t copy for the last output outputs[outputs。length - 1]。collect(outputTag, record); } }

CopyingBroadcastingOutputCollector 是 BroadcastingOutputCollector 的特殊版本,在 collect 方法裡面多了一個淺複製的邏輯,如果開啟了 objectReuse 則使用 CopyingBroadcastingOutputCollector 否則使用 BroadcastingOutputCollector。

CopyingChainingOutput#collect

@Override public void collect(StreamRecord record) { if (this。outputTag != null) { // we are not responsible for emitting to the main output。 return; } pushToOperator(record); }

這裡邏輯和上面一樣,就跳過了。

CopyingChainingOutput#pushToOperator

@Override protected void pushToOperator(StreamRecord record) { try { // we know that the given outputTag matches our OutputTag so the record // must be of the type that our operator (and Serializer) expects。 @SuppressWarnings(“unchecked”) StreamRecord castRecord = (StreamRecord) record; numRecordsIn。inc(); StreamRecord copy = castRecord。copy(serializer。copy(castRecord。getValue())); input。setKeyContextElement(copy); input。processElement(copy); } catch (ClassCastException e) { if (outputTag != null) { // Enrich error message ClassCastException replace = new ClassCastException( String。format( “%s。 Failed to push OutputTag with id ’%s‘ to operator。 ” + “This can occur when multiple OutputTags with different types ” + “but identical names are being used。”, e。getMessage(), outputTag。getId())); throw new ExceptionInChainedOperatorException(replace); } else { throw new ExceptionInChainedOperatorException(e); } } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } }

還是和上面一樣,直接呼叫下游 operator 的 processElement 傳遞資料,這裡的下游是 StreamSink,所以 input 是 StreamSink。

StreamSink#processElement

@Override public void processElement(StreamRecord element) throws Exception { sinkContext。element = element; userFunction。invoke(element。getValue(), sinkContext); }

在 processElement 方法裡會呼叫 userFunction 的 invoke 方法,但是這裡的 userFunction 不是我們自定義實現的,而是 Flink 預設提供的 PrintSinkFunction。

為了更加方便的對比開啟 objectReuse 和不開啟 objectReuse 的不同之處,整個呼叫鏈路如下:

不開啟(預設)objectReuse 的呼叫鏈:

AsyncDataOutputToOutput。emitRecord——>CopyingChainingOutput。collect——>CopyingChainingOutput。pushToOperator ——>StreamMap。processElement ——>CountingOutput。collect ——>RecordWriterOutput。collect ——>RecordWriterOutput。pushToRecordWriter ——>AbstractStreamTaskNetworkInput。emitNext ——>AbstractStreamTaskNetworkInput。processElement ——>OneInputStreamTask。StreamTaskNetworkOutput。emitRecord ——>ProcessOperator。processElement ——>ProcessFunction。processElement ——>TimestampedCollector。collect (這個是正常流的鏈路) ——>CountingOutput。collect ——>BroadcastingOutputCollector。collect ——>CopyingChainingOutput。collect ——>CopyingChainingOutput。pushToOperator ——>StreamSink。processElement ——>SinkFunction。invoke ——>PrintSinkFunction。invoke

開啟 objectReuse 的呼叫鏈:

AsyncDataOutputToOutput。emitRecord——>ChainingOutput。collect——>ChainingOutput。pushToOperator ——>StreamMap。processElement ——>CountingOutput。collect ——>RecordWriterOutput。collect ——>RecordWriterOutput。pushToRecordWriter ——>AbstractStreamTaskNetworkInput。emitNext ——>AbstractStreamTaskNetworkInput。processElement ——>OneInputStreamTask。StreamTaskNetworkOutput。emitRecord ——>ProcessFunction。processElement ——>ProcessOperator。ContextImpl。output (這個是側流輸出的鏈路) ——>CountingOutput。collect ——>CopyingBroadcastingOutputCollector。collect ——>ChainingOutput。collect ——>ChainingOutput。pushToOperator ——>StreamSink。processElement ——>SinkFunction。invoke ——>PrintSinkFunction。invoke

經過對比整個呼叫鏈路,你會發現,不開啟(預設)objectReuse 的時候,在 operatorChain 之間傳遞資料用的是 CopyingChainingOutput,在有側流輸出廣播的場景下用的是 BroadcastingOutputCollector,開啟 objectReuse 的話,在 operatorChain 之間傳遞資料用的是 ChainingOutput,在有側流輸出廣播的場景下用的是 CopyingBroadcastingOutputCollector,其他地方沒有差別。

總結

本文從一個簡單的 Flink 應用程式出發,介紹了常見的幾個 Output 實現類的使用場景及原始碼解析,ChainingOutput 主要用在 operatorChain 內部傳遞資料,RecordWriterOutput 主要用在跨 operatorChain 不同 Task 之間傳遞資料,CountingOutput 主要是為了更新 Metrics,BroadcastingOutputCollector 主要用於廣播場景下,TimestampedCollector 主要用來給 StreamRecord 設定時間戳屬性。

推薦閱讀

Flink 任務實時監控最佳實踐

Flink on yarn 實時日誌收集最佳實踐

Flink 1。14。0 全新的 Kafka Connector

Flink 1。14。0 消費 kafka 資料自定義反序列化類

Flink SQL JSON Format 原始碼解析

Flink on yarn 遠端除錯原始碼

Flink 透過 State Processor API 實現狀態的讀取和寫入

Flink 側流輸出原始碼解析

Flink 原始碼:廣播流狀態原始碼解析

Flink 原始碼分析之 Client 端啟動流程分析

Flink Print SQL Connector 新增隨機取樣功能

Flink Collector Output 介面原始碼解析

公眾號素材

如果你覺得文章對你有幫助,麻煩點一下

在看

吧,你的支援是我創作的最大動力。