Flink面試大全總結(全文6萬字、110個知識點、160張圖)

前言

Hello,各位大資料學習愛好者,我是

3分鐘秒懂大資料

公眾號的作者土哥,目前在杭州某網際網路大廠擔任大資料演算法工程師,組內專注於Flink流式計算元件以及AB融合技術,為了讓更多朋友更清晰的瞭解流式計算元件,現在我以面試的方式為大家全面總結了Flink所涉及的知識點,全文總共6萬字,涉及各種原理,以及原始碼分析,

圖片是一張張繪製而出

,歡迎大家進行解讀

可以新增作者微信:threeknowbigdata,備註:FLink,領取pdf版文章。

在網際網路行業,我們都知道薪資結構跟崗位存在直接關係,如下面這幅關係圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

如果你想找大資料開發或者大資料演算法工程師、那Flink流計算框架是面試必考點。

而作為一名大資料演算法工程師,土哥我專門整理了Flink 面試常問的題目,同時這些題目之間的

關聯性

很強,本文將透過

問答 + 圖解

的形式

由淺入深

幫助大家進一步學習和理解

Flink流批一體

計算框架。

全文總計6萬字

110個知識點

160張原理、流程圖

。提綱如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

提綱

正文

01、Flink 基礎篇

1、什麼是Flink?描述一下

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink是一個以

為核心的高可用、高效能的分散式計算引擎。具備

流批一體,

高吞吐、低延遲,容錯能力,大規模複雜計算等特點,在資料流上提供

資料分發

、通訊等功能。

2、能否詳細解釋一下其中的 資料流、流批一體、容錯能力等概念?

資料流:

所有產生的

資料

都天然帶有

時間概念

,把 事件 按照時間順序排列起來,就形成了一個事件流,也被稱作資料流。

流批一體:

首先必須先明白什麼是

有界資料

無界資料

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

有界資料

,就是在一個確定的時間範圍內的資料流,有開始,有結束,一旦確定就不會再改變,一般

批處理

用來處理有界資料,如上圖的 bounded stream。

無界資料

,就是持續產生的資料流,資料是無限的,有開始,無結束,一般

流處理

用來處理無界資料。如圖 unbounded stream。

Flink的設計思想是以

為核心,批是流的特例,擅長處理 無界 和 有界 資料, Flink 提供 精確的時間控制能力 和 有狀態 計算機制,可以輕鬆應對無界資料流,同時 提供

視窗

處理有界資料流。所以被成為流批一體。

容錯能力:

在分散式系統中,硬體故障、程序異常、應用異常、網路故障等異常無處不在,Flink引擎必須保證故障發生後 不僅可以

重啟

應用程式,還要

確保

其內部狀態保持一致,從最後一次正確的時間點重新出發

Flink提供

叢集級容錯

應用級容錯

能力

叢集級容錯:

Flink 與 叢集管理器緊密連線,如YARN、Kubernetes,當程序掛掉後,自動重啟新程序接管之前的工作。同時具備

高可用性 ,

可消除所有單點故障,

應用級容錯:

Flink 使用 輕量級分散式快照,設計檢查點(

checkpoint

)實現可靠容錯。

Flink 利用檢查點特性,在框架層面 提供

Exactly-once

語義,即端到端的一致性,確保資料僅處理一次,不會重複也不會丟失,即使出現故障,也能保證資料只寫一次。

3、Flink 和 Spark Streaming的區別?

Flink

Spark Sreaming 最大的區別

在於:Flink 是標準的實時處理引擎,基於事件驅動,

以流為核心

,而 Spark Streaming 的RDD 實際是一組小批次的RDD集合,是微批(Micro-Batch)的模型,

以批為核心

下面我們介紹兩個框架的主要區別:

1. 架構模型

Spark Streaming 在執行時的主要角色包括:

服務架構叢集和資源管理 Master / Yarn Application Master;

工作節點 Work / Node Manager;

任務排程器 Driver;任務執行器 Executor

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink 在執行時主要包含:客戶端 Client、作業管理 Jobmanager、任務管理Taskmanager。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

2. 任務排程

Spark Streaming 連續不斷的生成微小的資料批次,構建有向無環圖DAG,Spark Streaming 會依次建立 DStreamGraph、JobScheduler。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink 根據使用者提交的程式碼生成

StreamGraph

,經過最佳化生成

JobGraph

,然後提交給 JobManager進行處理,JobManager 會根據 JobGraph 生成

ExecutionGraph

,ExecutionGraph 是 Flink 排程最核心的資料結構,JobManager 根據 ExecutionGraph 對 Job 進行排程,根據物理執行圖部署到Taskmanager上形成具體的Task執行。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

3. 時間機制

Spark Streaming 支援的時間機制有限,只支援處理時間。

Flink 支援了流處理程式在時間上的三個定義:事件時間 EventTime、攝入時間 IngestionTime 、處理時間 ProcessingTime。同時也支援 watermark 機制來處理滯後資料。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

4. 容錯機制

對於 Spark Streaming 任務,我們可以設定 checkpoint,然後假如發生故障並重啟,我們可以從上次 checkpoint 之處恢復,但是這個行為只能使得資料不丟失,可能會重複處理,不能做到恰好一次處理語義。

Flink 則使用兩階段提交協議來解決這個問題。

4、Flink的架構包含哪些?

Flink 架構分為

技術架構

執行架構

兩部分。

5、簡單介紹一下技術架構

如下圖為Flink技術架構:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink 作為流批一體的分散式計算引擎,必須提供面向開發人員的

API層

,同時還需要跟外部資料儲存進行互動,需要

聯結器

,作業開發、測試完畢後,需要提交叢集執行,需要

部署層

,同時還需要運維人員能夠管理和監控,還提供圖計算、機器學習、SQL等,需要

應用框架層

6、詳細介紹一下Flink的執行架構

如下圖為Flink執行架構:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink 叢集採取 Master - Slave 架構,Master的角色為

JobManager

,負責叢集和作業管理,Slave的角色是

TaskManager

,負責執行計算任務,同時,Flink 提供客戶端 Client 來管理叢集和提交任務,

JobManager

TaskManager

是叢集的程序。

(1)Client

Flink 客戶端是F1ink 提供的 CLI 命令列工具,用來提交 Flink 作業到 Flink 叢集,在客戶端中負責 StreamGraph (流圖)和 Job Graph (作業圖)的構建。

(2)JobManager

JobManager 根據並行度將 Flink 客戶端提交的Flink 應用分解為子任務,從資源管理器 ResourceManager 申請所需的計算資源,資源具備之後,開始分發任務到 TaskManager 執行 Task,並負責應用容錯,跟蹤作業的執行狀態,發現異常則恢復作業等。

(3)TaskManager

TaskManager 接收 JobManage 分發的子任務,根據自身的資源情況 管理子任務的啟動、 停止、銷燬、異常恢復等生命週期階段。Flink程式中必須有一個TaskManager。

7、Flink的並行度介紹一下?

Flink程式在執行的時候,會被對映成一個

Streaming Dataflow。

一個Streaming Dataflow是由一組Stream和Transformation Operator組成的。在啟動時從一個或多個Source Operator開始,結束於一個或多個Sink Operator。

Flink程式本質上是並行的和分散式的

,在執行過程中,

一個流(stream)包含一個或多個流分割槽

,而每一個operator包含一個或多個operator子任務。操作子任務間彼此獨立,在不同的執行緒中執行,甚至是在不同的機器或不同的容器上。

operator子任務的數量是這一特定operator的並行度

。相同程式中的不同operator有不同級別的並行度。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

一個Stream可以被分成多個Stream的分割槽,也就是Stream Partition。

一個Operator也可以被分為多個Operator Subtask

。如上圖中,Source被分成Source1和Source2,它們分別為Source的Operator Subtask。每一個Operator Subtask都是在不同的執行緒當中獨立執行的。

一個Operator的並行度,就等於Operator Subtask的個數

上圖Source的並行度為2。而一個Stream的並行度就等於它生成的Operator的並行度。資料在兩個operator之間傳遞的時候有兩種模式:

(1)

One to One模式:

兩個operator用此模式傳遞的時候,會保持資料的分割槽數和資料的 排序;如上圖中的Source1到Map1,它就保留的Source的分割槽特性,以及分割槽元素處 理的有序性。

(2)

Redistributing (重新分配)模式

:這種模式會改變資料的分割槽數;每個operator subtask會根據選擇transformation把資料傳送到不同的目標subtasks,比如keyBy()會透過hashcode重新分割槽,broadcast()和rebalance()方法會隨機重新分割槽;

8、Flink的並行度的怎麼設定的?

我們在實際生產環境中可以從四個不同層面設定並行度:

操作運算元層面(Operator Level)

執行環境層面(Execution Environment Level)

客戶端層面(Client Level)

系統層面(System Level)

需要注意的優先順序:運算元層面>環境層面>客戶端層面>系統層面。

9、Flink程式設計模型瞭解不?

Flink 應用程式主要由三部分組成,

Source、

轉換

transformation、

目的地

sink。這些流式 dataflows 形成了有向圖,以一個或多個源(source)開始,並以一個或多個目的地(sink)結束。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

10、Flink作業中的DataStream,Transformation介紹一下

Flink作業中,包含兩個基本的塊:

資料流(DataStream)

轉換(Transformation)

DataStream是邏輯概念,為開發者提供API介面,Transformation是處理行為的抽象,包含了資料的讀取、計算、寫出。所以Flink 作業中的DataStream API 呼叫,實際上構建了多個由 Transformation組成的資料處理流水線(Pipeline)

DataStream API 和 Transformation 的轉換如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

11、Flink的分割槽策略瞭解嗎?

目前 Flink 支援8種分割槽策略的實現,資料分割槽體系如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)GlobalPartitioner

資料會被分發到下游運算元的第一個例項中進行處理。

(2)ForwardPartitioner

在API層面上ForwardPartitioner應用在 DataStream上,生成一個新的 DataStream。

該Partitioner 比較特殊,用於在同一個 OperatorChain 中上下游運算元之間的資料轉發,實際上資料是直接傳遞給下游的,要求上下游並行度一樣。

(3)ShufflePartitioner

隨機的將元素進行分割槽,可以確保下游的Task能夠均勻地獲得資料,使用程式碼如下:

dataStream。shuffle();

(4)RebalancePartitioner

以Round-robin 的方式為每個元素分配分割槽,確保下游的 Task 可以均勻地獲得資料,避免資料傾斜。使用程式碼如下:

dataStream。rebalance();

(5)RescalePartitioner

根據上下游 Task 的數量進行分割槽, 使用

Round-robin

選擇下游的一個Task 進行資料分割槽,如上游有2個 Source。,下游有6個 Map,那麼每個 Source 會分配3個固定的下游 Map,不會向未分配給自己的分割槽寫人資料。這一點與 ShufflePartitioner 和 RebalancePartitioner 不同, 後兩者會寫入下游所有的分割槽。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

執行程式碼如下:

dataStream。rescale();

(6)BroadcastPartitioner

將該記錄廣播給所有分割槽,即有N個分割槽,就把資料複製N份,每個分割槽1份,其使用程式碼如下:

dataStream。broadcast();

(7)KeyGroupStreamPartitioner

在API層面上,KeyGroupStreamPartitioner應用在 KeyedStream上,生成一個新的 KeyedStream。

KeyedStream根據keyGroup索引編號進行分割槽,會將資料按 Key 的 Hash 值輸出到下游運算元例項中。該分割槽器不是提供給使用者來用的。

KeyedStream在構造Transformation的時候預設使用KeyedGroup分割槽形式,從而在底層上支援作業Rescale功能。

(8)CustomPartitionerWrapper

使用者自定義分割槽器。需要使用者自己實現Partitioner介面,來定義自己的分割槽邏輯。

更詳細的介紹,請參考之前寫的 Flink分割槽策略:你可以不會,但不能不懂

12、描述一下Flink wordcount執行包含的步驟有哪些?

主要包含以下幾步:

(1)獲取執行環境 StreamExecutionEnvironment

(2)接入source源

(3)執行轉換操作,如map()、flatmap()、keyby()、sum()

(4)輸出sink源 如print()

(5) 執行 execute

提供一個示例:

import org。apache。flink。api。common。functions。FlatMapFunction;import org。apache。flink。api。java。utils。ParameterTool;import org。apache。flink。streaming。api。datastream。DataStream;import org。apache。flink。streaming。api。datastream。DataStreamSource;import org。apache。flink。streaming。api。environment。StreamExecutionEnvironment;import org。apache。flink。streaming。api。windowing。time。Time;import org。apache。flink。util。Collector;public class WordCount {public static void main(String[] args) throws Exception {//定義socket的埠號int port;try{ ParameterTool parameterTool = ParameterTool。fromArgs(args); port = parameterTool。getInt(“port”); }catch (Exception e){ System。err。println(“沒有指定port引數,使用預設值9000”); port = 9000; }//獲取執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironment();//連線socket獲取輸入的資料 DataStreamSource text = env。socketTextStream(“10。192。12。106”, port, “\n”);//計算資料 DataStream windowCount = text。flatMap(new FlatMapFunction() {public void flatMap(String value, Collector out) throws Exception { String[] splits = value。split(“\\s”);for (String word:splits) { out。collect(new WordWithCount(word,1L)); } } })//打平操作,把每行的單詞轉為型別的資料 。keyBy(“word”)//針對相同的word資料進行分組 。timeWindow(Time。seconds(2),Time。seconds(1))//指定計算資料的視窗大小和滑動視窗大小 。sum(“count”); //把資料列印到控制檯 windowCount。print() 。setParallelism(1);//使用一個並行度//注意:因為flink是懶載入的,所以必須呼叫execute方法,上面的程式碼才會執行 env。execute(“streaming word count”); }/** * 主要為了儲存單詞以及單詞出現的次數 */public static class WordWithCount{public String word;public long count;public WordWithCount(){}public WordWithCount(String word, long count) {this。word = word;this。count = count; }@Overridepublic String toString() {return “WordWithCount{” +“word=‘” + word + ’\‘’ +“, count=” + count +‘}’; } }}

13、Flink常用的運算元有哪些?

分兩部分:

(1)資料讀取,這是Flink流計算應用的起點,常用運算元有:

從記憶體讀:fromElements、從檔案讀:readTextFile、Socket 接入 :socketTextStream、自定義讀取:createInput

(2)處理資料的運算元,主要用於

轉換

過程

常用的運算元包括:Map(單輸入單輸出)、FlatMap(單輸入、多輸出)、Filter(過濾)、KeyBy(分組)、Reduce(聚合)、Window(視窗)、Connect(連線)、Split(分割)等。

02、Flink 核心篇

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

核心篇主要涉及以上知識點,下面讓我們詳細瞭解一下。

14、Flink的四大基石包含哪些?

Flink四大基石分別是:Checkpoint(檢查點)、State(狀態)、Time(時間)、Window(視窗)。

15、說說Flink視窗,以及劃分機制

視窗概念:

將無界流的資料,按時間區間,劃分成多份資料,分別進行統計

(聚合)

Flink支援兩種劃分視窗的方式(time和count),第一種,按時間驅動進行劃分、另一種按資料驅動進行劃分。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

1、按時間驅動Time Window 劃分可以分為 滾動視窗

Tumbling Window

和滑動視窗

Sliding Window。

2、按資料驅動Count Window也可以劃分為滾動視窗

Tumbling Window

和滑動視窗

Sliding Window。

3、Flink支援視窗的兩個重要屬性(

視窗長度size和滑動間隔interval

),透過視窗長度和滑動間隔來區分滾動視窗和滑動視窗。

如果size=interval,那麼就會形成tumbling-window(無重疊資料)——滾動視窗

如果size(1min)>interval(30s),那麼就會形成sliding-window(有重疊資料)——滑動視窗

透過組合可以得出

四種基本視窗:

(1)time-tumbling-window 無重疊資料的時間視窗,設定方式舉例:timeWindow(Time。seconds(5))——-基於時間的滾動視窗

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(2)time-sliding-window 有重疊資料的時間視窗,設定方式舉例:timeWindow(Time。seconds(10), Time。seconds(5))——-基於時間的滑動視窗

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(3)count-tumbling-window無重疊資料的數量視窗,設定方式舉例:countWindow(5)——-基於數量的滾動視窗

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(4)count-sliding-window 有重疊資料的數量視窗,設定方式舉例:countWindow(10,5)——-基於數量的滑動視窗

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink中還支援一個特殊的視窗:會話視窗SessionWindows

session視窗分配器透過session活動來對元素進行分組,session視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況

session視窗在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個視窗就會關閉。

一個session視窗透過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session視窗中去,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

16、看你基本概念講的還是很清楚的,那你介紹下Flink的視窗機制以及各元件之間是如何相互工作的

以下為視窗機制的流程圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

WindowAssigner

1、視窗運算元負責處理視窗,資料流源源不斷地進入運算元(window operator)時,每一個到達的元素首先會被交給 WindowAssigner。

WindowAssigner 會決定元素被放到哪個或哪些視窗(window)

,可能會建立新視窗。因為一個元素可以被放入多個視窗中(個人理解是滑動視窗,滾動視窗不會有此現象),所以同時存在多個視窗是可能的。注意,

Window

本身只是一個ID識別符號

,其內部可能儲存了一些元資料,如

TimeWindow

中有開始和結束時間,但是並不會儲存視窗中的元素。視窗中的元素實際儲存在 Key/Value State 中,key為

Window

,value為元素集合(或聚合值)。為了保證視窗的容錯性,該實現依賴了 Flink 的 State 機制。

WindowTrigger

2、每一個Window都擁有一個屬於自己的 Trigger,Trigger上會有定時器,

用來決定一個視窗何時能夠被計算或清除

。每當有元素加入到該視窗,或者之前註冊的定時器超時了,那麼Trigger都會被呼叫。Trigger的返回結果可以是 :

(1)continue(繼續、不做任何操作),

(2)Fire(觸發計算,處理視窗資料),

(3)Purge(觸發清理,移除視窗和視窗中的資料),

(4)Fire + purge(觸發計算+清理,處理資料並移除視窗和視窗中的資料)。

當資料到來時,呼叫

Trigger

判斷是否需要觸發計算,如果呼叫結果只是Fire的話,那麼會計算視窗並保留視窗原樣,也就是說視窗中的資料不清理,等待下次Trigger fire的時候再次執行計算。視窗中的資料會被反覆計算,直到觸發結果清理。在清理之前,視窗和資料不會釋放沒所以視窗會一直佔用記憶體。

Trigger 觸發流程:

3、當Trigger Fire了,視窗中的元素集合就會交給

Evictor

(如果指定了的話)。

Evictor 主要用來遍歷視窗中的元素列表,並決定最先進入視窗的多少個元素需要被移除

。剩餘的元素會交給使用者指定的函式進行視窗的計算。如果沒有 Evictor 的話,視窗中的所有元素會一起交給函式進行計算。

4、計算函式收到了視窗的元素(可能經過了 Evictor 的過濾),並計算出視窗的結果值,併發送給下游。視窗的結果值可以是一個也可以是多個。DataStream API 上可以接收不同型別的計算函式,包括預定義的

sum()

min()

max()

,還有

ReduceFunction

FoldFunction

,還有

WindowFunction

。WindowFunction 是最通用的計算函式,其他的預定義的函式基本都是基於該函式實現的。

5、Flink 對於一些聚合類的視窗計算(如sum,min)做了最佳化,因為聚合類的計算不需要將視窗中的所有資料都儲存下來,只需要儲存一個result值就可以了。每個進入視窗的元素都會執行一次聚合函式並修改result值。這樣可以大大降低記憶體的消耗並提升效能。但是如果使用者定義了 Evictor,則不會啟用對聚合視窗的最佳化,因為 Evictor 需要遍歷視窗中的所有元素,必須要將視窗中所有元素都存下來。

17、講一下Flink的Time概念

在Flink的流式處理中,會涉及到時間的不同概念,主要分為三種時間機制,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

- EventTime[事件時間]

事件發生的時間,例如:點選網站上的某個連結的時間,每一條日誌都會記錄自己的生成時間。

如果以EventTime為基準來定義時間視窗那將形成EventTimeWindow,要求訊息本身就應該攜帶EventTime

- IngestionTime[攝入時間]

資料進入Flink的時間,如某個Flink節點的

sourceoperator

接收到資料的時間,例如:某個source消費到kafka中的資料

如果以

IngesingtTime

為基準來定義時間視窗那將形成IngestingTimeWindow,以source的systemTime為準

- ProcessingTime[處理時間]

某個Flink節點執行某個operation的時間,例如:timeWindow處理資料時的系統時間,預設的時間屬性就是Processing Time

如果以

ProcessingTime

基準來定義時間視窗那將形成ProcessingTimeWindow,以operator的systemTime為準

在Flink的流式處理中,絕大部分的業務都會使用EventTime,一般只在EventTime無法使用時,才會被迫使用ProcessingTime或者IngestionTime。

如果要使用EventTime,那麼需要引入EventTime的時間屬性,引入方式如下所示:

18、那在API呼叫時,應該怎麼使用?

使用方式如下:

final StreamExecutionEnvironment env = StreamExecutionEnvironment。getExecutionEnvironrnent();// 使用處理時間env。setStreamTimeCharacteristic(TimeCharacteristic。ProcessingTime) ; // 使用攝入時間env。setStrearnTimeCharacteristic(TimeCharacteristic。IngestionTime);// 使用事件時間env。setStrearnTimeCharacteristic(TimeCharacteri stic Eve~tTime);

19、在流資料處理中,有沒有遇到過資料延遲等問題,透過什麼處理呢?

有遇到過資料延遲問題。舉個例子:

案例1:

假你正在去往地下停車場的路上,並且打算用手機點一份外賣。

選好了外賣後,你就用線上支付功能付款了,這個時候是11點50分。恰好這時,你走進了地下停車庫,而這裡並沒有手機訊號。因此外賣的線上支付並沒有立刻成功,而支付系統一直在Retry重試“支付”這個操作。

當你找到自己的車並且開出地下停車場的時候,已經是12點05分了。這個時候手機重新有了訊號,手機上的支付資料成功發到了外賣線上支付系統,支付完成。

在上面這個場景中你可以看到,

支付資料的事件時間是11點50分

,而

支付資料的處理時間是12點05分

案例2:

如上圖所示,某App 會記錄使用者的所有點選行為,並回傳日誌(在網路不好的情況下,先儲存在本地,延後回傳)。

A 使用者在11:02 對 App 進行操作,B使用者在11:03 操作了 App,

但是A 使用者的網路不太穩定,回傳日誌延遲了,導致我們在服務端先接受到B 使用者11:03 的訊息,然後再接受到A 使用者11:02 的訊息,

訊息亂序

了。

一般處理資料延遲、訊息亂序等問題,透過WaterMark水印來處理。

水印是用來解決資料延遲、資料亂序等問題,總結如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

水印就是一個時間戳(timestamp),Flink可以給資料流新增水印

- 水印並不會影響原有Eventtime事件時間

- 當資料流新增水印後,會按照水印時間來觸發視窗計算,

也就是說watermark水印是用來觸發視窗計算的

- 設定水印時間,會比事件時間小几秒鐘,表示最大允許資料延遲達到多久

- 水印時間 = 事件時間 - 允許延遲時間 (例如:10:09:57 = 10:10:00 - 3s )

20、WaterMark原理講解一下?

如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

視窗是10分鐘觸發一次,現在在12:00 - 12:10 有一個視窗,本來有一條資料是在12:00 - 12:10這個視窗被計算,但因為延遲,12:12到達,這時12:00 - 12:10 這個視窗就會被關閉,只能將資料下發到下一個視窗進行計算,這樣就產生了資料延遲,造成計算不準確。

現在新增一個水位線:資料時間戳為2分鐘。這時用資料產生的事件時間 12:12 -允許延遲的水印 2分鐘 = 12:10 >= 視窗結束時間 。視窗觸發計算,該資料就會被計算到這個窗口裡。

在DataStream API 中使用 TimestampAssigner 介面定義時間戳的提取行為,包含兩個子介面

AssignerWithPeriodicWatermarks

介面和

AssignerWithPunctuatedWaterMarks

介面

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

如果覺得阿周講解的知識點還滿意的話,請關注公眾號:

3分鐘秒懂大資料

,獲取更多,更全面的技術博文。並加博主微信:

threeknowbigdata,

拉你進大資料交流群。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

21、如果資料延遲非常嚴重呢?只使用WaterMark可以處理嗎?那應該怎麼解決?

使用 WaterMark+ EventTimeWindow 機制可以在一定程度上解決資料亂序的問題,但是,WaterMark 水位線也不是萬能的,在某些情況下,資料延遲會非常嚴重,即使透過Watermark + EventTimeWindow也無法等到資料全部進入視窗再進行處理,因為視窗觸發計算後,

對於延遲到達的本屬於該視窗的資料,Flink預設會將這些延遲嚴重的資料進行丟棄

那麼如果想要讓一定時間範圍的延遲資料不會被丟棄,可以使用

Allowed Lateness(允許遲到機制/側道輸出機制)

設定一個允許延遲的時間和側道輸出物件來解決

即使用

WaterMark + EventTimeWindow + Allowed Lateness方案

(包含側道輸出),可以做到資料不丟失。

API呼叫

l allowedLateness(lateness:Time)——-設定允許延遲的時間

該方法傳入一個Time值,設定允許資料遲到的時間,這個時間和watermark中的時間概念不同。再來回顧一下,

watermark=資料的事件時間-允許亂序時間值

隨著新資料的到來,watermark的值會更新為最新資料事件時間-允許亂序時間值,但是如果這時候來了一條歷史資料,watermark值則不會更新。

總的來說,watermark永遠不會倒退它是為了能接收到儘可能多的亂序資料。

那這裡的Time值呢?主要是為了等待遲到的資料,如果屬於該視窗的資料到來,仍會進行計算,後面會對計算方式仔細說明

注意:該方法只針對於基於event-time的視窗

l sideOutputLateData(outputTag:OutputTag[T])——儲存延遲資料

該方法是將遲來的資料儲存至給定的outputTag引數,而OutputTag則是用來標記延遲資料的一個物件。

l DataStream。getSideOutput(tag:OutputTag[X])——獲取延遲資料

透過window等操作返回的DataStream呼叫該方法,傳入標記延遲資料的物件來獲取延遲的資料

22、剛才提到State,那你簡單說一下什麼是State。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在Flink中,狀態被稱作state,是用來

儲存

中間的

計算結果

或者

快取資料。

根據狀態是否需要儲存中間結果,分為

無狀態計算

有狀態計算。

對於流計算而言,事件持續產生,如果每次計算

相互獨立

,不依賴上下游的事件,則相同輸入,可以得到相同輸出,

是無狀態計算

如果計算需要

依賴

於之前或者後續事件,則被稱

為有狀態計算

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

有狀態計算如 sum求和,資料類加等。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

23、Flink 狀態包括哪些?

(1)

按照由 Flink管理 還是 使用者管理,狀態可以分為

原始狀態(Raw State)

託管狀態(ManagedState)

託管狀態(ManagedState)

:由Flink 自行進行管理的State。

原始狀態(Raw State)

:由使用者自行進行管理。

兩者區別:

1。 從狀態管理方式的方式來說,Managed State 由Flink Runtime 管理,自動儲存,自動恢復,在記憶體管理上有最佳化;而 Raw State 需要使用者自己管理,需要自己序列化,Flink 不知道 State 中存入的資料是什麼結構,只有使用者自己知道,需要最終序列化為可儲存的資料結構。

2。 從狀態資料結構來說,Managed State 支援已知的資料結構,如Value、List、Map等。而 Raw State

只支援 位元組

陣列,所有狀態都要轉換為二進位制位元組陣列才可以。

3。 從推薦使用場景來說,Managed State 大多數情況下均可使用,而Raw State 是當 Managed State 不夠用時,比如需要自定義Operator 時,才會使用 Raw State。

在實際生產過程中,只推薦使用 Managed State 。

(2)State

按照是否有 key 劃分為

KeyedState

OperatorState

兩種。

keyedState特點:

1。 只能用在keyedStream上的運算元中,狀態跟特定的key繫結。

2。 keyStream流上的每一個key 對應一個state 物件。若一個operator 例項處理多個key,訪問相應的多個State,

可對應多個state。

3。 keyedState 儲存在StateBackend中

4。 透過RuntimeContext訪問,實現Rich Function介面。

5。 支援多種資料結構:ValueState、ListState、ReducingState、AggregatingState、MapState。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

OperatorState特點:

1。 可以用於所有運算元,但整個運算元只對應一個state。

2。併發改變時有多種重新分配的方式可選:均勻分配;

3。 實現CheckpointedFunction或者 ListCheckpointed 介面。

4。 目前只支援 ListState資料結構。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

這裡的fromElements會呼叫FromElementsFunction的類,其中就使用了型別為List state 的 operator state

24、Flink廣播狀態瞭解嗎?

Flink中,廣播狀態中叫作 BroadcastState。 在廣播狀態模式中使用。

所謂廣播狀態模式, 就是來自一個流的資料需要被廣播到所有下游任務

在運算元本地儲存,在處理另一個流的時候依賴於廣播的資料

。下面以一個示例來說明廣播狀態模式。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

上圖這個示例包含兩個流,一個為

kafka模型流

,該模型是透過機器學習或者深度學習訓練得到的模型,將該模型透過廣播,傳送給下游所有規則運算元,規則運算元將規則快取到Flink的本地記憶體中,另一個為

Kafka資料流

,用來接收測試集,該測試集依賴於模型流中的模型,透過模型完成測試集的推理任務。

廣播狀態(State)必須是MapState型別,廣播狀態模式需要使用廣播函式進行處理,廣播函式提供了處理廣播資料流和普通資料流的介面。

25、Flink 狀態介面包括哪些?

在Flink中使用狀態,包含兩種狀態介面:

(1)

狀態操作介面:

使用狀態物件本身儲存,寫入、更新資料。

(2)

狀態訪問介面:

從StateBackend獲取狀態物件本身。

狀態操作介面

Flink 中的

狀態操作介面

面向兩類使用者,即

應用開發者

Flink 框架本身

所有Flink設計了兩套介面

1、面向開發者State介面

面向開發的State介面只提供了對State中資料的增刪改基本操作介面,使用者無法訪問狀態的其他執行時所需要的資訊。介面體系如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

面向開發者的 State 介面體系

2、面向內部State介面

內部State 介面 是給 Flink 框架使用,提供更多的State方法,可以根據需要靈活擴充套件。除了對State中資料的訪問之外,還提供內部執行時資訊,如State中資料的序列化器,名稱空間(namespace)、名稱空間的序列化器、名稱空間合併的介面。

內部State介面命名方式為InternalxxxState。

狀態訪問介面

有了狀態之後,開發者自定義UDF時,應該如何訪問狀態?

狀態會被儲存在StateBackend中,但StateBackend 又包含不同的型別。所有Flink中抽象了兩個狀態訪問介面:

OperatorStateStore

KeyedStateStore,

使用者在編寫UDF時,就無須考慮到底是使用哪種 StateBackend型別介面。

OperatorStateStore 介面原理:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

OperatorState資料以Map形式儲存在記憶體中,並沒有使用RocksDBStateBackend和HeapKeyedStateBackend。

KeyedStateStore 介面原理:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

keyedStateStore資料使用RocksDBStateBackend或者HeapKeyedStateBackend來儲存,KeyedStateStore中建立、獲取狀態都交給了具體的StateBackend來處理,KeyedStateStore本身更像是一個代理。

26、Flink 狀態如何儲存

在Flink中, 狀態儲存被叫做

StateBackend

, 它具備兩種能力:

(1)在計算過程中提供訪問State能力,開發者在編寫業務邏輯中能夠使用StateBackend的介面讀寫資料。

(2)能夠將State持久化到外部儲存,提供容錯能力。

Flink狀態

提供三種儲存方式

(1)記憶體:

MemoryStateBackend,適用於驗證、測試、不推薦生產使用。

(2)檔案:

FSStateBackend,適用於長週期大規模的資料。

(3)RocksDB :

RocksDBStateBackend,適用於長週期大規模的資料。

上面提到的 StateBackend是面向使用者的,在Flink內部3種 State 的關係如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在執行時,

MemoryStateBackend

FSStateBackend

本地的 State 都儲存在TaskManager的記憶體中,所以其底層都依賴於HeapKeyedStateBackend。HeapKeyedStateBackend面向Flink 引擎內部,使用者無須感知。

1、記憶體型 StateBackend

MemoryStateBackend,執行時所需的State資料全部儲存在

TaskManager JVM堆上記憶體中,

KV型別的State、視窗運算元的State 使用HashTable 來儲存資料、觸發器等。

執行檢查點的時候,會把 State 的快照資料儲存到JobManager程序的記憶體中。

MemoryStateBackend 可以使用

非同步

的方式進行快照,(也可以同步),

推薦非同步

,避免阻塞運算元處理資料。

基於記憶體的 Stateßackend 在生產環境下不建議使用,可以在本地開發除錯測試 。

注意點如下 :

1) State 儲存在 JobManager 的記憶體中。受限於 JobManager的記憶體大小。

2) 每個 State預設5MB,可透過 MemoryStateBackend 建構函式調整

