資料平臺SQL開發詳解與函式使用

雲智慧 AIOps 社群是由雲智慧發起,針對運維業務場景,提供演算法、算力、資料集整體的服務體系及智慧運維業務場景的解決方案交流社群。該社群致力於傳播 AIOps 技術,旨在與各行業客戶、使用者、研究者和開發者們共同解決智慧運維行業技術難題,推動 AIOps 技術在企業中落地,建設健康共贏的AIOps 開發者生態。

Flink SQL動態表

建立Kafka動態表

下圖為在Flink裡建立kafka動態表。知道kafka的資訊和資料格式資訊後建立kafka表,在建表語句的最後一個欄位,我們添加了一個kafka topic元資料資訊:create_time(資料寫入kafka的時間),基於以上操作便可以完成kafka動態表建立,後續便可以在Flink SQL裡對topic進行資料讀取或者寫入。

Kafka地址:10。2。3。14:9092

Topic名稱:r_01

消費者組id:8001

樣例資料:訂單1購買了商品1,消費金額1元

{“order_id”:1,“product_id”:1,“trans_amount”:1}

資料平臺SQL開發詳解與函式使用

建立Clickhouse動態表

下圖為在Flink裡建立Clickhouse動態表。此時可以看到Clickhouse的表結構,包含相關欄位的資料型別和主鍵資訊,與Flink SQL建表語句中的欄位、資料格式和主鍵也一一對應。 WITH裡面是Clickhouse的連線資訊和資料操作的配置資訊

Clickhouse地址:10。2。3。14:8123

資料庫名稱:default

資料表:product_sale

表結構及樣例資料如下:

資料平臺SQL開發詳解與函式使用

建立Redis動態表

下圖為在Flink裡建立Redis動態表。由於Redis表設計初衷是用於做維表,故必須包含可供資料關聯的主鍵和用於補齊資料的普通欄位,在建表語句裡體現為必須設定一個或多個主鍵,還必須具有一個或多個的普通欄位。 資料在Redis中的儲存是HASH格式,可以使用HGETALL檢視資料內容。

redis地址:10。2。3。39:3301

Key字首:index:product_sale

redis例資料如下:

資料平臺SQL開發詳解與函式使用

資料平臺SQL開發詳解與函式使用

Flink SQL連線引數

連線Kafka

connector:指定聯結器型別,固定值kafka。

topic:指需要消費或寫入資料的topic。

bootstrap。servers:kafka連線地址,可以填寫多個,以逗號分割。

Broker地址:在叢集正常執行時填寫一個或多個節點均可讀取到資料。此外,當kafka節點較多,topic分割槽較少時填寫一個節點,當topic分割槽並不在該節點上時,也能夠讀取到資料。需注意,當kafka服務出現問題時,如果個別節點服務中斷,填寫多個broker地址可以提高抗風險能力。

消費模式

earliest-offset對應平臺中的從頭開始消費。任務的每次啟動都會按照從前往後的順序讀取topic內現有的所有資料。但是這個順序是相對的,如果topic有多個分割槽,存在一定的資料傾斜,那資料較少的分割槽從資料時間上來看會讀的相對快一些。kafka資料的讀取是按照分割槽來讀的;

latest-offset對應平臺中的從當前開始消費。任務在啟動時會從最新的資料開始讀取;

group-offsets對應平臺中的按照group offset消費。這種模式下,任務在第一次啟動時會讀取最新的資料,在後續任務重啟時,會接著上次執行結束時處理到的資料點位繼續處理,這種模式也是kafka消費者的預設消費模式。該模式需要配合設定group id,kafka會按照group id把處理資料的偏移量記錄下來。由於是kafka記錄著偏移量,故group id可以跨平臺、跨應用來使用。比如當有一個java任務需要做kafka資料的持久化且由flink來實現,此時flink任務使用即可用與java任務相同的group id來實現任務平滑切換,做到無資料丟失、無資料重複;

Group id:用於記錄處理資料的偏移量,在任務重啟或異常恢復的時候繼續從斷點開始處理資料。

Value。format:kafka訊息格式。常用的為csv、json、raw以及多種cdc格式。

Ignore-parse-errors:解析失敗的原因包含多種,比如部分資料格式不是json,此時便會丟棄整條資料。需注意,如果其中一條json資料中的一個欄位格式與建表語句中的格式不同,強轉失敗時只會影響這一個資料欄位,不影響其它欄位的解析。

Fail-on-missing-field:當kafka訊息中缺少create table中定義的欄位時是否終止flink任務。

資料平臺SQL開發詳解與函式使用

連線Clickhouse

url:這裡可以填寫單個jdbc url,表示以叢集的方式寫入邏輯表;也可以填寫多個jdbc url,多個url使用逗號間隔,表示以輪詢的方式寫入Clickhouse本地表;

Table-name:表名只能有一個,在輪詢寫入本地表的時候,url連線和資料庫可以相同或不同,比如同一個url上的不同庫,但是表名必須相同;

Flush-max-rows:配合使用可以實現flink到clickhouse的同步輸出、半同步輸出、非同步輸出

資料平臺SQL開發詳解與函式使用

連線Redis

僅支援redis的hash結構,詳細資料結構如下:

hash key:{key-prefix}{key-spacer}{k1}{key-spacer}{k2}

hash field:schema中除了key欄位之外的其他欄位

