Flink 1.10 Native Kubernetes 原理與實踐

出處:https://zhoukaibo。com/2020/02/18/Flink-1-10-Native-Kubernetes

頭條號:

大資料與雲原生

微信公眾號:

大資料與雲原生

創作不易,在滿足創作共用版權協議的基礎上可以轉載,但請以超連結形式註明出處。

為了方便閱讀,微信公眾號已按分類排版,後續的文章將在移動端首發,想學習雲原生相關知識,請

關注我

千呼萬喚始出來,在 Kubernetes 如火如荼的今天,Flink 社群終於在 1。10 版本提供了對 Kubernetes 的原生支援,也就是 Native Kubernetes Integration[1]。不過還只是 Beta 版本,預計會在 1。11 版本里面提供完整的支援。

我們知道,在 Flink 1。9 以及之前的版本里面,如果要在 Kubernetes 上執行 Flink 任務是需要事先指定好需要的 TaskManager(TM) 的個數以及 CPU 和記憶體的。這樣的問題是:大多數情況下,你在任務啟動前根本無法精確的預估這個任務需要多少個 TM。如果指定的 TM 多了,會導致資源浪費;如果指定的 TM 個數少了,會導致任務排程不起來。本質原因是在 Kubernetes 上執行的 Flink 任務並沒有直接向 Kubernetes 叢集去申請資源。

Flink 在 1。10 版本完成了

Active Kubernetes Integration

的第一階段,支援了 session clusters。後續的第二階段會提供更完整的支援,如支援 per-job 任務提交,以及基於原生 Kubernetes API 的高可用,支援更多的 Kubernetes 引數如 toleration, label 和 node selector 等。Active Kubernetes Integration中的Active意味著 Flink 的 ResourceManager (KubernetesResourceManager) 可以直接和 Kubernetes 通訊,按需申請新的 Pod,類似於 Flink 中對 Yarn 和 Mesos 的整合所做的那樣。在多租戶環境中,使用者可以利用 Kubernetes 裡面的 namespace 做資源隔離啟動不同的 Flink 叢集。當然,Kubernetes 中的使用者帳號和賦權是需要提前準備好的。

原 理

Flink 1.10 Native Kubernetes 原理與實踐

Flink 1。10 native k8s

工作原理如下(段首的序號對應圖中箭頭所示的數字):

Flink 客戶端首先連線 Kubernetes API Server,提交 Flink 叢集的資源描述檔案,包括 configmap,job manager service,job manager deployment 和 Owner Reference[2]。

Kubernetes Master 就會根據這些資源描述檔案去建立對應的 Kubernetes 實體。以我們最關心的 job manager deployment 為例,Kubernetes 叢集中的某個節點收到請求後,Kubelet 程序會從中央倉庫下載 Flink 映象,準備和掛載 volume,然後執行啟動命令。在 flink master 的 pod 啟動後,Dispacher 和 KubernetesResourceManager 也都啟動了。前面兩步完成後,整個 Flink session cluster 就啟動好了,可以接受提交任務請求了。

使用者可以透過 flink 命令列即 flink client 往這個 session cluster 提交任務。此時 job graph 會在 flink client 端生成,然後和使用者 jar 包一起透過 RestClinet 上傳。

一旦 job 提交成功,JobSubmitHandler 收到請求就會提交 job 給 Dispatcher。接著就會生成一個 job master。

JobMaster 向 KubernetesResourceManager 請求 slots。

KubernetesResourceManager 從 Kubernetes 叢集分配 TaskManager。每個 TaskManager 都是具有唯一標識的 Pod。KubernetesResourceManager 會為 TaskManager 生成一份新的配置檔案,裡面有 Flink Master 的 service name 作為地址。這樣在 Flink Master failover之後,TaskManager 仍然可以重新連上。

Kubernetes 叢集分配一個新的 Pod 後,在上面啟動 TaskManager。

TaskManager 啟動後註冊到 SlotManager。

SlotManager 向 TaskManager 請求 slots。

TaskManager 提供 slots 給 JobMaster。然後任務就會被分配到這個 slots 上執行。