3) 每個 Stale 不能超過 Akka Frame 大小。

2、檔案型 StateBackend

FSStateBackend,執行時所需的State資料全部儲存在

TaskManager 的記憶體中, 執行檢查點的時候,會把 State 的快照資料儲存到配置的檔案系統中。

可以是分散式或者本地檔案系統,路徑如:

HDFS路徑:“hdfs://namenode:40010/flink/checkpoints”

本地路徑:“file:///data/flink/checkpoints”。

FSStateBackend

適用於處理大狀態、長視窗、或者大鍵值狀態的有狀態處理任務

注意點如下 :

1) State 資料首先被存在 TaskManager 的記憶體中。

2) State大小不能超過TM記憶體。

3) TM非同步將State資料寫入外部儲存。

MemoryStateBackend 和FSStateBackend 都依賴於HeapKeyedStateBackend,HeapKeyedStateBackend 使用 State儲存資料。

3、RocksDBStateBackend

RocksDBStateBackend 跟記憶體型和檔案型都不同 。

RocksDBStateBackend

使用嵌入式的本地資料庫 RocksDB

將流計算資料狀態

儲存在本地磁碟中

,不會受限於TaskManager 的記憶體大小,在執行檢查點的時候,再將整個 RocksDB 中儲存的State資料全量或者增量持久化到配置的檔案系統中,

在 JobManager 記憶體中會儲存少量的檢查點元資料。RocksDB克服了State受記憶體限制的問題,同時又能夠持久化到遠端檔案系統中,比較適合在生產中使用。

缺點:

RocksDBStateBackend 相比基於記憶體的StateBackend,訪問State的成本高很多,可能導致資料流的吞吐量劇烈下降,甚至可能

降低為原來的 1/10

適用場景

1)最適合用於處理大狀態、長視窗,或大鍵值狀態的有狀態處理任務。

2)RocksDBStateBackend 非常適合用於高可用方案。

3) RocksDBStateBackend 是

目前唯一支援增量檢查點的後端。

增量檢查點非常適用於超 大狀態的場景。

注意點

1)總 State 大小僅限於磁碟大小,不受記憶體限制

2)RocksDBStateBackend 也需要配置外部檔案系統,集中儲存State 。

3)RocksDB的 JNI API 基於 byte 陣列,單 key 和單 Value 的大小不能超過 8 位元組

4)對於使用具有合併操作狀態的應用程式,如ListState ,隨著時間可能會累積到超過 2*31次方位元組大小,這將會導致在接下來的查詢中失敗。

27、Flink 狀態如何持久化?

首選,Flink的狀態最終都要持久化到第三方儲存中,確保叢集故障或者作業掛掉後能夠恢復。

RocksDBStateBackend 持久化策略有兩種:

全量持久化策略

RocksFullSnapshotStrategy

增量持久化策略

RocksIncementalSnapshotStrategy

1、全量持久化策略

每次將全量的State寫入到狀態儲存中(HDFS)。記憶體型、檔案型、RocksDB型別的StataBackend 都支援全量持久化策略。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

快照儲存策略類體系

在執行持久化策略的時候,使用非同步機制,每個運算元啟動1個獨立的執行緒,將自身的狀態寫入分散式儲存可靠儲存中。在做持久化的過程中,狀態可能會被持續修改,

基於記憶體的狀態後端

使用 CopyOnWriteStateTable 來保證執行緒安全,RocksDBStateBackend則使用RocksDB的快照機制,使用快照來保證執行緒安全。

2、增量持久化策略

增量持久化就是每次持久化增量的State,

只有RocksDBStateBackend 支援增量持久化。

Flink 增量式的檢查點以 RocksDB為基礎

, RocksDB是一個基於LSM-Tree的KV儲存。新的資料儲存在記憶體中, 稱為

memtable

。如果Key相同,後到的資料將覆蓋之前的資料,一旦memtable寫滿了,RocksDB就會將資料壓縮並寫入磁碟。memtable的資料持久化到磁碟後,就變成了不可變的

sstable

因為 sstable 是不可變的,

Flink對比前一個檢查點建立和刪除的RocksDB sstable 檔案就可以計算出狀態有哪些發生改變。

為了確保 sstable 是不可變的,

Flink 會在RocksDB 觸發重新整理操作

強制將 memtable 重新整理到磁碟上 。

在Flink 執行檢查點時,會將新的sstable 持久化到HDFS中,同時保留引用。這個過程中 Flink 並不會持久化本地所有的sstable,因為本地的一部分歷史sstable 在之前的檢查點中已經持久化到儲存中了,只需增加對 sstable檔案的引用次數就可以。

RocksDB會在後臺合併 sstable 並刪除其中重複的資料

。然後在RocksDB刪除原來的 sstable,替換成新合成的 sstable。。

新的 sstable 包含了被刪除的 sstable中的資訊,

透過合併歷史的sstable會合併成一個新的 sstable,並刪除這些歷史sstable。 可以減少檢查點的歷史檔案,避免大量小檔案的產生。

28、Flink 狀態過期後如何清理?

1、DataStream中狀態過期

可以對DataStream中的每一個狀態設定

清理策略 StateTtlConfig

,可以設定的內容如下:

過期時間:超過多長時間未訪問,視為State過期,類似於快取。

過期時間更新策略:建立和寫時更新、讀取和寫時更新。

State可見性:未清理可用,超時則不可用。

2、Flink SQL中狀態過期

Flink SQL 一般在流Join、聚合類場景使用State,如果State不定時清理,則導致State過多,記憶體溢位。清理策略配置如下:

StreamQueryConfig qConfig = 。。。//設定過期時間為 min = 12小時 ,max = 24小時qConfig。withIdleStateRetentionTime(Time。hours(12),Time。hours(24));

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

29、Flink 透過什麼實現

可靠的容錯機制

Flink 使用 輕量級分散式快照,設計檢查點(

checkpoint

)實現可靠容錯。

30、什麼是Checkpoin檢查點?

Checkpoint

被叫做

檢查點

,是Flink實現容錯機制最核心的功能,是Flink可靠性的基石,它能夠根據配置週期性地基於Stream中各個Operator的

狀態

來生成Snapshot

快照

,從而將這些狀態資料定期持久化儲存下來,當Flink程式一旦意外崩潰時,重新執行程式時可以有選擇地從這些Snapshot進行恢復,從而修正因為故障帶來的程式資料狀態中斷。

Flink的checkpoint機制原理來自“

Chandy-Lamport algorithm

”演算法

注意:區分State和Checkpoint

1.State:

一般指一個具體的Task/Operator的狀態(operator的狀態表示一些運算元在執行的過程中會產生的一些中間結果)

State資料預設儲存在Java的堆記憶體中/TaskManage節點的記憶體中

State可以被記錄,在失敗的情況下資料還可以恢復。

2.Checkpoint:

表示了一個FlinkJob在一個特定時刻的一份全域性狀態快照,即包含了所有Task/Operator的狀態

可以理解為Checkpoint是把State資料定時持久化儲存了

比如KafkaConsumer運算元中維護的Offset狀態,當任務重新恢復的時候可以從Checkpoint中獲取。

31、什麼是Savepoin儲存點?

儲存點

在 Flink 中叫作

Savepoint

。 是基於Flink 檢查點機制的應用完整快照備份機制。 用來儲存狀態 可以在另一個叢集或者另一個時間點。從儲存的狀態中將作業恢復回來。適用 於應用升級、叢集遷移、 Flink 叢集版本更新、A/B測試以及假定場景、暫停和重啟、歸檔等場景。儲存點可以視為一個(運算元 ID -> State) 的Map,對於每一個有狀態的運算元,Key是運算元ID,Value是運算元State。

32、什麼是CheckpointCoordinator檢查點協調器?

Flink中檢查點協調器叫作

CheckpointCoordinator

,負責協調 Flink 運算元的 State 的分散式快照。當觸發快照的時候,CheckpointCoordinator向 Source 運算元中注入

Barrier

訊息 ,然後等待所有的Task通知檢查點確認完成,同時持有所有 Task 在確認完成訊息中上報的State控制代碼。

33、Checkpoint中儲存的是什麼資訊?

檢查點裡面到底儲存著什麼資訊呢?我們以flink消費kafka資料wordcount為例:

1、我們從Kafka讀取到一條條的日誌,從日誌中解析出app_id,然後將統計的結果放到記憶體中一個Map集合,app_id做為key,對應的pv做為value,每次只需要將相應app_id 的pv值+1後put到Map中即可;

2、kafka topic:test;

3、flink運算流程如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

kafka topic有且只有一個分割槽

假設kafka的topic-test只有一個分割槽,flink的Source task記錄了當前消費到kafka test topic的所有partition的offset

例:(0,1000)表示0號partition目前消費到offset為1000的資料

Flink的pv task記錄了當前計算的各app的pv值,為了方便講解,我這裡有兩個app:app1、app2

例:(app1,50000)(app2,10000)表示app1當前pv值為50000表示app2當前pv值為10000每來一條資料,只需要確定相應app_id,將相應的value值+1後put到map中即可;

該案例中,CheckPoint儲存的其實就是

第n次CheckPoint消費的offset資訊和各app的pv值資訊

,記錄一下發生CheckPoint當前的狀態資訊,並將該狀態資訊儲存到相應的狀態後端。圖下程式碼:(

注:狀態後端是儲存狀態的地方,決定狀態如何儲存,如何保障狀態高可用

,我們只需要知道,我們能從狀態後端拿到offset資訊和pv資訊即可。狀態後端必須是高可用的,否則我們的狀態後端經常出現故障,會導致無法透過checkpoint來恢復我們的應用程式)。

chk-100offset:(0,1000)pv:(app1,50000)(app2,10000)該狀態資訊表示第100次CheckPoint的時候, partition 0 offset消費到了1000,pv統計

34、當作業失敗後,檢查點如何恢復作業?

Flink提供了

應用自動恢復機制

手動作業恢復機制

應用自動恢復機制:

Flink設定有作業失敗重啟策略,包含三種:

1、定期恢復策略:fixed-delay

固定延遲重啟策略會嘗試一個給定的次數來重啟Job,如果超過最大的重啟次數,Job最終將失敗,在連續兩次重啟嘗試之間,重啟策略會等待一個固定時間,預設Integer。MAX_VALUE次

2、失敗比率策略:failure-rate

失敗率重啟策略在job失敗後重啟,但是超過失敗率後,Job會最終被認定失敗,在兩個連續的重啟嘗試之間,重啟策略會等待一個固定的時間。

3、直接失敗策略:None 失敗不重啟

手動作業恢復機制

因為Flink檢查點目錄分別對應的是JobId,每透過flink run 方式/頁面提交方式恢復都會重新生成 jobId,Flink 提供了在啟動之時透過設定

-s

。引數指定檢查點目錄的功能,讓新的 jobld 讀取該檢查點元檔案資訊和狀態資訊,從而達到指定時間節點啟動作業的目的。

啟動方式如下:

/bin/flink -s /flink/checkpoints/03112312a12398740a87393/chk-50/_metadata

35、當作業失敗後,從儲存點如何恢復作業?

從儲存點恢復作業並不簡單,尤其是在作業變更(如修改邏輯、修復 bug) 的情況下, 需要考慮如下幾點:

(1)運算元的順序改變

如果對應的 UID 沒變,則可以恢復,如果對應的 UID 變了恢復失敗。

(2)作業中添加了新的運算元

如果是無狀態運算元,沒有影響,可以正常恢復,如果是有狀態的運算元,跟無狀態的運算元 一樣處理。

(3)從作業中刪除了一個有狀態的運算元

預設需要恢復儲存點中所記錄的所有運算元的狀態,如果刪除了一個有狀態的運算元,從儲存點回復的時候被刪除的OperatorID找不到,所以會報錯 可以透過在命令中新增

—— allowNonReStoredSlale (short: -n )跳過無法恢復的運算元 。

(4)新增和刪除無狀態的運算元

如果手動設定了 UID 則可以恢復,儲存點中不記錄無狀態的運算元 如果是自動分配的 UID ,那麼有狀態運算元的可能會變( Flink 一個單調遞增的計數器生成 UID,DAG 改版,計數器極有可能會變) 很有可能恢復失敗。

36、Flink如何實現輕量級非同步分散式快照?

要實現分散式快照,最關鍵的是能夠將資料流切分。

Flink 中使用 Barrier (屏障)來切分資料 流。

Barrierr 會週期性地注入資料流中,作為資料流的一部分,從上游到下游被運算元處理。Barrier 會嚴格保證順序,不會超過其前邊的資料。Barrier 將記錄分割成記錄集,

兩個 Barrier 之間的資料流中的資料隸屬於同一個檢查點。每一個 Barrier 都攜帶一個其所屬快照的 ID 編號。

Barrier 隨著資料向下流動,不會打斷資料流,因此非常輕量。 在一個數據流中,可能會存在多個隸屬於不同快照的 Barrier ,併發非同步地執行分散式快照,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Barrier 會在資料流源頭被注人並行資料流中。

Barrier n所在的位置就是恢復時資料重新處理的起始位置。

例如,在Kafka中,這個位置就是最後一個記錄在分割槽內的偏移量 ( offset) ,作業恢復時,會根據這個位置從這個偏移量之後向 kafka 請求資料 這個偏移量就是State中儲存的內容之一。

Barrier 接著向下遊傳遞。當一個非資料來源運算元從所有的輸入流中收到了快照 n 的Barrier時,該運算元就會對自己的 State 儲存快照,並向自己的下游

廣播 傳送

快照 n 的 Barrier。一旦Sink 運算元接收到 Barrier ,有兩種情況:

(1)如果是引擎內嚴格一次處理保證,

當 Sink 運算元已經收到了所有上游的 Barrie n 時, Sink 運算元對自己的 State 進行快照,然後通知檢查點協調器( CheckpointCoordinator) 。當所有 的運算元都向檢查點協調器彙報成功之後,檢查點協調器向所有的運算元確認本次快照完成。

(2)如果是端到端嚴格一次處理保證,

