編排系統K8S Ingress-nginx原始碼解析

編排系統K8S Ingress-nginx原始碼解析

上篇文章介紹了Ingress-nginx的基本架構原理,具體可參考:

編排系統K8S Ingress-nginx介紹

本篇重點以原始碼為基礎,深入講解 Ingress-nginx的內部工作流以及整體工作模式。先來張工作流圖:

編排系統K8S Ingress-nginx原始碼解析

如上述工作流圖所述:Ingress-nginx 模組在執行時主要包括三個主體:NginxController、Store、SyncQueue。其中:

Store(協程)模組主要負責從 kubernetes API Server 收集執行時資訊,感知各類資源(如 Ingress、Service等)的變化,並及時將更新事件訊息(event)寫入一個環形管道。 SyncQueue(協程)定期掃描 syncQueue 佇列,發現有任務就執行更新操作,即藉助 Store 完成最新執行資料的拉取,然後根據一定的規則產生新的 nginx 配置,(有些更新必須 reload,就本地寫入新配置,執行 reload),然後執行動態更新操作,即構造 POST 資料,向本地 Nginx Lua 服務模組傳送 post 請求,以實現配置更新。

NginxController(主程)作為中間的聯絡者,監聽 updateChannel,一旦收到配置更新事件,就向同步佇列 syncQueue 裡寫入一個更新請求。

下面我們看下相關原始碼解析:

[因篇幅有限,僅列出核心的部分]

整個流程入口為main()函式,以下為~/nginx/main。go原始碼

func main() { 。。。 ngx := controller。NewNGINXController(conf, mc, fs) 。。。 ngx。Start()}

在 main 函式中,程式首先構造了 NginxController,並執行了其 Start 方法,啟動了 Controller 主程式。

關於ngx。Start(),我們可以追溯到internal/ingress/controller/nginx。go#Start(),具體:

func (n *NGINXController) Start() { 。。。 n。store。Run(n。stopCh) 。。。 go n。syncQueue。Run(time。Second, n。stopCh) 。。。 for { select { 。。。 case event := <-n。updateCh。Out(): if n。isShuttingDown { break } if evt, ok := event。(store。Event); ok { if evt。Type == store。ConfigurationEvent { n。syncQueue。EnqueueTask(task。GetDummyObject(“configmap-change”)) continue } n。syncQueue。EnqueueSkippableTask(evt。Obj) } 。。。 }}

NginxController 首先啟動了 Store 協程,然後啟動了 syncQueue 協程,最後監聽 updateCh,當收到事件後,經過相關條件判斷然後向 syncQueue 寫入了一個 task。

關於Store 協程,跟蹤到 internal/ingress/controller/store/store。go#Run(),具體:

func (s k8sStore) Run(stopCh chan struct{}) { s。informers。Run(stopCh) 。。。}

透過程式碼我們可以看到,此方法繼續呼叫了 informer 的 Run 方法,繼續跟蹤,可看到如下:

// Run initiates the synchronization of the informers against the API server。func (i *Informer) Run(stopCh chan struct{}) { go i。Endpoint。Run(stopCh) go i。Service。Run(stopCh) go i。Secret。Run(stopCh) go i。ConfigMap。Run(stopCh) 。。。 go i。Ingress。Run(stopCh) 。。。}

informer 的 Run 方法,會起更多的協程,去監聽不同資源的變化,包括 Endpoint、Service、Secret、ConfigMap、Ingress等等。以 Ingress 為例,在其定義處,可以找到 New() :

// New creates a new object store to be used in the ingress controllerfunc New(。。。 updateCh *channels。RingChannel 。。。) Storer { 。。。 store。informers。Ingress = infFactory。Extensions()。V1beta1()。Ingresses()。Informer() 。。。 ingEventHandler := cache。ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { 。。。 updateCh。In() <- Event{ Type: CreateEvent, Obj: obj, } }, DeleteFunc: func(obj interface{}) { 。。。 updateCh。In() <- Event{ Type: DeleteEvent, Obj: obj, } }, UpdateFunc: func(old, cur interface{}) { 。。。 updateCh。In() <- Event{ Type: UpdateEvent, Obj: cur, } }, }。。。 store。informers。Ingress。AddEventHandler(ingEventHandler) 。。。}

可以看出,Ingress 協程定義了監聽 ingress 資訊的 informer 物件,並註冊了相關事件的回撥方法,在回撥方法內向之前提到的 updateCh 寫入了事件,進而也就達到了當資源變化時通知 Controller 主程向同步佇列寫入task的目的。

關於syncQueue,可追溯到internal/ingress/controller/nginx。go # NewNGINX-Controller():

// NewNGINXController creates a new NGINX Ingress controller。func NewNGINXController(config *Configuration, mc metric。Collector, fs file。Filesystem) *NGINXController { 。。。 n。syncQueue = task。NewTaskQueue(n。syncIngress) 。。。}

佇列的建立是透過 task。NewTaskQueue() 完成的,而且傳入了關鍵的處理函式 n。syncIngress。繼續跟蹤到 internal/task/queue。go#NewTaskQueue():

// NewTaskQueue creates a new task queue with the given sync function。// The sync function is called for every element inserted into the queue。func NewTaskQueue(syncFn func(interface{}) error) *Queue { return NewCustomTaskQueue(syncFn, nil)} // NewCustomTaskQueue 。。。func NewCustomTaskQueue(syncFn func(interface{}) error, fn func(interface{}) (interface{}, error)) *Queue { q := &Queue{ queue: workqueue。NewRateLimitingQueue(workqueue。DefaultControllerRateLimiter()), sync: syncFn, workerDone: make(chan bool), fn: fn, } 。。。 return q}

可以看出,傳入的處理函式 n。syncIngress 被賦值給 Queue 的 sync 屬性了。實際上,syncQueue 的執行就是在反覆執行該方法以消費佇列裡的元素。有關Queue 的 Run 定義可以追溯至:

// Run starts processing elements in the queuefunc (t *Queue) Run(period time。Duration, stopCh <-chan struct{}) { wait。Until(t。worker, period, stopCh)} // worker processes work in the queue through sync。func (t *Queue) worker() { for { key, quit := t。queue。Get() 。。。 if err := t。sync(key); err != nil { t。queue。AddRateLimited(Element{ Key: item。Key, Timestamp: time。Now()。UnixNano(), }) } else { t。queue。Forget(key) t。lastSync = ts } t。queue。Done(key) }}

同步佇列協程的主要工作就是定期取出佇列裡的元素,並利用傳入的 n。syncIngress (即 t。sync(key))方法處理佇列裡的元素。n。syncIngress 方法的定義在 internal/-ingress-/controller/controller。go#syncIngress():

// syncIngress collects all the pieces required to assemble the NGINX// configuration file and passes the resulting data structures to the backend// (OnUpdate) when a reload is deemed necessary。func (n *NGINXController) syncIngress(interface{}) error { // 獲取最新配置資訊 。。。。 // 構造 nginx 配置 pcfg := &ingress。Configuration{ Backends: upstreams, Servers: servers, PassthroughBackends: passUpstreams, BackendConfigChecksum: n。store。GetBackendConfiguration()。Checksum, } 。。。 // 不能避免 reload,就執行 reload 更新配置 if !n。IsDynamicConfigurationEnough(pcfg) { 。。。 err := n。OnUpdate(*pcfg) 。。。 } 。。。 // 動態更新配置 err := wait。ExponentialBackoff(retry, func() (bool, error) { err := configureDynamically(pcfg, n。cfg。ListenPorts。Status, n。cfg。DynamicCertificatesEnabled) 。。。 }) 。。。}

關於動態更新的工作機制,函式定義位於 internal/ingress/controller/nginx。go#-configureDynamically():

/ configureDynamically encodes new Backends in JSON format and POSTs the// payload to an internal HTTP endpoint handled by Lua。func configureDynamically(pcfg *ingress。Configuration, port int, isDynamicCertificatesEnabled bool) error { backends := make([]*ingress。Backend, len(pcfg。Backends)) for i, backend := range pcfg。Backends { var service *apiv1。Service if backend。Service != nil { service = &apiv1。Service{Spec: backend。Service。Spec} } luaBackend := &ingress。Backend{ Name: backend。Name, Port: backend。Port, SSLPassthrough: backend。SSLPassthrough, SessionAffinity: backend。SessionAffinity, UpstreamHashBy: backend。UpstreamHashBy, LoadBalancing: backend。LoadBalancing, Service: service, NoServer: backend。NoServer, TrafficShapingPolicy: backend。TrafficShapingPolicy, AlternativeBackends: backend。AlternativeBackends, } var endpoints []ingress。Endpoint for _, endpoint := range backend。Endpoints { endpoints = append(endpoints, ingress。Endpoint{ Address: endpoint。Address, Port: endpoint。Port, }) } luaBackend。Endpoints = endpoints backends[i] = luaBackend } url := fmt。Sprintf(“http://localhost:%d/configuration/backends”, port) err := post(url, backends) if err != nil { return err } if isDynamicCertificatesEnabled { err = configureCertificates(pcfg, port) if err != nil { return err } } return nil}

結合原始碼:透過請求 Lua 後端來實現動態配置更新的,使用的是典型的 http post 方法。後續的動態更新動作轉交給 Lua 模組負責。因為 Lua 以模組形式嵌入 Nginx 執行,因此其更新配置也就在一定程度上避免了 reload。

至於reload 配置的函式定義,可參考:internal/ingress/controller/nginx。go#OnUpdate-()。

- EOF -

如果您喜歡本文,歡迎

點贊

收藏

留言

,或者點選右下角,把文章分享給你的朋友們~~~

編排系統K8S Ingress-nginx原始碼解析

編排系統K8S Ingress-nginx原始碼解析

Luga Lee

“路,在自己腳下~”