全面解讀Golang的channel設計

在Go中,要理解channel,首先需要認識goroutine。

一、為什麼會有goroutine

現代作業系統中為我們提供了三種基本的構造併發程式的方法:多程序、I/O多路複用和多執行緒。其中最簡單的構造方式當屬多程序,但是多程序的併發程式,由於對程序控制和程序間通訊開銷巨大,這樣的併發方式往往會很慢。

因此,作業系統提供了更小粒度的執行單元:執行緒(確切叫法是核心執行緒)。它是一種執行在程序上下文中的邏輯流,執行緒之間透過作業系統來排程,其排程模型如下圖所示。

全面解讀Golang的channel設計

多執行緒的併發方式,相較於多程序而言要快得多。但是由於執行緒上下文切換總是不可避免的陷入核心態,它的開銷依然較大。那麼有沒有不必陷入核心態的執行載體呢?有,使用者級執行緒。 使用者級執行緒的切換由使用者程式自己控制,不需要核心干涉,因此少了進出核心態的消耗。

全面解讀Golang的channel設計

這裡的使用者級執行緒就是協程(coroutine),它們的切換由執行時系統來統一排程管理,核心並不知道它的存在。協程是抽象於核心執行緒之上的物件,一個核心執行緒可以對應多個協程。但最終的系統呼叫仍然需要核心執行緒來完成。注意,執行緒的排程是作業系統來管理,是一種搶佔式排程。而協程不同,協程之間需要合作,會主動交出執行權,是一種協作式排程,這也是為何被稱為協程的原因。

Go天生在語言層面支援了協程,即我們常說的goroutine。Go的runtime系統實現的是一種M:N排程模型,透過GMP物件來描述,其中G代表的就是協程,M是執行緒,P是排程上下文。在Go程式中,一個goroutine就代表著一個最小使用者程式碼執行流,它們也是併發流的最小單元。

二、channel的存在定位

從記憶體的角度而言,併發模型只分兩種:基於共享記憶體和基於訊息通訊(記憶體複製)。在Go中,兩種併發模型的同步

原語

均有提供:sync。*和atomic。*代表的就是基於共享記憶體;channel代表的就是基於訊息通訊。而Go提倡後者,它包括三大元素:goroutine(執行體),channel(通訊),select(協調)。

Do not communicate by sharing memory; instead, share memory by communicating。

在Go中透過goroutine+channel的方式,可以簡單、高效地解決併發問題,channel就是goroutine之間的資料橋樑。

Concurrency is the key to designing high performance network services。 Go‘s concurrency primitives (goroutines and channels) provide a simple and efficient means of expressing concurrent execution。

以下是一個簡單的channel使用示例程式碼。

func goroutineA(ch <-chan int) { fmt。Println(“[goroutineA] want a data”) val := <- ch fmt。Println(“[goroutineA] received the data”, val)}func goroutineB(ch chan<- int) { time。Sleep(time。Second*1) ch <- 1 fmt。Println(“[goroutineB] send the data 1”)}func main() { ch := make(chan int, 1) go goroutineA(ch) go goroutineB(ch) time。Sleep(2*time。Second)}

上述過程趣解圖如下

全面解讀Golang的channel設計

全面解讀Golang的channel設計

全面解讀Golang的channel設計

全面解讀Golang的channel設計

三、channel原始碼解析

channel原始碼位於src/go/runtime/chan。go。本章內容分為兩部分:channel內部結構和channel操作。

3。1 channel內部結構

ch := make(chan int,2)

對於以上channel的申明語句,我們可以在程式中加入斷點,得到ch的資訊如下。

全面解讀Golang的channel設計

很好,看起來非常的清晰。但是,這些資訊代表的是什麼含義呢?接下來,我們先看幾個重要的結構體。

hchan

當我們透過make(chan Type, size)生成channel時,在runtime系統中,生成的是一個hchan結構體物件。原始碼位於src/runtime/chan。go