當 Sink 運算元已經收到了所有上游的 Barrie n 時, Sink 運算元對自己的 State 進行快照,

並預提交事務(兩階段提交的第一階段)

,再通知檢查點協調器( CheckpointCoordinator) ,檢查點協調器向所有的運算元確認本次快照完成,Sink 運算元

提交事務(兩階段提交的第二階段)

,本次事務完成。

我們接著 33 的案例來具體說一下如何執行分散式快照:

對應到pv案例中就是

,Source Task接收到JobManager的編號為

chk-100(從最近一次恢復)

的CheckPoint觸發請求後,發現自己恰好接收到kafka offset(0,1000)處的資料,所以會往offset(0,1000)資料之後offset(0,1001)資料之前安插一個barrier,然後自己開始做快照,也就是將offset(0,1000)儲存到狀態後端chk-100中。然後barrier接著往下游傳送,當統計pv的task接收到barrier後,也會暫停處理資料,將自己記憶體中儲存的pv資訊(app1,50000)(app2,10000)儲存到狀態後端chk-100中。OK,flink大概就是透過這個原理來儲存快照的;

統計pv的task接收到barrier,就意味著barrier之前的資料都處理了,所以說,不會出現丟資料的情況。

37、什麼是Barrier對齊?

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

一旦Operator從輸入流接收到CheckPoint barrier n,它就不能處理來自該流的任何資料記錄,直到它從其他所有輸入接收到barrier n為止。否則,它會混合屬於快照n的記錄和屬於快照n + 1的記錄;

如上圖所示:

圖1,運算元收到數字流的Barrier,字母流對應的barrier尚未到達

圖2,運算元收到數字流的Barrier,會繼續從數字流中接收資料,但這些流只能被擱置,記錄不能被處理,而是放入快取中,等待字母流 Barrier到達。在字母流到達前, 1,2,3資料已經被快取。

圖3,字母流到達,運算元開始對齊State進行非同步快照,並將Barrier向下遊廣播,並不等待快照執行完畢。

圖4,運算元做非同步快照,首先處理快取中積壓資料,然後再從輸入通道中獲取資料。

38、什麼是Barrier不對齊?

checkpoint 是要等到所有的barrier全部都到才算完成

上述圖2中,當還有其他輸入流的barrier還沒有到達時,會把已到達的barrier之後的資料1、2、3擱置在緩衝區,等待其他流的barrier到達後才能處理。

barrier不對齊:就是指當還有其他流的barrier還沒到達時,為了不影響效能,也不用理會,直接處理barrier之後的資料。等到所有流的barrier的都到達後,就可以對該Operator做CheckPoint了;

39、為什麼要進行barrier對齊?不對齊到底行不行?

答:Exactly Once時必須barrier對齊,如果barrier不對齊就變成了At Least Once;

CheckPoint的目的就是為了儲存快照

,如果不對齊,那麼在chk-100快照之前,已經處理了一些chk-100 對應的offset之後的資料,當程式從chk-100恢復任務時,chk-100對應的offset之後的資料還會被處理一次,所以就出現了重複消費。

40、Flink支援Exactly-Once語義,那什麼是Exactly-Once?

Exactly-Once語義 : 指

端到端

的一致性,從

資料讀取

引擎計算

寫入外部儲存的

整個過程中,

即使機器或軟體出現故障,

都確保資料僅處理一次,不會重複、也不會丟失。

41、要實現Exactly-Once,需具備什麼條件?

流系統要實現Exactly-Once,需要保證上游 Source 層、中間計算層和下游 Sink 層三部分同時滿足

端到端嚴格一次處理,如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink端到端嚴格一次處理

Source端:

資料從上游進入Flink,必須保證訊息嚴格一次消費。同時Source 端必須滿足可重放(replay)。否則 Flink 計算層收到訊息後未計算,卻發生 failure 而重啟,訊息就會丟失。

Flink計算層:

利用 Checkpoint 機制,把狀態資料定期持久化儲存下來,Flink程式一旦發生故障的時候,可以選擇狀態點恢復,避免資料的丟失、重複。

Sink端:

Flink將處理完的資料傳送到Sink端時,透過

兩階段提交協議 ,

即 TwoPhaseCommitSinkFunction 函式。該 SinkFunction 提取並封裝了兩階段提交協議中的公共邏輯,保證Flink 傳送Sink端時實現嚴格一次處理語義。

同時:

Sink端必須支援

事務機制

,能夠進行資料

回滾

或者滿足

冪等性。

回滾機制:

即當作業失敗後,能夠將部分寫入的結果回滾到之前寫入的狀態。

冪等性:

就是一個相同的操作,無論重複多少次,造成的結果和只操作一次相等。即當作業失敗後,寫入部分結果,但是當重新寫入全部結果時,不會帶來負面結果,重複寫入不會帶來錯誤結果。

42、什麼是兩階段提交協議?

兩階段提交協議(Two -Phase Commit,2PC)是解決分散式事務問題最常用的方法,它可以保證在分散式事務中,要麼所有參與程序都提交事務,要麼都取消,即實現ACID中的 A(原子性)。

兩階段提交協議中 有兩個重要角色

,協調者(Coordinator)

參與者(Participant)

,其中協調者只有一個,起到分散式事務的協調管理作用,參與者有多個。

兩階段提交階段分為兩個階段:

投票階段(Voting)

提交階段(Commit)。

投票階段:

(1)協調者向所有參與者

傳送 prepare 請求

和事務內容,詢問是否可以準備事務提交,等待參與者的相應。

(2)參與者執行事務中包含的操作,並記錄 undo 日誌(用於回滾)和 redo 日誌(用於重放),但不真正提交。

(3)參與者向協調者返回事務操作的執行結果,執行成功返回yes,失敗返回no。

提交階段:

分為成功與失敗兩種情況。

若所有參與者都返回 yes,說明事務可以提交:

協調者向所有參與者傳送 commit 請求。

參與者收到 commit 請求後,將事務真正地提交上去,並釋放佔用的事務資源,並向協調者返回 ack 。

協調者收到所有參與者的 ack 訊息,事務成功完成,如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

若有參與者返回 no 或者超時未返回,說明事務中斷,需要回滾:

協調者向所有參與者傳送rollback請求。

參與者收到rollback請求後,根據undo日誌回滾到事務執行前的狀態,釋放佔用的事務資源,並向協調者返回ack。

協調者收到所有參與者的ack訊息,事務回滾完成。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

43、Flink 如何保證 Exactly-Once 語義?

Flink透過兩階段提交協議來保證Exactly-Once語義。

對於Source端:

Source端嚴格一次處理比較簡單,因為資料要進入Flink 中,所以Flink 只需要儲存消費資料的偏移量 (offset)即可。如果Source端為 kafka,Flink 將 Kafka Consumer 作為 Source,可以將偏移量儲存下來,如果後續任務出現了故障,恢復的時候可以由聯結器重置偏移量,重新消費資料,保證一致性。

對於 Sink 端:

Sink 端是最複雜的,因為資料是落地到其他系統上的,資料一旦離開 Flink 之後,Flink 就監控不到這些資料了,所以嚴格一次處理語義必須也要應用於 Flink 寫入資料的外部系統,

故這些外部系統必須提供一種手段允許提交或回滾這些寫入操作

,同時還要保證與 Flink Checkpoint 能夠協調使用(

Kafka 0.11 版本已經實現精確一次處理語義

)。

我們以

Kafka - Flink -Kafka

為例 說明如何保證Exactly-Once語義。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

如上圖所示:Flink作業包含以下運算元。

(1)一個Source運算元,從Kafka中讀取資料(即KafkaConsumer)

(2)一個視窗運算元,基於時間視窗化的聚合運算(即window+window函式)

(3)一個Sink運算元,將結果寫會到Kafka(即kafkaProducer)

Flink使用兩階段提交協議

預提交(Pre-commit)

階段和

提交(Commit)階段保證端到端嚴格一次。

(1)預提交階段

1、當Checkpoint 啟動時,進入預提交階段

,JobManager 向Source Task 注入檢查點分界線(CheckpointBarrier),Source Task 將 CheckpointBarrier 插入資料流,向下遊廣播開啟本次快照,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

預處理階段: Checkpoint 啟動

2、Source 端:Flink Data Source 負責儲存 KafkaTopic 的 offset偏移量

,當 Checkpoint 成功時 Flink 負責提交這些寫入,否則就終止取消掉它們,當 Checkpoint 完成位移儲存,它會將 checkpoint barrier(檢查點分界線) 傳給下一個 Operator,然後每個運算元會對當前的狀態做個快照,儲存到

狀態後端(State Backend)

對於 Source 任務而言,就會把當前的 offset 作為狀態儲存起來。下次從 Checkpoint 恢復時,Source 任務可以重新提交偏移量,從上次儲存的位置開始重新消費資料,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

預處理階段:checkpoint barrier傳遞 及 offset 儲存

3、Slink 端:

從 Source 端開始,每個內部的 transformation 任務遇到 checkpoint barrier(檢查點分界線)時,都會把狀態存到 Checkpoint 裡。資料處理完畢到 Sink 端時,Sink 任務首先把資料寫入外部 Kafka,

這些資料都屬於預提交的事務(還不能被消費)

此時的 Pre-commit 預提交階段下Data Sink 在儲存狀態到狀態後端的同時還必須預提交它的外部事務,

如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

預處理階段:預提交到外部系統

(2)提交階段

4、當所有運算元任務的快照完成

(所有建立的快照都被視為是 Checkpoint 的一部分),

也就是這次的 Checkpoint 完成時

JobManager 會向所有任務發通知,確認這次 Checkpoint 完成,此時 Pre-commit 預提交階段才算完成

。才正式到兩階段提交協議的

第二個階段:commit 階段

。該階段中 JobManager 會為應用中每個 Operator 發起 Checkpoint 已完成的回撥邏輯。

本例中的 Data Source 和視窗操作無外部狀態,因此在該階段,這兩個 Opeartor 無需執行任何邏輯,

但是 Data Sink 是有外部狀態的,此時我們必須提交外部事務

,當 Sink 任務收到確認通知,就會正式提交之前的事務,Kafka 中未確認的資料就改為“已確認”,資料就真正可以被消費了,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

提交階段:資料精準被消費

注:Flink 由 JobManager 協調各個 TaskManager 進行 Checkpoint 儲存,Checkpoint 儲存在 StateBackend(狀態後端) 中,預設 StateBackend 是記憶體級的,也可以改為檔案級的進行持久化儲存。

44、數的很好,很清楚,那你對Flink 端到端 嚴格一次Exactly-Once 語義做個總結

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

如果覺得阿周講解的知識點還滿意的話,請關注公眾號:

3分鐘秒懂大資料

,獲取更多,更全面的技術博文。並加博主微信:

threeknowbigdata,

拉你進大資料交流群。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

45、Flink廣播機制瞭解嗎?

如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)從圖中可以理解

廣播

就是一個

公共的共享變數,

廣播變數是發給

TaskManager的記憶體中,

所以廣播變數不應該太大,將一個數據集廣播後,不同的

Task

都可以在節點上獲取到,每個節點只存一份。 如果不使用廣播,每一個Task都會複製一份資料集,造成記憶體資源浪費 。

46、Flink反壓瞭解嗎?

反壓(backpressure)是實時計算應用開發中,特別是流式計算中,十分常見的問題。

反壓

意味著資料管道中某個節點成為瓶頸,

下游處理速率 跟不上 上游傳送資料的速率

,而需要對上游進行限速。由於實時計算應用通常使用訊息佇列來進行生產端和消費端的解耦,消費端資料來源是 pull-based 的,所以反壓通常是從某個節點傳導至資料來源並降低資料來源(比如 Kafka consumer)的攝入速率。

簡單來說就是

下游處理速率 跟不上 上游傳送資料的速率

下游來不及消費,導致佇列被佔滿後,上游的生產會被阻塞,最終導致資料來源的攝入被阻塞。

47、Flink反壓的影響有哪些?

反壓會影響到兩項指標:

checkpoint 時長

state 大小

(1)前者是因為 checkpoint barrier 是不會越過普通資料的,

資料處理被阻塞

也會

導致

checkpoint barrier 流經

整個資料管道的時長變長

,因而 checkpoint 總體時間(End to End Duration)變長。

(2)後者是因為為保證 EOS(Exactly-Once-Semantics,準確一次),對於有兩個以上輸入管道的 Operator,checkpoint barrier 需要對齊(Alignment),接受到較快的輸入管道的 barrier 後,它後面資料會被快取起來但不處理,直到較慢的輸入管道的 barrier 也到達,這些被快取的資料會被放到state 裡面,導致 checkpoint 變大。

這兩個影響對於生產環境的作業來說是十分危險的

,因為 checkpoint 是保證資料一致性的關鍵,checkpoint 時間變長有可能導致 checkpoint 超時失敗,而 state 大小同樣可能拖慢 checkpoint 甚至導致 OOM (使用 Heap-based StateBackend)或者物理記憶體使用超出容器資源(使用 RocksDBStateBackend)的穩定性問題。

48、Flink反壓如何解決?

Flink社群提出了 FLIP-76: Unaligned Checkpoints[4] 來解耦反壓和 checkpoint。

(1)定位反壓節點

要解決反壓首先要做的是定位到造成反壓的節點,這主要有兩種辦法:

透過 Flink Web UI 自帶的反壓監控面板;

透過 Flink Task Metrics。

(1)反壓監控面板

Flink Web UI 的反壓監控提供了 SubTask 級別的反壓監控,原理是透過週期性對 Task 執行緒的棧資訊取樣,得到執行緒被阻塞在請求 Buffer(意味著被下游佇列阻塞)的頻率來判斷該節點是否處於反壓狀態。預設配置下,這個頻率在 0。1 以下則為 OK,0。1 至 0。5 為 LOW,而超過 0。5 則為 HIGH。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(2)Task Metrics

Flink 提供的 Task Metrics 是更好的反壓監控手段

如果一個 Subtask 的傳送端 Buffer 佔用率很高,則表明它被下游反壓限速了;

如果一個 Subtask 的接受端 Buffer 佔用很高,則表明它將反壓傳導至上游。

49、Flink支援的資料型別有哪些?

Flink支援的資料型別如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

從圖中可以看到 Flink 型別可以分為基礎型別(Basic)、陣列(Arrays)、複合型別(Composite)、輔助型別(Auxiliary)、泛型和其它型別(Generic)。Flink 支援任意的 Java 或是 Scala 型別。

50、Flink如何進行序列和反序列化的?

所謂序列化和反序列化的含義:

序列化:

就是將一個記憶體物件轉換成二進位制串,形成網路傳輸或者持久化的資料流。

反序列化:

將二進位制串轉換為記憶體對。

TypeInformation 是 Flink 型別系統的核心類

在Flink中,當資料需要進行序列化時,會使用TypeInformation的生成序列化器介面呼叫一個 createSerialize() 方法,創建出TypeSerializer,TypeSerializer提供了序列化和反序列化能力。如下圖所示:Flink 的序列化過程

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

對於大多數資料型別 Flink 可以自動生成對應的序列化器,能非常高效地對資料集進行序列化和反序列化 ,如下圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

比如,BasicTypeInfo、WritableTypeIno ,但針對 GenericTypeInfo 型別,Flink 會使用 Kyro 進行序列化和反序列化。其中,Tuple、Pojo 和 CaseClass 型別是複合型別,它們可能巢狀一個或者多個數據型別。在這種情況下,它們的序列化器同樣是複合的。它們會將內嵌型別的序列化委託給對應型別的序列化器。

透過一個案例介紹Flink序列化和反序列化:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

如上圖所示,當建立一個Tuple 3 物件時,包含三個層面,一是 int 型別,一是 double 型別,還有一個是 Person。Person物件包含兩個欄位,一是 int 型的 ID,另一個是 String 型別的 name,