hash value:儲存key之外其他欄位對應的值,flink redis schema支援的型別:STRING、BOOLEAN、TINYINT、SMALLINT、INT、BIGINT、DOUBLE

資料平臺SQL開發詳解與函式使用

資料平臺SQL開發詳解與函式使用

Flink SQL函式

Flink SQL函式分為內建函式和自定義函式

自定義函式:

ScalarFunction:行級資料處理,一行的一列或多列資料處理輸出一個數據

TableFunction:也是行級資料處理,接受一個或多個引數輸入,但是可以輸出多行多列資料

AggregateFunction:聚合函式,配合group by使用,可根據多行多列資料計算輸出一個指標值

TableAggregateFunction:表聚合函式,配合group by使用,根據多行多列資料計算輸出多行多列資料,表聚合函式目前還不能使用在FlinkSQL中,適用於Flink Table API

操作示例:

這裡以水果的ID獲取到水果名稱,演示一下自定義函式的用法。Udf可以使用Java程式碼來開發,具備java語言的特性,比如多型。示例裡的類繼承Flink的ScalarFunction,實現了一個eval函式,自定義函式的編寫較為簡單。此外,還可以根據Java的多型特性編寫多個eval函式,實現多型別資料的處理。 編寫好的自定義函式,打成jar包後透過平臺的資源庫頁面上傳至平臺,在編寫資料處理任務時,使用 create temporary function的方式引入該自定義函式,即可使用。

資料平臺SQL開發詳解與函式使用

Flink SQL案例

需求描述

對topic r_01 中的水果銷售流水進行統計,得到每種水果每分鐘的銷售額,將計算結果分別輸出到kafka、clickhouse、redis。

topic中資料格式: { “order_id”:1, “product_id”:1, “trans_amount”:1 }

計算結果應包含如下欄位: { “product_id”:1, “product_name”:“蘋果”, “create_minute”:‘2021-12-02 12:00:00’, “trans_amount”:3 }

具體操作

kafka to kafka

建立一個kafka source

建立一個kafka sink

編寫資料處理sql

資料平臺SQL開發詳解與函式使用

kafka to clickhouse

建立一個kafka source

建立一個clickhouse sink

編寫資料處理sql

資料平臺SQL開發詳解與函式使用

kafka to redis

建立一個kafka source

建立一個redis sink

編寫資料處理sql

資料平臺SQL開發詳解與函式使用

Checkpoint

狀態的作用

Checkpoint是Flink儲存任務執行狀態的一個檢查點,狀態是flink的一等公民,可以讓程式記住執行的中間結果,以便任務異常時的重啟恢復

Flink應用異常示例

實時統計當日訂單總額的程式異常中斷,從狀態恢復不需要從0點開始重新計算

實時ETL同步kafka資料到外部儲存異常中斷,從狀態恢復則不需要從頭消費

狀態資料的儲存

可以儲存在記憶體中,當狀態資料過大,記憶體Oom

儲存在持久化的檔案系統中,比如本地或者hdfs

透過狀態過期時間控制flink應用的狀態大小

狀態資料需要週期性地儲存下來,用於故障恢復

如何從檢查點恢復

讀取最近一次checkpoint中的狀態資料,比如累計銷售額sum值為8000元

讀取最近一次checkpoint中提交的offset,比如partition 0,offset 1000

上述狀態資料表明,flink應用程式在消費到(0,1000)這個位點時統計的銷售額為8000元

應用恢復正常後,從(0,1001)開始消費,sum從8000開始累加

檢查點內部狀態資料的一致性

內部狀態資料一致性語義:精確一致或者至少一次

同樣是上述樣例,如果是精確一致性語義,則sum值對每條kafka訊息只統計一次,如果是至少一次,則sum值的統計結果有可能偏高

如果topic只有一個分割槽,則是精確一致,因為flink連線kafka source的並行度為分割槽數,在並行度為1的情況下不存在多流不同時到達的情況

Kafka多分割槽情況下,flink預設是多並行度,此時設定為 至少一次語義,再加上多流很大機率不會同時到達,會導致統計結果偏高。

檢查點設定

建立flink流任務時可選選擇是否開啟檢查點並設定檢查點週期 。開啟檢查點有兩個作用,一是在任務執行發生意外自動重啟的時候會從檢查點恢復,可以確保任務從異常點繼續計算,保持資料連貫性與準確性;另一個作用是在手動停止任務後,再次啟動的時候,可以選擇是否從上一個檢查點繼續執行任務。

資料平臺SQL開發詳解與函式使用

檢查點恢復

任務中斷後再次啟動時可以選擇是否從最近一個檢查點恢復狀態資料。目前支援固定延遲和失敗比率的重啟策略,分別對應固定重啟次數和一段時間內失敗次數超過閾值則不再重啟。

資料平臺SQL開發詳解與函式使用

總結了很多有關於java面試的資料,希望能夠幫助正在學習java的小夥伴。由於資料過多不便發表文章,創作不易,望小夥伴們能夠給我一些動力繼續建立更好的java類學習資料文章,

請多多支援和關注小作,別忘了點贊+評論+轉發。右上角私信我回復【03】即可領取免費學習資料謝謝啦!

資料平臺SQL開發詳解與函式使用

原文出處:https://toutiao。io/posts/1w6yrjt