實 踐

Flink 的文件[3]上對如何使用已經寫的比較詳細了,不過剛開始總會踩到一些坑。如果對 Kubernetes 不熟,可能會花點時間。

(1) 首先得有個 Kubernetes 叢集,會有個 ~/。kube/config 檔案。嘗試執行 kubectl get nodes 看下叢集是否正常。

如果沒有這個

~/.kube/config

檔案,會報錯:

2020-02-17 22:27:17,253 WARN io。fabric8。kubernetes。client。Config - Error reading service account token from: [/var/run/secrets/kubernetes。io/serviceaccount/token]。 Ignoring。2020-02-17 22:27:17,437 ERROR org。apache。flink。kubernetes。cli。KubernetesSessionCli - Error while running the Flink session。io。fabric8。kubernetes。client。KubernetesClientException: Operation: [get] for kind: [Service] with name: [flink-cluster-81832d75-662e-40fd-8564-cd5a902b243c] in namespace: [default] failed。 at io。fabric8。kubernetes。client。KubernetesClientException。launderThrowable(KubernetesClientException。java:64) at io。fabric8。kubernetes。client。KubernetesClientException。launderThrowable(KubernetesClientException。java:72) at io。fabric8。kubernetes。client。dsl。base。BaseOperation。getMandatory(BaseOperation。java:231) at io。fabric8。kubernetes。client。dsl。base。BaseOperation。get(BaseOperation。java:164) at org。apache。flink。kubernetes。kubeclient。Fabric8FlinkKubeClient。getService(Fabric8FlinkKubeClient。java:334) at org。apache。flink。kubernetes。kubeclient。Fabric8FlinkKubeClient。getInternalService(Fabric8FlinkKubeClient。java:246) at org。apache。flink。kubernetes。cli。KubernetesSessionCli。run(KubernetesSessionCli。java:104) at org。apache。flink。kubernetes。cli。KubernetesSessionCli。lambda$main$0(KubernetesSessionCli。java:185) at org。apache。flink。runtime。security。NoOpSecurityContext。runSecured(NoOpSecurityContext。java:30) at org。apache。flink。kubernetes。cli。KubernetesSessionCli。main(KubernetesSessionCli。java:185)Caused by: java。net。UnknownHostException: kubernetes。default。svc: nodename nor servname provided, or not known

(2) 提前建立好使用者和賦權(RBAC[4])

kubectl create serviceaccount flinkkubectl create clusterrolebinding flink-role-binding-flink ——clusterrole=edit ——serviceaccount=default:flink

如果沒有建立使用者,使用預設的使用者去提交,會報錯:

Caused by: io。fabric8。kubernetes。client。KubernetesClientException: Failure executing: GET at: https://10。10。0。1/api/v1/namespaces/default/pods?labelSelector=app%3Dkaibo-test%2Ccomponent%3Dtaskmanager%2Ctype%3Dflink-native-kubernetes。Message: Forbidden!Configured service account doesn‘t have access。 Service account may have been revoked。 pods is forbidden: User “system:serviceaccount:default:default” cannot list resource “pods” in API group “” in the namespace “default”。

(3) 這一步是可選的。預設情況下, JobManager 和 TaskManager 只會將 log 寫到各自 pod 的 /opt/flink/log 。如果想透過 kubectl logs 看到日誌,需要將 log 輸出到控制檯。要做如下修改 FLINK_HOME/conf 目錄下的 log4j。properties 檔案。

log4j。rootLogger=INFO, file, console# Log all infos to the consolelog4j。appender。console=org。apache。log4j。ConsoleAppenderlog4j。appender。console。layout=org。apache。log4j。PatternLayoutlog4j。appender。console。layout。ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

然後啟動 session cluster 的命令列需要帶上引數:

-Dkubernetes。container-start-command-template=“%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%”

(4) 終於可以開始啟動 session cluster了。如下命令是啟動一個每個 TaskManager 是4G記憶體,2個CPU,4個slot 的 session cluster。