(1)在序列化操作時,會委託相應具體序列化的序列化器進行相應的序列化操作。從圖中可以看到 Tuple 3 會把 int 型別透過 IntSerializer 進行序列化操作,此時 int 只需要佔用四個位元組。

(2)Person 類會被當成一個 Pojo 物件來進行處理,PojoSerializer 序列化器會把一些屬性資訊使用一個位元組儲存起來。同樣,其欄位則採取相對應的序列化器進行相應序列化,在序列化完的結果中,可以看到所有的資料都是由 MemorySegment 去支援。

MemorySegment 具有什麼作用呢?

MemorySegment 在 Flink 中會將物件序列化到預分配的記憶體塊上,它代表 1 個固定長度的記憶體,預設大小為 32 kb。MemorySegment 代表 Flink 中的一個最小的記憶體分配單元,相當於是 Java 的一個 byte 陣列。每條記錄都會以序列化的形式儲存在一個或多個 MemorySegment 中。

51、為什麼Flink使用自主記憶體而不用JVM記憶體管理?

因為在記憶體中儲存大量的資料 (包括快取和高效處理)時,JVM會面臨很多問題,包括如下:

JVM 記憶體管理的不足:

1)Java 物件儲存密度低。

Java 的物件在記憶體中儲存包含 3 個主要部分:物件頭、例項 資料、對齊填充部分。例如,一個只包含 boolean 屬性的物件佔 16byte:物件頭佔 8byte, boolean 屬性佔 1byte,為了對齊達到 8 的倍數額外佔 7byte。而實際上只需要一個 bit(1/8 位元組)就夠了。

2)Full GC 會極大地影響效能。

尤其是為了處理更大資料而開了很大記憶體空間的 JVM 來說,GC 會達到秒級甚至分鐘級。

3)OOM 問題影響穩定性。

OutOfMemoryError 是分散式計算框架經常會遇到的問題, 當JVM中所有物件大小超過分配給JVM的記憶體大小時,就會發生OutOfMemoryError錯誤, 導致 JVM 崩潰,分散式框架的健壯性和效能都會受到影響。

4)快取未命中問題。

CPU 進行計算的時候,是從 CPU 快取中獲取資料。現代體系的 CPU 會有多級快取,而載入的時候是以 Cache Line 為單位載入。如果能夠將物件連續儲存, 這樣就會大大降低 Cache Miss。使得 CPU 集中處理業務,而不是空轉。

52、那Flink自主記憶體是如何管理物件的?

Flink 並不是將大量物件存在堆記憶體上,而是將物件都序列化到一個預分配的記憶體塊上, 這個記憶體塊叫做

MemorySegment

,它代表了一段固定長度的記憶體(預設大小為 32KB),也 是

Flink 中最小的記憶體分配單元

,並且提供了非常高效的讀寫方法,很多運算可以直接操作 二進位制資料,不需要反序列化即可執行。每條記錄都會以序列化的形式儲存在一個或多個 MemorySegment 中。如果需要處理的資料多於可以儲存在記憶體中的資料,Flink 的運算子會 將部分資料溢位到磁碟

53、Flink記憶體模型介紹一下?

Flink總體記憶體類圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

主要包含

JobManager記憶體模型

TaskManager記憶體模型

JobManager記憶體模型

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在 1。10 中,Flink 統一了 TM 端的記憶體管理和配置,相應的在 1。11 中,Flink 進一步 對 JM 端的記憶體配置進行了修改,使它的選項和配置方式與 TM 端的配置方式保持一致。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

TaskManager記憶體模型

Flink 1.10

對 TaskManager 的記憶體模型和 Flink 應用程式的配置選項進行了

重大更改

, 讓使用者能夠更加嚴格地控制其記憶體開銷。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

JVM Heap:JVM 堆上記憶體

1、Framework Heap Memory:

Flink 框架本身使用的記憶體,即 TaskManager 本身所 佔用的堆上記憶體,不計入 Slot 的資源中。

配置引數:taskmanager。memory。framework。heap。size=128MB,預設 128MB

2、Task Heap Memory:

Task 執行使用者程式碼時所使用的堆上記憶體。

配置引數:taskmanager。memory。task。heap。size

Off-Heap Mempry:JVM 堆外記憶體

1、DirectMemory:JVM 直接記憶體

1)Framework Off-Heap Memory:Flink框架本身所使用的記憶體,即TaskManager 本身所佔用的對外記憶體,不計入 Slot 資源。

配置參

數:

taskmanager。memory。framework。off-heap。size=128MB

,預設 128

MB

2)Task Off-Heap Memory:Task 執行使用者程式碼所使用的對外記憶體。

配置引數:

taskmanager。memory。task。off-heap。size=0

,預設 0

3)Network Memory:網路資料交換所使用的堆外記憶體大小,如網路資料交換 緩衝區

2、Managed Memory:Flink 管理的堆外記憶體

用於排序、雜湊表、快取中間結果及 RocksDB State Backend 的本地記憶體。

JVM specific memory:JVM 本身使用的記憶體

1、JVM metaspace:JVM 元空間

2、JVM over-head 執行開銷:

JVM 執行時自身所需要的內容,包括執行緒堆疊、IO、 編譯快取等所使用的記憶體。

配置引數:taskmanager。memory。jvm-overhead。min=192mb

taskmanager。memory。jvm-overhead。max=1gb

taskmanager。memory。jvm-overhead。fraction=0。1

總體記憶體

1、總程序記憶體:

Flink Java 應用程式(包括使用者程式碼)和 JVM 執行整個程序所消 耗的總記憶體。

總程序記憶體 = Flink 使用記憶體 + JVM 元空間 + JVM 執行開銷

配置項:taskmanager。memory。process。size: 1728m

2、Flink 總記憶體:

僅 Flink Java 應用程式消耗的記憶體,包括使用者程式碼,但不包括 JVM 為其執行而分配的記憶體。

Flink 使用記憶體:框架堆內外 + task 堆內外 + network + manage

54、Flink如何進行資源管理的?

Flink在資源管理上可以分為兩層:

叢集資源

自身資源

。叢集資源支援主流的資源管理系統,如yarn、mesos、k8s等,也支援獨立啟動的standalone叢集。自身資源涉及到每個子task的資源使用,由Flink自身維護。

1 叢集架構剖析

Flink的執行主要由 客戶端、一個JobManager(後文簡稱JM)和 一個以上的TaskManager(簡稱TM或Worker)組成。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

客戶端

客戶端主要用於提交任務到叢集,在Session或Per Job模式中,客戶端程式還要負責解析使用者程式碼,生成JobGraph;在Application模式中,直接提交使用者jar和執行引數即可。客戶端一般支援兩種模式:detached模式,客戶端提交後自動退出。attached模式,客戶端提交後阻塞等待任務執行完畢再退出。

JobManager

JM負責決定應用何時排程task,在task執行結束或失敗時如何處理,協調檢查點、故障恢復。該程序主要由下面幾個部分組成:

1 ResourceManager

,負責資源的申請和釋放、管理slot(Flink叢集中最細粒度的資源管理單元)。Flink實現了多種RM的實現方案以適配多種資源管理框架,如yarn、mesos、k8s或standalone。在standalone模式下,RM只能分配slot,而不能啟動新的TM。注意:這裡所說的RM跟Yarn的RM不是一個東西,這裡的RM是JM中的一個獨立的服務。

2 Dispatcher

,提供Flink提交任務的rest介面,為每個提交的任務啟動新的JobMaster,為所有的任務提供web ui,查詢任務執行狀態。

3 JobMaster

,負責管理執行單個JobGraph,多個任務可以同時在一個叢集中啟動,每個都有自己的JobMaster。注意這裡的JobMaster和JobManager的區別。

TaskManager

TM也叫做worker,用於執行資料流圖中的任務,快取並交換資料。叢集至少有一個TM,TM中最小的資源管理單元是Slot,每個Slot可以執行一個Task,因此TM中slot的數量就代表同時可以執行任務的數量。

2 Slot與資源管理

每個TM是一個獨立的JVM程序,內部基於獨立的執行緒執行一個或多個任務。TM為了控制每個任務的執行資源,使用task slot來進行管理。每個task slot代表TM中的一部分固定的資源,比如一個TM有3個slot,每個slot將會得到TM的1/3記憶體資源。不同任務之間不會進行資源的搶佔,注意GPU目前沒有進行隔離,目前slot只能劃分記憶體資源。

比如下面的資料流圖,在擴充套件成並行流圖後,同一的task可能分拆成多個任務並行在叢集中執行。操作鏈可以把多個不同的任務進行合併,從而支援在一個執行緒中先後執行多個任務,無需頻繁釋放申請執行緒。同時操作鏈還可以統一快取資料,增加資料處理吞吐量,降低處理延遲。

在Flink中,想要不同子任務合併需要滿足幾個條件:下游節點的入邊是1(保證不存在資料的shuffle);子任務的上下游不為空;連線策略總是ALWAYS;分割槽型別為ForwardPartitioner;並行度一致;當前Flink開啟Chain特性。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在叢集中的執行圖可能如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink也支援slot的共享,即把不同任務根據任務的依賴關係分配到同一個Slot中。這樣帶來幾個好處:方便統計當前任務所需的最大資源配置(某個子任務的最大並行度);避免Slot的過多申請與釋放,提升Slot的使用效率。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

透過Slot共享,就有可能某個Slot中包含完整的任務執行鏈路。

3 應用執行

一個Flink應用就是使用者編寫的main函式,其中可能包含一個或多個Flink的任務。這些任務可以在本地執行,也可以在遠端叢集啟動,叢集既可以長期執行,也支援獨立啟動。下面是目前支援的任務提交方案:

Session叢集

生命週期

:叢集事先建立並長期執行,客戶端提交任務時與該叢集連線。即使所有任務都執行完畢,叢集仍會保持執行,除非手動停止。因此叢集的生命週期與任務無關。

資源隔離

:TM的slot由RM申請,當上面的任務執行完畢會自動進行釋放。由於多個任務會共享相同的叢集,因此任務間會存在競爭,比如網路頻寬等。如果某個TM掛掉,上面的所有任務都會失敗。

其他方面

:擁有提前建立的叢集,可以避免每次使用的時候過多考慮叢集問題。比較適合那些執行時間很短,對啟動時間有比較高的要求的場景,比如互動式查詢分析。

Per Job叢集

生命週期

:為每個提交的任務單獨建立一個叢集,客戶端在提交任務時,直接與ClusterManager溝通申請建立JM並在內部執行提交的任務。TM則根據任務執行需要的資源延遲申請。一旦任務執行完畢,叢集將會被回收。

資源隔離

:任務如果出現致命問題,僅會影響自己的任務。

其他方面

:由於RM需要申請和等待資源,因此啟動時間會稍長,適合單個比較大、長時間執行、需要保證長期的穩定性、不在乎啟動時間的任務。

Application叢集

生命週期

:與Per Job類似,只是main()方法執行在叢集中。任務的提交程式很簡單,不需要啟動或連線叢集,而是直接把應用程式打包到資源管理系統中並啟動對應的EntryPoint,在EntryPoint中呼叫使用者程式的main()方法,解析生成JobGraph,然後啟動執行。叢集的生命週期與應用相同。

資源隔離

:RM和Dispatcher是應用級別。

03、Flink 原始碼篇

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

55、FLink作業提交流程應該瞭解吧?

Flink的提交流程:

1。在Flink Client中,透過反射啟動jar中的main函式,生成Flink StreamGraph和JobGraph,將JobGraph提交給Flink叢集

2。Flink叢集收到JobGraph(JobManager收到)後,將JobGraph翻譯成ExecutionGraph,然後開始排程,啟動成功之後開始消費資料。

總結來說:Flink核心執行流程,對使用者API的呼叫可以轉為 StreamGraph ——>JobGraph —— > ExecutionGraph。

56、FLink作業提交分為幾種方式?

Flink的作業提交分為兩種方式

1.Local 方式:

即本地提交模式,直接在IDEA執行程式碼。

2.遠端提交方式:

分為Standalone方式、yarn方式、K8s方式

Yarn 方式分為三種提交模式:

Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式

57、FLink JobGraph是在什麼時候生成的?

StreamGraph、JobGraph全部是在Flink Client 客戶端生成的,即提交叢集之前生成,原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

58、那在jobGraph提交叢集之前都經歷哪些過程?

(1)使用者透過啟動Flink叢集,使用命令列提交作業,執行 flink run -c WordCount xxx。jar

(2)執行命令列後,會透過run指令碼呼叫CliFrontend入口,CliFrontend會觸發使用者提交的jar檔案中的main方法,然後交給PipelineExecuteor # execute方法,最終根據提交的模式選擇觸發一個具體的PipelineExecutor執行。

(3)根據具體的PipelineExecutor執行,將對使用者的程式碼進行編譯生成streamGraph,經過最佳化後生成jobgraph。

具體流程圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

59、看你提到PipeExecutor,它有哪些實現類?

(1)

PipeExecutor

在Flink中被叫做 流水線執行器,它是一個介面,是

Flink Client

生成

JobGraph

之後,將作業提交給叢集的重要環節,前面說過,作業提交到叢集有好幾種方式,最常用的是

yarn

方式,

yarn

方式包含

3

種提交模式,主要使用

session

模式,

perjob

模式。Application模式 jaobGraph是在叢集中生成。

所以

PipeExecutor

的實現類如下圖所示:(在程式碼中按

CTRL+H

就會出來)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

除了上述框的兩種模式外,在IDEA環境中執行Flink MiniCluster 進行除錯時,使用LocalExecutor。

60、Local提交模式有啥特點,怎麼實現的?

(1)

Local

是在本地IDEA環境中執行的提交方式。不上叢集。主要用於除錯,原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

1。 Flink程式由JobClient進行提交

2。 JobClient將作業提交給JobManager

3。 JobManager負責協調資源分配和作業執行。資源分配完成後,任務將提交給相應的TaskManager

4。 TaskManager啟動一個執行緒開始執行,TaskManager會向JobManager報告狀態更改,如開始執 行,正在進行或者已完成。

5。 作業執行完成後,結果將傳送回客戶端。

原始碼分析:透過Flink1.12.2原始碼進行分析的

(1)建立獲取對應的StreamExecutionEnvironment物件

:LocalStreamEnvironment

呼叫StreamExecutionEnvironment物件的execute方法

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(2)獲取streamGraph

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(3)執行具體的PipeLineExecutor - >得到localExecutorFactory

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(4) 獲取JobGraph

根據localExecutorFactory的實現類LocalExecutor生成JobGraph

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

上面這部分全部是在Flink Client生成的,由於是使用Local模式提交。所有接下來將建立MiniCluster叢集,由miniCluster。submitJob指定要提交的jobGraph

(5)例項化MiniCluster叢集

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(6)返回JobClient 客戶端

在上面執行miniCluster。submitJob 將JobGraph提交到本地集群后,會返回一個JobClient客戶端,該JobClient包含了應用的一些詳細資訊,包括JobID,應用的狀態等等。最後返回到程式碼執行的上一層,對應類為StreamExecutionEnvironment。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

以上就是Local模式的原始碼執行過程。

61、遠端提交模式都有哪些?

遠端提交方式:

分為Standalone方式、yarn方式、K8s方式

Standalone:

包含session模式

Yarn 方式分為三種提交模式:

Yarn-perJob模式、Yarn-Sessionmo模式、Yarn-Application模式。

K8s方式:

包含 session模式

62、Standalone模式簡單介紹一下?

Standalone 模式為Flink叢集的單機版提交方式,只使用一個節點進行提交,常用Session模式。

作業提交原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

提交命令如下:

bin/flink run org。apache。flink。WordCount xxx。jar

client客戶端提交任務給JobManager

JobManager負責申請任務執行所需要的資源並管理任務和資源,