type hchan struct { qcount uint // 迴圈佇列中資料數 dataqsiz uint // 迴圈佇列的大小 buf unsafe。Pointer // 指向大小為dataqsize的包含資料元素的陣列指標 elemsize uint16 // 資料元素的大小 closed uint32 // 代表channel是否關閉 elemtype *_type // _type代表Go的型別系統,elemtype代表channel中的元素型別 sendx uint // 傳送索引號,初始值為0 recvx uint // 接收索引號,初始值為0 recvq waitq // 接收等待佇列,儲存試圖從channel接收資料(<-ch)的阻塞goroutines sendq waitq // 傳送等待佇列,儲存試圖傳送資料(ch<-)到channel的阻塞goroutines lock mutex // 加鎖能保護hchan的所有欄位,包括waitq中sudoq物件}

waitq

waitq用於表達處於阻塞狀態的goroutines連結串列資訊,first指向鏈頭goroutine,last指向鏈尾goroutine

type waitq struct { first *sudog last *sudog}

sudug

sudog代表的就是一個處於等待列表中的goroutine物件,原始碼位於src/runtime/runtime2。go

type sudog struct { g *g next *sudog prev *sudog elem unsafe。Pointer // data element (may point to stack) c *hchan // channel 。。。}

為了更好理解hchan結構體,我們將透過以下程式碼來理解hchan中的欄位含義。

package mainimport “time”func goroutineA(ch chan int) { ch <- 100}func goroutineB(ch chan int) { ch <- 200}func goroutineC(ch chan int) { ch <- 300}func goroutineD(ch chan int) { ch <- 300}func main() { ch := make(chan int, 4) for i := 0; i < 4; i++ { ch <- i * 10 } go goroutineA(ch) go goroutineB(ch) go goroutineC(ch) go goroutineD(ch) // 第一個sleep是為了給上足夠的時間讓所有goroutine都已啟動 time。Sleep(time。Millisecond * 500) time。Sleep(time。Second)}

開啟程式碼除錯功能,將程式執行至斷點time。Sleep(time。Second)處,此時得到的chan資訊如下。

全面解讀Golang的channel設計

在該channel中,透過make(chan int, 4)定義的channel大小為4,即dataqsiz的值為4。同時由於迴圈佇列中已經添加了4個元素,所以qcount值也為4。此時,有4個goroutine(A-D)想傳送資料給channel,但是由於存放資料的迴圈佇列已滿,所以只能進入傳送等待列表,即sendq。同時要注意到,此時的傳送和接收索引值均為0,即下一次接收資料的goroutine會從迴圈佇列的第一個元素拿,傳送資料的goroutine會發送到迴圈佇列的第一個位置。

上述hchan結構視覺化圖解如下

全面解讀Golang的channel設計

3。2 channel操作

將channel操作分為四部分:建立、傳送、接收和關閉。

建立

本文的參考Go版本為1。15。2。其channel的建立實現程式碼位於src/go/runtime/chan。go的makechan方法。

func makechan(t *chantype, size int) *hchan { elem := t。elem // 傳送元素大小限制 if elem。size >= 1<<16 { throw(“makechan: invalid channel element type”) } // 對齊檢查 if hchanSize%maxAlign != 0 || elem。align > maxAlign { throw(“makechan: bad alignment”) } // 判斷是否會記憶體溢位 mem, overflow := math。MulUintptr(elem。size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError(“makechan: size out of range”)) } // 為構造的hchan物件分配記憶體 var c *hchan switch { // 無緩衝的channel或者元素大小為0的情況 case mem == 0: c = (*hchan)(mallocgc(hchanSize, nil, true)) c。buf = c。raceaddr() // 元素不包含指標的情況 case elem。ptrdata == 0: c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) c。buf = add(unsafe。Pointer(c), hchanSize) // 元素包含指標 default: c = new(hchan) c。buf = mallocgc(mem, elem, true) } // 初始化相關引數 c。elemsize = uint16(elem。size) c。elemtype = elem c。dataqsiz = uint(size) lockInit(&c。lock, lockRankHchan) if debugChan { print(“makechan: chan=”, c, “; elemsize=”, elem。size, “; dataqsiz=”, size, “\n”) } return c}

可以看到,makechan方法主要就是檢查傳送元素的合法性,併為hchan分配記憶體,初始化相關引數,包括對鎖的初始化。

傳送

channel的傳送實現程式碼位於src/go/runtime/chan。go的chansend方法。傳送過程,存在以下幾種情況。

當傳送的channel為nil

if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw(“unreachable”)}

往一個nil的channel中傳送資料時,呼叫gopark函式將當前執行的goroutine從running態轉入waiting態。

往已關閉的channel中傳送資料

if c。closed != 0 { unlock(&c。lock) panic(plainError(“send on closed channel”)) }

如果向已關閉的channel中傳送資料,會引發panic。

如果已經有阻塞的接收goroutines(即recvq中指向非空),那麼資料將被直接傳送給接收goroutine。

if sg := c。recvq。dequeue(); sg != nil { // Found a waiting receiver。 We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any)。 send(c, sg, ep, func() { unlock(&c。lock) }, 3) return true}

該邏輯的實現程式碼在send方法和sendDirect中。

func send(c *hchan, sg *sudog, ep unsafe。Pointer, unlockf func(), skip int) { 。。。 // 省略了競態程式碼 if sg。elem != nil { sendDirect(c。elemtype, sg, ep) sg。elem = nil } gp := sg。g unlockf() gp。param = unsafe。Pointer(sg) if sg。releasetime != 0 { sg。releasetime = cputicks() } goready(gp, skip+1)}func sendDirect(t *_type, sg *sudog, src unsafe。Pointer) { dst := sg。elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t。size) memmove(dst, src, t。size)}

其中,memmove我們已經在原始碼系列中遇到多次了,它的目的是將記憶體中src的內容複製至dst中去。另外,注意到goready(gp, skip+1)這句程式碼,它會使得之前在接收等待佇列中的第一個goroutine的狀態變為runnable,這樣go的排程器就可以重新讓該goroutine得到執行。

對於有緩衝的channel來說,如果當前緩衝區hchan。buf有可用空間,那麼會將資料複製至緩衝區

if c。qcount < c。dataqsiz { qp := chanbuf(c, c。sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c。elemtype, qp, ep) // 傳送索引號+1 c。sendx++ // 因為儲存資料元素的結構是迴圈佇列,所以噹噹前索引號已經到隊末時,將索引號調整到隊頭 if c。sendx == c。dataqsiz { c。sendx = 0 } // 當前迴圈佇列中儲存元素數+1 c。qcount++ unlock(&c。lock) return true}

其中,chanbuf(c, c。sendx)是獲取指向對應記憶體區域的指標。typememmove會呼叫memmove方法,完成資料的複製工作。另外注意到,當對hchan進行實際操作時,是需要呼叫lock(&c。lock)加鎖,因此,在完成資料複製後,透過unlock(&c。lock)將鎖釋放。

有緩衝的channel,當hchan。buf已滿;或者無緩衝的channel,當前沒有接收的goroutine

gp := getg()mysg := acquireSudog()mysg。releasetime = 0if t0 != 0 { mysg。releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp。waiting where copystack can find it。mysg。elem = epmysg。waitlink = nilmysg。g = gpmysg。isSelect = falsemysg。c = cgp。waiting = mysggp。param = nilc。sendq。enqueue(mysg)gopark(chanparkcommit, unsafe。Pointer(&c。lock), waitReasonChanSend, traceEvGoBlockSend, 2)

透過getg獲取當前執行的goroutine。acquireSudog是先獲得當前執行goroutine的執行緒M,再獲取M對應的P,最後將P的sudugo快取佇列中的隊頭sudog取出(詳見原始碼src/runtime/proc。go)。透過c。sendq。enqueue將sudug加入到channel的傳送等待列表中,並呼叫gopark將當前goroutine轉為waiting態。

傳送操作會對hchan加鎖。

當recvq中存在等待接收的goroutine時,資料元素將會被直接複製給接收goroutine。

當recvq等待佇列為空時,會判斷hchan。buf是否可用。如果可用,則會將傳送的資料複製至hchan。buf中。

如果hchan。buf已滿,那麼將當前傳送goroutine置於sendq中排隊,並在執行時中掛起。

向已經關閉的channel傳送資料,會引發panic。

對於無緩衝的channel來說,它天然就是hchan。buf已滿的情況,因為它的hchan。buf的容量為0。

package mainimport “time”func main() { ch := make(chan int) go func(ch chan int) { ch <- 100 }(ch) time。Sleep(time。Millisecond * 500) time。Sleep(time。Second)}

在上述示例中,傳送goroutine向無緩衝的channel傳送資料,但是沒有接收goroutine。將斷點置於time。Sleep(time。Second),得到此時ch結構如下。

全面解讀Golang的channel設計

可以看到,在無緩衝的channel中,其hchan的buf長度為0,當沒有接收groutine時,傳送的goroutine將被置於sendq的傳送佇列中。

接收

channel的接收實現分兩種,v :=<-ch對應於chanrecv1,v, ok := <- ch對應於chanrecv2,但它們都依賴於位於src/go/runtime/chan。go的chanrecv方法。

func chanrecv1(c *hchan, elem unsafe。Pointer) { chanrecv(c, elem, true)}func chanrecv2(c *hchan, elem unsafe。Pointer) (received bool) { _, received = chanrecv(c, elem, true) return}

chanrecv的詳細程式碼此處就不再展示,和chansend邏輯對應,具體處理準則如下。

接收操作會對hchan加鎖。

當sendq中存在等待發送的goroutine時,意味著此時的hchan。buf已滿(無快取的天然已滿),分兩種情況(見程式碼src/go/runtime/chan。go的recv方法):1。 如果是有快取的hchan,那麼先將緩衝區的資料複製給接收goroutine,再將sendq的隊頭sudog出隊,將出隊的sudog上的元素複製至hchan的快取區。 2。 如果是無快取的hchan,那麼直接將出隊的sudog上的元素複製給接收goroutine。兩種情況的最後都會喚醒出隊的sudog上的傳送goroutine。

當sendq傳送佇列為空時,會判斷hchan。buf是否可用。如果可用,則會將hchan。buf的資料複製給接收goroutine。

如果hchan。buf不可用,那麼將當前接收goroutine置於recvq中排隊,並在執行時中掛起。

與傳送不同的是,當channel關閉時,goroutine還能從channel中獲取資料。如果recvq等待列表中有goroutines,那麼它們都會被喚醒接收資料。如果hchan。buf中還有未接收的資料,那麼goroutine會接收緩衝區中的資料,否則goroutine會獲取到元素的零值。

以下是channel關閉之後,接收goroutine的讀取示例程式碼。

func main() { ch := make(chan int, 1) ch <- 10 close(ch) a, ok := <-ch fmt。Println(a, ok) b, ok := <-ch fmt。Println(b, ok) c := <-ch fmt。Println(c)}//輸出如下10 true0 false0

注意:在channel中進行的所有元素轉移都伴隨著記憶體的複製。

func main() { type Instance struct { ID int name string } var ins = Instance{ID: 1, name: “Golang”} ch := make(chan Instance, 3) ch <- ins fmt。Println(“ins的原始值:”, ins) ins。name = “Python” go func(ch chan Instance) { fmt。Println(“channel接收值:”, <-ch) }(ch) time。Sleep(time。Second) fmt。Println(“ins的最終值:”, ins)}// 輸出結果ins的原始值: {1 Golang}channel接收值: {1 Golang}ins的最終值: {1 Python}

前半段圖解如下

全面解讀Golang的channel設計

後半段圖解如下

全面解讀Golang的channel設計

注意,如果把channel傳遞型別替換為Instance指標時,那麼儘管channel存入到buf中的元素已經是複製物件了,從channel中取出又被複製了一次。但是由於它們的型別是Instance指標,複製物件與原始物件均會指向同一個記憶體地址,修改原有元素物件的資料時,會影響到取出資料。

func main() { type Instance struct { ID int name string } var ins = &Instance{ID: 1, name: “Golang”} ch := make(chan *Instance, 3) ch <- ins fmt。Println(“ins的原始值:”, ins) ins。name = “Python” go func(ch chan *Instance) { fmt。Println(“channel接收值:”, <-ch) }(ch) time。Sleep(time。Second) fmt。Println(“ins的最終值:”, ins)}// 輸出結果ins的原始值: &{1 Golang}channel接收值: &{1 Python}ins的最終值: &{1 Python}

因此,在使用channel時,儘量避免傳遞指標,如果傳遞指標,則需謹慎。

關閉

channel的關閉實現程式碼位於src/go/runtime/chan。go的chansend方法,詳細執行邏輯已透過註釋寫明。

func closechan(c *hchan) { // 如果hchan物件為nil,則會引發painc if c == nil { panic(plainError(“close of nil channel”)) } // 對hchan加鎖 lock(&c。lock) // 不同多次呼叫close(c chan<- Type)方法,否則會引發painc if c。closed != 0 { unlock(&c。lock) panic(plainError(“close of closed channel”)) } if raceenabled { callerpc := getcallerpc() racewritepc(c。raceaddr(), callerpc, funcPC(closechan)) racerelease(c。raceaddr()) } // close標誌 c。closed = 1 // gList代表Go的GMP排程的G集合 var glist gList // 該for迴圈是為了釋放recvq上的所有等待接收sudog for { sg := c。recvq。dequeue() if sg == nil { break } if sg。elem != nil { typedmemclr(c。elemtype, sg。elem) sg。elem = nil } if sg。releasetime != 0 { sg。releasetime = cputicks() } gp := sg。g gp。param = nil if raceenabled { raceacquireg(gp, c。raceaddr()) } glist。push(gp) } // 該for迴圈會釋放sendq上的所有等待發送sudog for { sg := c。sendq。dequeue() if sg == nil { break } sg。elem = nil if sg。releasetime != 0 { sg。releasetime = cputicks() } gp := sg。g gp。param = nil if raceenabled { raceacquireg(gp, c。raceaddr()) } glist。push(gp) } // 釋放sendq和recvq之後,hchan釋放鎖 unlock(&c。lock) // 將上文中glist中的加入的goroutine取出,讓它們均變為runnable(可執行)狀態,等待排程器執行 // 注意:我們上文中分析過,試圖向一個已關閉的channel傳送資料,會引發painc。 // 所以,如果是釋放sendq中的goroutine,它們一旦得到執行將會引發panic。 for !glist。empty() { gp := glist。pop() gp。schedlink = 0 goready(gp, 3) }}

關於關閉操作,有幾個點需要注意一下。

如果關閉已關閉的channel會引發painc。

對channel關閉後,如果有阻塞的讀取或傳送goroutines將會被喚醒。讀取goroutines會獲取到hchan的已接收元素,如果沒有,則獲取到元素零值;傳送goroutine的執行則會引發painc。

對於第二點,我們可以很好利用這一特性來實現對程式執行流的控制(類似於sync。WaitGroup的作用),以下是示例程式程式碼。

func main() { ch := make(chan struct{}) // go func() { // do something work。。。 // when work has done, call close() close(ch) }() // waiting work done <- ch // other work continue。。。}

四、總結

channel是Go中非常強大有用的機制,為了更有效地使用它,我們必須瞭解它的實現原理,這也是寫作本文的目的。

hchan結構體有鎖的保證,對於併發goroutine而言是安全的

channel接收、傳送資料遵循FIFO(First In First Out)原語

channel的資料傳遞依賴於記憶體複製

channel能阻塞(gopark)、喚醒(goready)goroutine

所謂無快取的channel,它的工作方式就是直接傳送goroutine複製資料給接收goroutine,而不透過hchan。buf

另外,可以看到Go在channel的設計上權衡了簡單與效能。為了簡單性,hchan是有鎖的結構,因為有鎖的佇列會更易理解和實現,但是這樣會損失一些效能。考慮到整個 channel 操作帶鎖的成本較高,其實官方也曾考慮過使用無鎖 channel 的設計,但是由於目前已有提案中(https://github。com/golang/go/issues/8899),無鎖實現的channel可維護性差、且實際效能測試不具有說服力,而且也不符合Go的簡單哲學,因此官方目前為止並沒有採納無鎖設計。

在效能上,有一點,我們需要認識到:所謂channel中阻塞goroutine,只是在runtime系統中被blocked,它是使用者層的阻塞。而實際的底層核心執行緒不受影響,它仍然是unblocked的。

參考連結

https://speakerdeck。com/kavya719/understanding-channels

https://codeburst。io/diving-deep-into-the-golang-channels-549fd4ed21a8

https://github。com/talkgo/night/issues/450