bin/kubernetes-session。sh -Dkubernetes。container-start-command-template=“%java% %classpath% %jvmmem% %jvmopts% %logging% %class% %args%” -Dkubernetes。cluster-id=kaibo-test -Dtaskmanager。memory。process。size=4096m -Dkubernetes。taskmanager。cpu=2 -Dtaskmanager。numberOfTaskSlots=4

更多的引數詳見文件:

https://ci。apache。org/projects/flink/flink-docs-release-1。10/ops/config。html#kubernetes

使用

kubectl logs kaibo-test-6f7dffcbcf-c2p7g -f

就能看到日誌了。

如果出現大量的如下這種日誌(目前遇到是雲廠商的LoadBalance liveness探測導致):

2020-02-17 14:58:56,323 WARN org。apache。flink。runtime。dispatcher。DispatcherRestEndpoint - Unhandled exceptionjava。io。IOException: Connection reset by peer at sun。nio。ch。FileDispatcherImpl。read0(Native Method) at sun。nio。ch。SocketDispatcher。read(SocketDispatcher。java:39) at sun。nio。ch。IOUtil。readIntoNativeBuffer(IOUtil。java:223) at sun。nio。ch。IOUtil。read(IOUtil。java:192) at sun。nio。ch。SocketChannelImpl。read(SocketChannelImpl。java:377) at org。apache。flink。shaded。netty4。io。netty。buffer。PooledByteBuf。setBytes(PooledByteBuf。java:247)

可以暫時在 log4j。properties 裡面配置上:

log4j。logger。org。apache。flink。runtime。dispatcher。DispatcherRestEndpoint=ERROR, file

這個日誌太多會導致 WebUI 上開啟 jobmanger log 是空白,因為檔案太大了前端無法顯示。

如果前面第(1)和第(2)步沒有做,會出現各種異常,透過 kubectl logs 就能很方便的看到日誌了。

Session cluster 啟動後可以透過 kubectl get pods,svc 來看是否正常。

透過埠轉發來檢視 Web UI:

kubectl port-forward service/kaibo-test 8081

開啟 http://127。0。0。1:8001 就能看到 Flink 的 WebUI 了。

(5) 提交任務

。/bin/flink run -d -e kubernetes-session -Dkubernetes。cluster-id=k test。jar

我們從 Flink WebUI 頁面上可以看到,剛開始啟動時,UI上顯示 Total/Available Task Slots 為0, Task Managers 也是0。隨著任務的提交,資源會動態增加。任務停止後,資源會釋放掉。

在提交任務後,透過 kubectl get pods 能夠看到 Flink 為 TaskManager 分配了新的 Pod。

Flink 1.10 Native Kubernetes 原理與實踐

pods

(6) 停止 session cluster

echo ’stop‘ | 。/bin/kubernetes-session。sh -Dkubernetes。cluster-id=kaibo-test -Dexecution。attached=true

也可以手工刪除資源:

kubectl delete service/

總 結

可以看到,Flink 1。10 版本對和 Kubernetes 的整合做了很好的嘗試。期待社群後續的 1。11 版本能對 per-job 提供支援,以及和 Kubernetes 的深度整合,例如基於原生 Kubernetes API 的高可用。最新進展請關注 FLINK-14460[5]。

參考連結:

[1] https://flink。apache。org/news/2020/02/11/release-1。10。0。html#native-kubernetes-integration-beta

[2] https://kubernetes。io/docs/concepts/workloads/controllers/garbage-collection/

[3] https://ci。apache。org/projects/flink/flink-docs-release-1。10/ops/deployment/native_kubernetes。html

[4] https://kubernetes。io/docs/reference/access-authn-authz/rbac/

[5] https://issues。apache。org/jira/browse/FLINK-14460

作者介紹:

周凱波(寶牛),阿里巴巴技術專家,四川大學碩士,2010 年畢業後加入阿里搜尋事業部,從事搜尋離線平臺的研發工作,參與將搜尋後臺資料處理架構從 MapReduce 到 Flink 的重構。目前在阿里計算平臺事業部,專注於基於 Flink 的一站式計算平臺的建設。