JobManager分發任務給TaskManager執行

TaskManager定期向JobManager彙報狀態

63、yarn叢集提交方式介紹一下?

透過yarn叢集提交分為3種提交方式:分別為session模式、perjob模式、application模式

64、yarn - session模式特點?

提交命令如下:

。/bin/flink run -t yarn-session \-Dyarn。application。id=application_XXXX_YY xxx。jar

Yarn-Session模式:

所有

作業共享叢集資源

,隔離性差,JM負載瓶頸,main方法在客戶端執行。適合執行時間短,頻繁執行的短任務,叢集中的所有作業

只有一個JobManager

,另外,

Job被隨機分配給TaskManager

特點:

Session-Cluster模式需要先啟動叢集

然後再提交作業

,接著會向yarn申請一塊空間後,資源永遠保持不變。如果資源滿了,下一個作業就無法提交,只能等到 yarn中的其中一個作業執行完成後,釋放了資源,下個作業才會正常提交。所有作業共享Dispatcher和ResourceManager;共享資源;適合規模小執行時間短的 作業。

原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

65、yarn - perJob模式特點?

提交命令:

。/bin/flink run -t yarn-per-job ——detached xxx。jar

Yarn-Per-Job模式:每個作業單獨啟動叢集

,隔離性好,JM負載均衡,main方法在客戶端執行。在per-job模式下,每個Job都有一個JobManager,每個TaskManager只有單個Job。

特點:

一個任務會對應一個Job

每提交一個作業

會根據自身的情況,

都會單獨向yarn申請資源

,直到作業執行完成,一個作業的失敗與否並不會影響下一個作業的正常提交和執行。獨享Dispatcher 和 ResourceManager,按需接受資源申請;

適合規模大長時間執行的作業。

原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

66、yarn - application模式特點?

提交命令如下:

。/bin/flink run-application -t yarn-application xxx。jar

Yarn-Application模式:

每個作業單獨啟動叢集,隔離性好,JM負載均衡,

main方法在JobManager上執行

特點:

在yarn-per-job 和 yarn-session模式下,客戶端都需要執行以下三步,即:

1、獲取作業所需的依賴項;

2、透過執行環境分析並取得邏輯計劃,即StreamGraph→JobGraph;

3、將依賴項和JobGraph上傳到叢集中。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

只有在這些都完成之後,才會透過env。execute()方法 觸發 Flink執行時真正地開始執行作業。

如果所有使用者都在同一個客戶端上提交作業

較大的依賴會消耗更多的頻寬

,而較複雜的作業邏輯翻譯成JobGraph也需要吃掉更多的CPU和記憶體,

客戶端的資源反而會成為瓶頸

為了解決它,社群在傳統部署模式的基礎上實現了 Application模式。原本需要客戶端做的三件事被轉移到了JobManager裡,也就是說main()方法在叢集中執行(入口點位於 ApplicationClusterEntryPoint ),客 戶端只需要負責發起部署請求了

原理圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

綜上所述,Flink社群比較推薦使用 yarn-perjob 或者 yarn-application模式進行提交應用。

67、yarn - session 提交流程詳細介紹一下?

提交流程圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

1、啟動叢集

(1)Flink Client向Yarn ResourceManager提交任務資訊。

1)Flink Client將應用配置(Flink-conf。yaml、logback。xml、log4j。properties)和相關檔案(Flink Jar、配置類檔案、使用者Jar檔案、JobGraph物件等)上傳至分散式儲存HDFS中。

2)Flink Client向Yarn ResourceManager提交任務資訊

(2)Yarn 啟動 Flink叢集,做2步操作:

1)

透過Yarn Client 向Yarn ResourceManager提交Flink建立叢集的申請

,Yarn ResourceManager 分配Container 資源,並通知對應的NodeManager上啟動一個ApplicationMaster(每提交一個flink job 就會啟動一個applicationMaster),ApplicationMaster會包含當前要啟動的 JobManager和 Flink自己內部要使用的ResourceManager。

2)在JobManager 程序中執行

YarnSessionClusterEntryPoint

作為叢集啟動的入口。初始化Dispatcher,Flink自己內部要使用的ResourceManager,啟動相關RPC服務,等待Flink Client 透過Rest介面提交JobGraph。

2、作業提交

(3)Flink Client 透過Rest 向Dispatcher 提交編譯好的JobGraph。

Dispatcher 是 Rest 介面,不負責實際的排程、指定工作。

(4)Dispatcher 收到 JobGraph 後,為作業建立一個JobMaster,將工作交給JobMaster,

JobMaster負責作業排程,管理作業和Task的生命週期

,構建ExecutionGraph(

JobGraph的並行化版本,排程層最核心的資料結構

以上兩步執行完後,作業進入排程執行階段。

3、作業排程執行

(5)JobMaster向ResourceManager申請資源,開始排程ExecutionGraph。

(6)ResourceManager將資源請求加入等待佇列,透過心跳向YarnResourceManager申請新的Container來啟動TaskManager程序。

(7)YarnResourceManager啟動,然後從HDFS載入Jar檔案等所需相關資源,在容器中啟動TaskManager,TaskManager啟動TaskExecutor

(8)TaskManager啟動後,向ResourceManager 註冊,並把自己的Slot資源情況彙報給ResourceManager。

(9)ResourceManager從等待佇列取出Slot請求,向TaskManager確認資源可用情況,並告知TaskManager將Slot分配給哪個JobMaster。

(10)TaskManager向JobMaster回覆自己的一個Slot屬於你這個任務,JobMaser會將Slot快取到SlotPool。

(11)JobMaster排程Task到TaskMnager的Slot上執行。

68、yarn - perjob 提交流程詳細介紹一下?

提交命令如下:

。/bin/flink run -t yarn-per-job ——detached xxx。jar

提交流程圖如下所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

1、啟動叢集

(1)Flink Client向Yarn ResourceManager提交任務資訊。

1)Flink Client將應用配置(Flink-conf。yaml、logback。xml、log4j。properties)和相關檔案(Flink Jar、配置類檔案、使用者Jar檔案、JobGraph物件等)上傳至分散式儲存HDFS中。

2)Flink Client向Yarn ResourceManager提交任務資訊

(2)Yarn 啟動 Flink叢集,做2步操作:

1)

透過Yarn Client 向Yarn ResourceManager提交Flink建立叢集的申請

,Yarn ResourceManager 分配Container 資源,並通知對應的NodeManager上啟動一個ApplicationMaster(每提交一個Flink job 就會啟動一個ApplicationMaster),ApplicationMaster會包含當前要啟動的 JobManager和 Flink自己內部要使用的ResourceManager。

2)在JobManager 程序中執行

YarnJobClusterEntryPoint

作為叢集啟動的入口。初始化Dispatcher,Flink自己內部要使用的ResourceManager,啟動相關RPC服務,等待Flink Client 透過Rest介面提交JobGraph。

2、作業提交

(3)ApplicationMaster啟動Dispatcher,Dispatcher啟動ResourceManager和JobMaster(該步和Session不同,Jabmaster是由Dispatcher拉起,而不是Client傳過來的)。

JobMaster負責作業排程,管理作業和Task的生命週期

,構建ExecutionGraph(

JobGraph的並行化版本,排程層最核心的資料結構

以上兩步執行完後,作業進入排程執行階段。

3、作業排程執行

(4)JobMaster向ResourceManager申請Slot資源,開始排程ExecutionGraph。

(5)ResourceManager將資源請求加入等待佇列,透過心跳向YarnResourceManager申請新的Container來啟動TaskManager程序。

(6)YarnResourceManager啟動,然後從HDFS載入Jar檔案等所需相關資源,在容器中啟動TaskManager。

(7)TaskManager在內部啟動TaskExecutor。

(8)TaskManager啟動後,向ResourceManager 註冊,並把自己的Slot資源情況彙報給ResourceManager。

(9)ResourceManager從等待佇列取出Slot請求,向TaskManager確認資源可用情況,並告知TaskManager將Slot分配給哪個JobMaster。

(10)TaskManager向JobMaster回覆自己的一個Slot屬於你這個任務,JobMaser會將Slot快取到SlotPool。

(11)JobMaster排程Task到TaskMnager的Slot上執行。

69、流圖、作業圖、執行圖三者區別?

Flink內部Graph總覽圖,由於現在Flink 實行流批一體程式碼,Batch API基本廢棄,就不過多介紹

在Flink DataStramAPI 中,Graph內部轉換圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

以WordCount為例,流圖、作業圖、執行圖、物理執行圖之間的Task排程如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

對於Flink 流計算應用,執行使用者程式碼時,首先呼叫DataStream API ,將使用者程式碼轉換為

Transformation

,然後經過:

StreamGraph

->

JobGraph

->

ExecutionGraph

3層轉換(這些都是Flink內建的資料結構),最後經過Flink排程執行,在Flink 叢集中啟動計算任務,形成一個

物理執行圖

70、流圖介紹一下?

(1)流圖 StreamGraph

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

流圖StreamGraph 核心物件包括兩個:

StreamNode 點 和 StreamEdge 邊

1)StreamNode 點

StreamNode 點

,從 Transformation轉換而來,可以簡單理解為

StreamNode

表示一個運算元,存在實體和虛擬,可以有多個輸入和輸出,實體

StreamNode

最終變成物理運算元,虛擬的附著在

StreamEdge 邊

上。

2)StreamEdge 邊

StreamEdge 是 StreamGraph 的邊,

用來連線兩個StreamNode 點,一個StreamEdge可以有多個出邊、入邊等資訊。

71、作業圖介紹一下?

(2)作業圖 JobGraph

JobGraph是由StreamGraph最佳化而來,是透過

OperationChain

機制將運算元合併起來,在執行時,排程在同一個Task執行緒上,避免資料的跨執行緒,跨網路傳遞。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

作業圖JobGraph 核心物件包括三個:

JobVertex 點 、 JobEdge 邊、IntermediateDataSet 中間資料集

1)JobVertex 點

經過運算元融合最佳化後符合條件的多個StreamNode 可能會融合在一起生成一個 JobVertex,即一個JobVertex 包含一個或多個運算元,

JobVertex 的輸入是 JobEdge. 輸出是 IntermediateDataSet

2)JobEdge 邊

JobEdge

表示 JobGraph 中的一 個數據流轉通道, 其上游資料來源是

IntermediateDataSet

,下游消費者是

JobVertex

JobEdge

中的資料分發模式會直接影響執行時 Task 之間的資料連線關係是

點對點連線

還是

全連線

3)IntermediateDataSet 中間資料集

中間資料集

IntermediateDataSet

是一種邏輯結構。用來表示

JobVertex

的輸出,即該 JobVertex 中包含的運算元會產生的資料集。不同的執行模式下,其對應的結果分割槽型別不同,決 定了在執行時刻資料交換的模式。

72、執行圖介紹一下?

(3)執行圖 ExecutionGraph

ExecutionGraph

是排程Flink 作業執行的核心資料結構,包含了作業中所有並行執行的Task資訊、Task之間的關聯關係、資料流轉關係。

StreamGraph 和JobGraph都在Flink Client生成,然後交給Flink叢集。JobGraph到ExecutionGraph在JobMaster中 完成,轉換過程中重要變化如下:

1)加入了並行度的概念,成為真正可排程的圖結構。

2)生成了6個核心物件。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

執行圖ExecutionGraph 核心物件包括6個:

ExecutionJobVertex、ExecutionVertex、IntermediateResult、IntermediateResultPartition、ExecutionEdge、Execution。

1)ExecutionJobVertex

該物件和 JobGraph 中的 JobVertex 一 一對應。該物件還包含一組 ExecutionVertex, 數量 與該 JobVertex 中所包含的StreamNode 的並行度一致,假設 StreamNode 的並行度為5 ,那麼

ExecutionJobVertex

中也會包含 5個

ExecutionVertex

ExecutionJobVertex

用來將一個JobVertex 封裝成

ExecutionJobVertex

,並依次建立 ExecutionVertex、Execution、IntermediateResult 和 IntermediateResultPartition,用於豐富

ExecutionGraph。

2)ExecutionVertex

ExecutionJobVertex

會對作業進行並行化處理,構造可以並行執行的例項,每一個並行執行的例項就是

ExecutionVertex

3)IntermediateResult

IntermediateResult

又叫作中間結果集,該物件是個邏輯概念 表示

ExecutionJobVertex

輸出,和 JobGrap 中的IntermediateDalaSet 一 一對應,同樣 一個

ExecutionJobVertex

可以有多箇中間結果,取決於當前 JobVertex 有幾個出邊(JobEdge)。

4)IntermediateResultPartition

IntermediateResultPartition

又叫作中間結果分割槽。表示1個

ExecutionVertex

輸出結果,與 Execution Edge 相關聯。

5)ExecutionEdge

表示

ExecutionVertex

的輸入,連按到上游產生的

IntermediateResultPartition

。1個Execution對應唯一的1個

IntermediateResultPartition

和1個

ExecutionVertex

。1個

ExecutionVertex

可以有多個

ExecutionEdge。

6)Execution

ExecutionVertex

相當於每個 Task 的模板,在真正執行的時候,會將

ExecutionVertex中的資訊包裝為1個Execution,執行一個ExecutionVertex的一次嘗試。

JobManager 和 TaskManager 之間關於Task 的部署和Task執行狀態的更新都是透過ExecutionAttemptID來識別標識的。

如果覺得阿周講解的知識點還滿意的話,請關注公眾號:

3分鐘秒懂大資料

,獲取更多,更全面的技術博文。並加博主微信:

threeknowbigdata,

拉你進大資料交流群。

接下來問問作業排程的問題

73、Flink排程器的概念介紹一下?

排程器

是Flink作業執行的核心元件,管理作業執行的所有相關過程,包括JobGraph到ExecutionGraph的轉換、作業生命週期管理(作業的釋出、取消、停止)、作業的Task生命週期管理(Task的釋出、取消、停止)、資源申請與釋放、作業和Task的Faillover等。

(1)DefaultScheduler

Flink 目前預設的排程器。是Flink新的排程設計,使用SchedulerStrategy來實現排程。

(2)LegacySchedular

過去的排程器,實現了原來的Execution排程邏輯。

74、Flink排程行為包含幾種?

排程行為包含四種:

SchedulerStrategy介面定義了排程行為,其中包含4種行為:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)startScheduling:排程入口,觸發排程器的排程行為

(2)restartTasks:重啟執行失敗的Task,一般是Task執行異常導致的。

(3)onExecutionStateChange:當Execution狀態發生改變時。

(4)onPartitionConsumable:當IntermediateResultPartition中的資料可以消費時。

75、Flink排程模式包含幾種?

排程模式包含3種:Eager模式、分階段模式(Lazy_From_Source)、分階段Slot重用模式(Lazy_From_Sources_With_Batch_Slot_Request)。

1)Eager 排程

適用於流計算

。一次性申請需要的所有資源,如果資源不足,則作業啟動失敗。

2)分階段排程

LAZY_FROM_SOURCES

適用於批處理

。從 SourceTask 開始分階段排程,申請資源的時候,一次性申請本階段所需要的所有資源。上游 Task 執行完畢後開始排程執行下游的 Task,

讀取上游的資料,執行本階段的計算任務,執行完畢之後,排程後一個階段的 Task,依次進行排程,直到作業完成。

3)分階段 Slot 重用排程

LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST

適用於批處理

。與分階段排程基本一樣,區別在於該模式下使用批處理資源申請模式,可以在資源不足的情況下執行作

業,但是需要確保在本階段的作業執行中沒有 Shuffle 行為。

目前視線中的 Eager 模式和 LAZY_FROM_SOURCES 模式的資源申請邏輯一樣,LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST 是單獨的資源申請邏輯。

76、Flink排程策略包含幾種?

排程策略包含3種:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

排程策略全部實現於排程器SchedulingStrategy,有三種實現:

1) EagerSchedulingStrategy:

適用於流計算,同時排程所有的 task

2) LazyFromSourcesSchedulingStrategy:

適用於批處理,當輸入資料準備好時(上游處理完)進行 vertices 排程。

3) PipelinedRegionSchedulingStrategy:

以流水線的區域性為粒度進行排程

PipelinedRegionSchedulingStrategy 是 1。11 加入的,從 1。12 開始,將以 pipelined region為單位進行排程。

pipelined region 是一組流水線連線的任務。這意味著,對於包含多個 region的流作業,在開始部署任務之前,它不再等待所有任務獲取 slot。取而代之的是,一旦任何region 獲得了足夠的任務 slot 就可以部署它。對於批處理作業,將不會為任務分配 slot,也不會單獨部署任務。取而代之的是,一旦某個 region 獲得了足夠的 slot,則該任務將與所有其他任務一起部署在同一區域中。

77、Flink作業生命週期包含哪些狀態?

在Flink叢集中,

JobMaster

負責

作業

的生命週期管理,具體的管理行為在排程器和ExecutionGraph中實現。

作業的完整生命週期狀態變換如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)作業首先處於建立狀態(created),然後切換到執行狀態(running),並且在完成所有工作後,它將切換到完成狀態(finished)。

(2)在失敗的情況下,作業首先切換到失敗狀態(failing),取消所有正在執行任務。

如果所有節點都已達到最終狀態,並且作業不可重新啟動,則狀態將轉換為失敗(failed)。(3)如果作業可以重新啟動,那麼它將進入重新啟動狀態(restarting)。一旦完成重新啟動,它將變成建立狀態(created)。

(4)在使用者取消作業的情況下,將進入取消狀態(cancelling),會取消所有當前正在執行的任務。一旦所有執行的任務已經達到最終狀態,該作業將轉換到已取消狀態(canceled)。

完成狀態(finished),取消狀態(canceled)

失敗狀態(failed)

表示一個全域性的終結狀態,並且觸發清理工作,

而暫停狀態(suspended)僅處於本地終止狀態

。意味著作業的執行在相應的 JobManager 上終止,但叢集的另一個 JobManager 可以從持久的HA儲存中恢復這個作業並重新啟動。因此,處於暫停狀態的作業將不會被完全清理。

78、Task的作業生命週期包含哪些狀態?

TaskManager

負責

Task

的生命週期管理,並將狀態的變化通知到JobMaster,在ExecutionGraph中跟蹤Execution的狀態變化,一個Execution對於一個Task。

Task的生命週期如下:共8種狀態。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在執行 ExecutionGraph 期間,每個並行任務經過多個階段,從建立(created)到完成(finished)或失敗(failed) ,下圖說明了它們之間的狀態和可能的轉換。任務可以執行多次(例如故障恢復)。每個 Execution 跟蹤一個 ExecutionVertex 的執行,每個 ExecutionVertex 都有一個當前 Execution(current execution)和一個前驅 Execution(prior execution)。

79、Flink的任務排程流程講解一下?

任務排程流程圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

1。 當Flink執行executor會自動根據程式程式碼生成

DAG資料流圖

,即

Jobgraph;

2。

ActorSystem

建立

Actor

將資料流圖傳送給JobManager中的Actor;

3。

JobManager

會不斷接收

TaskManager

的心跳訊息,從而可以獲取到有效的TaskManager;

4。 JobManager透過排程器在TaskManager中排程執行Task(在Flink中,最小的排程單元就是task,對應就是一個執行緒) ;

5。 在程式執行過程中,task與task之間是可以進行資料傳輸的 。

• Job Client

– 主要職責是提交任務, 提交後可以結束程序, 也可以等待結果返回 ;

– Job Client 不是 Flink 程式執行的內部部分,但它是任務執行的起點;

– Job Client 負責接受使用者的程式程式碼,然後建立資料流,將資料流提交給 JobManager 以便進一步執行。執行完成後,Job Client 將結果返回給使用者。

• JobManager

– 主要職責是排程工作並協調任務做檢查點;

– 叢集中至少要有一個 master,master 負責排程 task,協調checkpoints 和 容錯;

– 高可用設定的話可以有多個 master,但要保證一個是 leader, 其他是stand by;

– Job Manager 包含

Actor System、Scheduler、CheckPoint

三個重要的元件 ;

– JobManager從客戶端接收到任務以後, 首先生成最佳化過的執行計劃, 再排程到TaskManager中執行。

• TaskManager

– 主要職責是從JobManager處接收任務, 並部署和啟動任務, 接收上游的數 據並處理

– Task Manager 是在 JVM 中的一個或多個執行緒中執行任務的工作節點。

– TaskManager在建立之初就設定好了Slot, 每個Slot可以執行一個任務。

80、Flink的任務槽是什麼意思?

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

每個TaskManager是一個JVM的程序, 可以在不同的執行緒中執行一個或多個子任務。為了控制一個worker能接收多少個task。worker透過task slot來進行控制(一個worker 至少有一個task slot)。

1、任務槽

每個task slot表示TaskManager擁有資源的一個固定大小的子集。

一般來說:我們分配槽的個數都是和CPU的核數相等,比如8核,那麼就分配8個槽。

Flink將程序的記憶體劃分到多個slot中。

圖中有2個TaskManager,每個TaskManager有3個slot,每個slot佔有1/3的記憶體。

記憶體被劃分到不同的slot之後可以獲得如下好處:

TaskManager

最多能同時併發執行的任務是可以控制的,那就是3個,因為不能超過slot的數量。

任務槽的作用就是分離任務的託管記憶體,不會發生cpu隔離。

• slot有獨佔的記憶體空間,這樣在一個TaskManager中可以執行多個不同的作業,作業之間不受影響。

總結:task slot的個數代表TaskManager可以並行執行的task數。

81、Flink 槽共享又是什麼意思?

2、槽共享

預設情況下,Flink允許子任務共享插槽

,即使它們是不同任務的子任務,只要它們來自同一個作業。結果是一個槽可以儲存作業的整個管道。允許插槽共享有兩個主要好處:

• 只需計算Job中最高並行度(parallelism)的task slot。只要這個滿足,其他的job也都能滿足。

• 資源分配更加公平。如果有比較空閒的slot可以將更多的任務分配給它。圖中若沒有任務槽共享,負載不高的Source/Map等subtask將會佔據許多資源,而負載較高的視窗subtask則會缺乏資源。

• 有了任務槽共享,可以將基本並行度(base parallelism)從2提升到6。提高了分槽資源的利用率。同時它還可以保障TaskManager給subtask的分配的slot方案更加公平。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

04、Flink SQL篇

Flink SQL也是面試的重點考察點,不僅需要你掌握紮實的SQL程式設計,同時還需要理解SQL提交的核心原理,以及Flink SQL中涉及的一些重點知識,例如CEP、CDC、SQL GateWay、SQL-Hive等,思維導圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

82、Flink SQL有沒有使用過?

用過,在Flink中,一共有

四種

級別的

抽象

,而Flink SQL作為最上層,是Flink API的一等公民

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

在標準SQL中,SQL語句包含四種類型

DML(Data Manipulation Language):資料操作語言,用來定義資料庫記錄(資料)。

DCL (Data Control Language):資料控制語言,用來定義訪問許可權和安全級別。

DQL (Data Query Language):資料查詢語言,用來查詢記錄(資料)。

DDL(Data Definition Language):資料定義語言,用來定義資料庫物件(庫,表,列等)。

Flink SQL包含 DML 資料操作語言、 DDL 資料語言, DQL 資料查詢語言

不包含DCL語言。

83、Flink被稱作流批一體,那從哪個版本開始,真正實現流批一體的?

從1。9。0版本開始,引入了阿里巴巴的 Blink ,對 FIink TabIe & SQL 模組做了重大的重構,保留了 Flink Planner 的同時,引入了 Blink PIanner,沒引入以前,Flink 沒考慮流批作業統一,針對流批作業,底層實現兩套程式碼,引入後,基於流批一體理念,重新設計算子,以流為核心,流作業和批作業最終都會被轉為transformation。

84、Flink SQL 使用哪種解析器?

Flink SQL使用 Apache Calcite作為解析器和最佳化器。

Calcite

一種動態資料管理框架,它具備很多典型資料庫管理系統的功能 如SQL 解析、 SQL 校驗、 SQL 查詢最佳化、 SQL 生成以及資料連線查詢等,但是又省略了一些關鍵的功能,如 Calcite並

不儲存

相關的

元資料

基本資料

,不完全包含相關處理資料的演算法等。

85、Calcite主要功能包含哪些?

Calcite 主要包含以下五個部分:

(1) SQL 解析 (Parser)

Calcite SQL 解析是

透過 JavaCC

實現的,使用 JavaCC 編寫 SQL 語法描述檔案,將 SQL 解析成未經校驗的 AST 語法樹。

(2) SQL 校驗 (Validato)

校驗分兩部分

1)無狀態的校驗 即驗證 SQL 語句是否符合規範。

2)有狀態的校驗 即透過與元資料結合驗證 SQL 中的 Schema、Field、 Function 是否存 在,輸入輸出型別是否匹配等。

(3) SQL 查詢最佳化

對上個步驟的輸出( RelNode ,邏輯計劃樹)進行最佳化,得到最佳化後的物理執行計劃 最佳化有兩種:

基於規則的最佳化

基於代價的最佳化

,後面會詳細介紹。

(4) SQL 生成

將物理執行計劃生成為在特定平臺/引擎的可執行程式,如生成符合 MySQL 或 Oracle 等不同平臺規則的 SQL 查詢語句等。

(5) 資料連線與執行

透過各個執行平臺執行查詢,得到輸出結果。

在Flink 或者其他使用 Calcite 的大資料引擎中,一般到 SQL 查詢最佳化即結束,由各個平臺結合 Calcite SQL 程式碼生成 和 平臺實現的程式碼生成,將最佳化後的物理執行計劃組合成可執行的程式碼,然後在記憶體中編譯執行。

86、Flink SQL 處理流程說一下?

下面舉個例子,詳細描述一下Flink Sql的處理流程,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

我們寫一張source表,來源為kafka,當執行create table log_kafka之後 Flink SQL將做如下操作:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)首先,FlinkSQL 底層使用的是

apache Calcite

引擎來處理SQL語句,Calcite會使用javaCC做SQL解析,javaCC根據Calcite中定義的

Parser.jj

檔案,生成一系列的java程式碼,生成的java程式碼會

把SQL轉換成AST抽象語法樹(即SQLNode型別)

(2)生成的 SqlNode 抽象語法樹,他是一個

未經驗證的抽象語法樹

,這時,SQL Validator 會獲取 Flink Catalog中的元資料資訊來驗證 sql 語法,元資料資訊檢查包括表名,欄位名,函式名,資料型別等檢查。然後

生成一個校驗後的SqlNode

(3)到達這步後,只是將 SQL 解析到 java 資料結構的固定節點上,並沒有給出相關節點之間的關聯關係以及每個節點的型別資訊。所以,還

需要將 SqlNode 轉換為邏輯計劃,也就是LogicalPl

an,在轉換過程中,會使用

SqlToOperationConverter

類,來將SqlNode轉換為Operation,Operation會根據SQL語法來執行建立表或者刪除表等操作,同時

FlinkPlannerImpl.rel()方法會將SQLNode轉換成RelNode樹,並返回RelRoot

(4)第4步將

執行 Optimize 操作

,按照預定義的最佳化規則 RelOptRule 最佳化邏輯計劃。

Calcite中的最佳化器RelOptPlanner有兩種,一是

基於規則最佳化(RBO)的HepPlanner

,二是

基於代價最佳化(CBO)的VolcanoPlanner

。然後得到最佳化後的RelNode, 再基於Flink裡面的rules

將最佳化後的邏輯計劃轉換成物理計劃

(5)第5步

執行 execute 操作

,會透過程式碼生成 transformation,然後遞迴遍歷各節點,將DataStreamRelNode 轉換成DataStream, 在這期間,會依次遞迴呼叫DataStreamUnion、DataStreamCalc、DataStreamScan類中重寫的 translateToPlan方法。遞迴呼叫各節點的translateToPlan,實際是利用CodeGen元編成Flink的各種運算元,相當於直接利用Flink的DataSet或者DataStream開發程式。

(6)

最後進一步

編譯

成可執行的 JobGraph 提交執行。

87、Flink SQL包含哪些最佳化規則?

如下圖為執行流程圖

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

總結就是:

先解析,然後驗證,將SqlNode轉化為Operation來建立表,然後呼叫rel方法將sqlNode變成 邏輯計劃 (RelNodeTree)緊接著對邏輯計劃進行最佳化;

最佳化之前 會根據Calcite中的最佳化器中的基於規則最佳化的HepPlanner

針對四種規則進行預處理

,處理完之後得到Logic RelNode,緊接著

使用代價最佳化的VolcanoPlanner使用

Logical_Opt_Rules(邏輯計劃最佳化)找到最優的執行Planner,並轉換為FlinkLogical RelNode。

最後運用 Flink包含的最佳化規則,如DataStream_Opt_Rules:流式計算最佳化,DataStream_Deco_Rules:裝飾流式計算最佳化 將最佳化後的邏輯計劃轉換為物理計劃。

最佳化規則包含如下:

Table_subquery_rules 子查詢最佳化

Expand_plan_rules:擴充套件計劃最佳化

Post_expand_clean_up_rules:擴充套件計劃最佳化

Datastream_norm_rules:正常化流處理

Logical_Opt_Rules:邏輯計劃最佳化

DataStream_Opt_Rules:流式計算最佳化

DataStream_Deco_Rules:裝飾流式計算最佳化

88、Flink SQL中涉及到哪些operation?

先介紹一下什麼是Operation

在Flink SQL中嗎,涉及的DDL,DML,DQL操作都是Operation,在 Flink內部表示,Operation可以和SqlNode對應起來。

Operation執行在最佳化前

,執行的函式為executeQperation,如下圖所示,為執行的所有Operation。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

89、Flink Hive有沒有使用過?

Flink社群在Flink1。11版本進行了重大改變,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

90、Flink與Hive整合時都做了哪些操作?

如下所示為Flink與HIve進行連線時的執行圖:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)Flink1。1新引入了Hive方言,所以在Flink SQL中可以編寫HIve語法,即Hive Dialect。

(2)編寫HIve SQL後,FlinkSQL Planner 會將SQL進行解析,驗證,轉換成邏輯計劃,物理計劃,最終變成Jobgraph。

(3)

HiveCatalog作為Flink和Hive的表元素持久化介質,會將不同會話的Flink元資料儲存到Hive Metastore中

。使用者利用HiveCatalog可以將hive表或者 Kafka表儲存到Hive Metastore中。

BlinkPlanner 是在Flink1。9版本新引入的機制,Blink 的查詢處理器則實現流批作業介面的統一,

底層的 API 都是Transformation

。真正實現 流 &批 的統一處理,替代原FlinkPlanner將流&批區分處理的方式。在1。11版本後 已經預設為Blink Planner。

91、HiveCatalog類包含哪些方法?

重點方法如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

HiveCatalog主要是

持久化元資料

,所以 一般的建立型別都包含,如 database,Table,View,Function,Partition,還有is_Generic欄位判斷等。

92、Flink SQL1。11新增了實時數倉功能,介紹一下?

Flink1。11 版本新增的一大功能是實時數倉,可以實時的將kafka中的資料插入Hive中,傳統的實時數倉基於 Kafka+ Flinkstreaming,定義全流程的流計算作業,有著秒級甚至毫秒的實時性,

但實時數倉的一個問題是歷史資料只有 3-15天,無法在其上做 Ad-hoc的查詢。

針對這個特點,Flink1。11 版本將

FlieSystemStreaming Sink

重新修改,增加了

分割槽提交

滾動策略機制

,讓HiveStreaming sink 重新使用檔案系統流接收器。

Flink 1。11 的 Table/SQL API 中,

FileSystemConnector 是靠增強版 StreamingFileSink元件實現

,在原始碼中名為 StreamingFileWriter。

* 只有在Checkpoint 成功時,StreamingFileSink寫入的檔案才會由 Pending狀態變成 Finished狀態,從而能夠安全地被下游讀取。所以,我們一定要開啟 Checkpointing,並設定合理的間隔。

93、Flink -Hive實時寫資料介紹下?

StreamingWrite

,從kafka 中實時拿到資料,使用分割槽提交將資料從Kafka寫入Hive表中,並執行批處理查詢以讀取該資料。

Flink -SQL 寫法

Source源

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Sink目的地

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Insert 插入

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink-table寫法:

Source源

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Sink目的地

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Insert 插入

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

94、Flink -Hive實時讀資料介紹下?

如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink 原始碼中在對Hive進行讀取操作時,會經歷以下幾個步驟:

1、Flink都是基於calcite先解析sql,確定表來源於hive,如果是Hive表,將會

在HiveCatalog中建立HiveTableFactory

2、HiveTableFactory 會基於配置檔案

建立 HiveTableSource

,然後HiveTableSource在真正執行時,會呼叫getDataStream方法,透過getDataStream方法來確定查詢匹配的分割槽資訊,然後建立表對應的InputFormat,然後確定並行度,根據並行度確定slot 分發HiveMapredSplitReader任務。

3、在TaskManager端的slot中,Split會確定讀取的內容,基於Hive中定義的序列化工具,InputFormat執行讀取反序列化,得到value值。

4、最後迴圈執行reader。next 獲取value,將其解析成Row。

95、Flink -Hive實時寫資料時,如何保證已經寫入分割槽的資料何時才能對下游可見呢?

如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

首先可以看一下,在實時的將資料儲存到Hive數倉中,FileSystemConnector 為了與 Flink-Hive整合的大環境適配,

最大的改變就是分割槽提交

,可以看一下左下圖,官方文件給出的,分割槽可以採取日期+ 小時的策略,或者時分秒的策略。

那如何保證已經寫入分割槽的資料何時才能對下游可見呢?

這就和

觸發機制

有關, 觸發機制包含process-time和 partition-time以及時延。

partition-time

指的是根據事件時間中提取的分割槽觸發。當‘watermark’ > ‘partition-time’ + ‘delay’ ,選擇partition-time的資料才能提交成功,

process-time

指根據系統處理時間觸發,當加上時延後,要想讓分割槽進行提交,當‘currentprocessing time’ > ‘partition creation time’ + ‘delay’ 選擇 process-time的資料可以提交成功。

但選擇process-time觸發機制會有缺陷,就是當資料遲到或者程式失敗重啟時,資料不能按照事件時間被歸入正確分割槽。所以 一般會選擇 partition-time。

96、原始碼中分割槽提交的PartitionCommitTrigger介紹一下?

在原始碼中,PartitionCommitTrigger類圖如下所示

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

該類中維護了兩對必要的資訊:

1。pendingPartitions/pendingPartitionsState:等待提交的分割槽以及對應的狀態;2。watermarks/watermarksState:watermarks(用 TreeMap 儲存以保證有序)以及對應的狀態。

97、PartitionTimeCommitTigger 是如何知道該提交哪些分割槽的呢?(原始碼分析)

1、檢查checkpoint ID 是否合法;

2、取出當前checkpoint ID 對應的水印,並呼叫 TreeMap的headMap() 和 clear() 方法刪掉早於當前 checkpoint ID的水印資料(沒用了);

3、遍歷等待提交的分割槽,呼叫之前定義的PartitionTimeExtractor。

(比如${year}-${month}-${day} ${hour}:00:00)抽取分割槽時間。

如果watermark>partition-time+delay,說明可以提交,並返回它們

98、如何保證已經寫入分割槽的資料對下游可見的標誌問題(原始碼分析)

在原始碼中,主要涉及PartitionCommitPolicy類,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

99、Flink SQL CEP有沒有接觸過?

CEP的概念:

Ø

複雜事件處理(Complex Event Processing),用於識別輸入流中符合指定規則的事件,並按照指定方式輸出。

Ø

起床—>洗漱—>吃飯—>上班一系列串聯起來的事件流形成的模式

Ø

瀏覽商品—>加入購物車—>建立訂單—>支付完成—>發貨—>收貨事件流形成的模式。

透過概念可以瞭解,CEP主要是識別輸入流中使用者指定的一些基本規則的事件,然後將這些事件再透過指定方式輸出。

如下圖所示: 我們指定“方塊、圓”為基本規則的事件,在輸入的原始流中,將這些事件作為一個結果流輸出來。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

CEP的使用場景:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

像使用者異常檢測:我們指定異常操作事件為要輸出的結果流;策略營銷:指定符合要求的事件為結果流;運維監控:指定一定範圍的指標為結果流;銀行卡盜刷:指定同一時刻在兩個地方被刷兩次為異常結果流。

Flink CEP SQL 語法 是透過SQL方式進行復雜事件處理,但是與

Flink SQL語法也不太相同,其中包含許多規則。

100、Flink SQL CEP瞭解的引數介紹一下?

CEP包含的引數如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

引數介紹

p

輸出模式(

每個找到的匹配項應該輸出多少行

Ø

one row per match

每次檢測到完整的匹配後進行彙總輸出

Ø

all rows per match (

flink暫不支援

檢測到完整的匹配後會把匹配過程中每條具體記錄進行輸出

p

runningVS final語義

Ø

在計算中使用那些匹配的事件

running匹配中和final匹配結束

Ø

define語句中只可以使用running,measure兩者都可以

Ø

輸出結果區別

對於one row per match,輸出沒區別

對於all rows per match ,輸出不同

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

3、匹配後跳轉模式介紹

after match(

匹配後,從哪裡開始重新匹配

Ø

skip to next row

從匹配成功的事件序列中的第一個事件的下一個事件開始進行下一次匹配

Ø

skip past last row

從匹配成功的事件序列中的最後一個事件的下一個事件開始進行下一次匹配

Ø

skip to first pattern Item

從匹配成功的事件序列中第一個對應於patternItem的事件開始進行下一次匹配

Ø

skip to last pattern Item

從匹配成功的事件序列中最後一個對應於patternItem的事件開始進行下一次匹配

注意:

在使用skip to first/last patternItem容易出現迴圈匹配問題,需要慎重

針對上面的匹配後跳轉模式分別介紹:

(1)after match skip past last row 如下圖

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(2)after match skip to next row 如下圖

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(3)after match skip to last patternItem 如下圖

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(4)after match skip to first patternItem 如下圖

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

101、編寫一個CEP SQL案例,如銀行卡盜刷

透過Flink CEP SQL 寫的關於金融場景 銀行卡盜刷案例。

案例介紹:

在金融場景中,有時會出現銀行卡盜刷現象,犯罪分子利用網際網路等技術,在間隔10分鐘或者更短時間內,使一張銀行卡在不同的兩個地方出現多次刷卡記錄,這從常規操作來說,在間隔時間很多的情況下,使用者是無法同時在兩個城市進行刷卡交易的,所以出現這種問題,就需要後臺做出觸發報警機制。

要求:當相同的cardId在十分鐘內,從兩個不同的Location發生刷卡現象,觸發報警機制,以便檢測信用卡盜刷現象。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)編寫cep sql時,包含許多技巧,首先我們編寫最基礎的查詢語句,從一張表中查詢需要的欄位。

select starttime,endtime,cardId,event from dataStream

(2)match_recognize();

該欄位是CEP SQL 的前提條件,用於生成一個追加表,所有的 CEP SQL都是書寫在這裡面。

(3)分割槽,排序

由於是對同一ID,所以需要使用 partition by,還要根據時間進行排序 order by

(4)理解CEP SQL核心的編寫順序

如上圖示的順序

1、CEP SQL 的類為Pattern,檢測在10分鐘內兩個地方出現刷卡現象,所以定義兩個事件:

Pattern (e1 e2+) within interval ‘10’minute

2、定義在Pattern中要求的判斷語句,規定使用

define

define

e1 as a1。action = ‘’

e2 as e2。action = ‘’ and e2。location <> e1。location

3、根據上述的輸入條件構建輸出條件,規定使用 measures

measures

e2。action as event

e1。timestamp as starttime

last(e2。timestamp) as endtime

4、輸出條件匹配成功,輸出一條,規定寫法(

這塊根據不同的規則寫不同的語句)

one row per match

5、匹配後跳轉跳轉到下一行(根據不同規則寫不同語句)

after match skip to next row

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

根據核心編寫順序進行理解,然後在按照書寫正確的順序進行編寫。

102、Flink CDC瞭解嗎?什麼是 Flink SQL CDC Connectors?

在 Flink 1.11 引入了 CDC 機制

,CDC 的全稱是

Change Data Capture

,用於捕捉資料庫表的增刪改查操作,是目前非常成熟的同步資料庫變更方案。

Flink CDC Connectors 是 Apache Flink 的一組源聯結器,是可以從 MySQL、PostgreSQL 資料直接讀取全量資料和增量資料的 Source Connectors,開源地址:

https://github。com/ververica/flink-cdc-connectors

目前(1。13版本)支援的 Connectors 如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

另外

支援解析 Kafka 中 debezium-json 和 canal-json 格式的 Change Log

,透過Flink 進行計算或者直接寫入到其他外部資料儲存系統(比如 Elasticsearch),或者將 Changelog Json 格式的 Flink 資料寫入到 Kafka:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

Flink CDC Connectors 和 Flink 之間的版本對映:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

103、Flink CDC原理介紹一下

在最新CDC 調研報告中,

Debezium

Canal

是目前最流行使用的 CDC 工具,這些 CDC 工具的核心原理是抽取資料庫日誌獲取變更。在經過一系列調研後,目前Debezium (支援全量、增量同步,同時支援 MySQL、PostgreSQL、Oracle 等資料庫),使用較為廣泛。

Flink SQL CDC 內建了

Debezium

引擎,利用其抽取日誌獲取變更的能力,將 changelog 轉換為 Flink SQL 認識的 RowData 資料。(以下右側是 Debezium 的資料格式,左側是 Flink 的

RowData 資料格式

)。

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

RowData 代表了一行的資料,在 RowData 上面會有一個元資料的資訊

RowKind

,RowKind 裡面包括了插入(+I)、更新前(-U)、更新後(+U)、刪除(-D),這樣和資料庫裡面的 binlog 概念十分類似。透過 Debezium 採集的資料,包含了舊資料(before)和新資料行(after)以及原資料資訊(source),op 的 u 表示是 update 更新操作識別符號(op 欄位的值 c,u,d,r 分別對應 create,update,delete,reade),ts_ms 表示同步的時間戳。

104、透過CDC設計一種Flink SQL 採集+計算+傳輸(ETL)一體化的實時數倉

設計圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

透過 Flink CDC connectors 替換 Debezium+Kafka 的資料採集模組

實現

Flink SQL 採集+計算+傳輸(ETL)一體化

,以Mysql為Source源,Flink CDC中介軟體為外掛,ES或者Kafka,或者其他為Sink,這樣設計的優點如下:

開箱即用,簡單易上手

減少維護的元件,簡化實時鏈路,減輕部署成本

減小端到端延遲

Flink 自身支援 Exactly Once 的讀取和計算

資料不落地,減少儲存成本

支援全量和增量流式讀取

binlog 採集位點可回溯

105、Flink SQL CDC如何實現一致性保障(原始碼分析)

Flink SQL CDC 用於獲取資料庫變更日誌的 Source 函式是

DebeziumSourceFunction

,且最終返回的型別是

RowData

,該函式實現了 CheckpointedFunction,即透過 Checkpoint 機制來保證發生 failure 時不會丟數,實現 exactly once 語義,這部分在函式的註釋中有明確的解釋。

/** * The {@link DebeziumSourceFunction} is a streaming data source that pulls captured change data * from databases into Flink。 * 透過Checkpoint機制來保證發生failure時不會丟數,實現exactly once語義 *

The source function participates in checkpointing and guarantees that no data is lost * during a failure, and that the computation processes elements “exactly once”。 * 注意:這個Source Function不能同時執行多個例項 *

Note: currently, the source function can‘t run in multiple parallel instances。 * *

Please refer to Debezium’s documentation for the available configuration properties: * https://debezium。io/documentation/reference/1。2/development/engine。html#engine-properties

*/@PublicEvolvingpublic class DebeziumSourceFunction extends RichSourceFunction implementsCheckpointedFunction,ResultTypeQueryable {}

為實現 CheckpointedFunction,需要實現以下兩個方法:

public interface CheckpointedFunction {//做快照,把記憶體中的資料儲存在checkpoint狀態中void snapshotState(FunctionSnapshotContext var1) throws Exception;//程式異常恢復後從checkpoint狀態中恢復資料void initializeState(FunctionInitializationContext var1) throws Exception;}

接下來我們看看 DebeziumSourceFunction 中都記錄了哪些狀態。

/** Accessor for state in the operator state backend。 offsetState中記錄了讀取的binlog檔案和位移資訊等,對應Debezium中的*/private transient ListState offsetState;/** * State to store the history records, i。e。 schema changes。 * historyRecordsState記錄了schema的變化等資訊 * @see FlinkDatabaseHistory*/private transient ListState historyRecordsState;

我們發現在 Flink SQL CDC 是一個相對簡易的場景,沒有中間運算元,是透過 Checkpoint 持久化 binglog 消費位移和 schema 變化資訊的快照,來實現 Exactly Once。

106、Flink SQL GateWay瞭解嗎?

Flink SQL GateWay的概念:

FlinkSql Gateway是Flink叢集的“任務閘道器”,支援以restapi 的形式提交查詢、插入、刪除等任務,如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

總體架構如下圖所示:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

107、Flink SQL GateWay建立會話講解一下?

建立會話流程圖如下:

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

(1)傳入引數包含name名稱、planner執行引擎(Blink或原生的flink)、executetype(streaming或者batch)、properties(配置引數,如併發度等);

(2)在SessionMnager中,會根據這些引數建立對應的SessionContext;

SessionContext sessionContext= new SessionContext(sessionName, sessionId, sessionEnv, defaultContext);

(3)將建立Session放入Map集合中,最後返回對應的SessionId,方便後續使用。

sessions。put(sessionId,session); return sessionId;

108、Flink SQL GateWay如何處理併發請求?多個提交怎麼處理?

sql gateway內部維護SessionManager,裡面透過Map維護了各個Session,每個Session的任務執行是獨立的。同一個Session透過ExecuteContext內部的tEnv按順序提交。

109、如何維護多個SQL之間的關聯性?

在每個Session中單獨維護了tEnv,同一個session中的操作其實是在一個env中執行的。因此只要是同一個session中的任務,內部使用的tEnv就是同一個。這樣就可以實現在Asession中,先建立一個view,然後執行一個select,最後執行一個insert。

110、sql字串如何提交到叢集成為程式碼?

Session中維護了tenv,sql會透過tenv編譯生成pipeline(即DAG圖),在batch模式下是Plan執行計劃;在stream模式下是StreamGraph。然後Session內部會建立一個ProgramDeployer程式碼釋出器,根據Flink中配置的target建立不同的excutor。最後呼叫executor。execute方法提交Pipeline和config執行。

以上就是Flink 的全部內容,160張圖全部親自繪製出來!不想被白嫖呀,覺得好的,點贊,在看,分享三連擊,謝謝!!!

最後

第一時間獲取最新大資料技術,盡在本公眾號:

3分鐘秒懂大資料

--END--

Flink面試大全總結(全文6萬字、110個知識點、160張圖)

獲取本文PDF版, 加我微信threeknowbigdata,

備註:Flink