Go語言進階之路(五):通道和goroutine、GPM

通道

Go語言可以有效地利用多核CPU,併發效能好,這正是由於goroutine和通道還有GPM模型的原因。

我們知道,Python語言由於全域性鎖GIL,單個Python應用的多執行緒程式碼沒辦法有效利用多核CPU,只能寫多程序來利用多核CPU。如果用標準庫的multiprocessing會對監控和管理造成不少的挑戰。部署Python應用的時候通常是每個CPU核部署一個應用,這樣會浪費不少資源。Go語言的goroutine和GPM模型效能遠遠超越Python的併發模型。先來看看通道和goroutine。

Go語言中通道型別是chan關鍵字,透過make來建立。可以建立帶緩衝的通道和無緩衝的通道。

建立通道

unbuffered := make(chan int)  // 無緩衝的整型通道buffered := make(chan string, 10)  // 有緩衝的字元型通道

緩衝通道和無緩衝通道有什麼區別?

無緩衝通道沒有容量,我們想要往通道中傳送資料,那麼一定要有另一個操作在往這個通道取資料,否則,傳送資料的那段程式碼也會被阻塞。

緩衝通道只是有一段容量可以緩衝一下,當緩衝區未滿時,往通道中傳送資料不會阻塞當前操作;當緩衝區滿了時往通道中傳送資料,會想無緩衝通道那樣阻塞。

傳送資料

buffered <- “Gopher”  // 透過通道傳送一個字串

接收資料

received :=  <- buffered        // 透過通道接收一個字串<- unbuffered                   // 接收並丟棄字串,如果unbuffered通道里沒資料,則會一直阻塞直到有資料放入receiver, ok := <- bufferedif !ok {    xxx                         // 通道被關閉等問題}

關閉通道

通常來說,我們往通道里傳送完資料後,我們應該關閉通道(一般來說由生產者來關閉通道),這就相當於告訴消費者,我的資料傳送完了,你取完了資料之後就離開吧。

我們應該在所有資料都放入通道後再關閉通道,否則,如果先關閉通道,再往裡放資料的話會產生panic。看下面的例子:

close(buffered)go func() {  for {    j, more := <-jobs        // 如果通道已被關閉,並且資料已全被取走,則more為false    if more {      fmt。Println(“received job”, j)    } else {      fmt。Println(“received all jobs”)      done <- true      return    }  }}()for post := 1; post <= taskLoad; post++ {  tasks <- fmt。Sprintf(“Task : %d”, post)}// 當所有工作都處理完時關閉通道,以便所有goroutine退出close(tasks)通道可以用來實現同步功能:func worker(done chan bool) {  fmt。Print(“working。。。”)  time。Sleep(time。Second)  fmt。Println(“done”)  done <- true  close(done)}func main() {  done := make(chan bool)  go worker(done)  <-done                        // 阻塞,直到通道中有資料}

單向通道

預設地,我們使用make(ch chan int)建立的通道都是雙向通道,我們可以往裡面傳送資料也可以從裡面讀取資料。我們也可以在定義通道的地方指明通道的方向,看下面的例子:

func ping(pings chan<- string, msg string) {  // pings單向通道,函式中只能往pings傳送資料  pings <- msg}func pong(pings <-chan string, pongs chan<- string) {  // pings單向通道,函式中只能從pings接收資料  // pongs單向通道,函式中只能往pongs傳送資料  msg := <-pings  pongs <- msg}

宣告單向通道最大的好處就是可以限制程式的行為。比如我們在定義介面給其他人呼叫時,我們就可以指明通道為單向通道,介面使用者只能往通道傳送資料或者只能從通道讀取資料。

生產者消費者問題

生產者只往通道里面傳送資料,傳送完後關閉通道。消費者只從通道里面讀取資料。這可比Java中使用wait/notify來實現生產者消費者方便多了。

func producer(ch chan<- int) {  for i := 1; i <= 10; i++ {    ch <- i  }  close(ch)}func consumer(ch <-chan int) {  for i := range ch {    fmt。Println(“consumed something: ”, i)  }}func main() {  ch := make(chan int)  go producer(ch)  go consumer(ch)  time。Sleep(time。Second)}

goroutine

Go語言中的goroutine,一般也稱作協程,不由OS排程,而是使用者層自行釋放CPU,從而在執行體之間切換排程執行。Go語言在底層進行協助實現。涉及系統呼叫的地方由Go標準庫協助釋放CPU。總之,協程執行不透過OS進行切換,由Go語言自行切換,系統執行開支大大降低。一個程序內部可以執行多個執行緒,而每個執行緒又可以執行很多協程。執行緒要負責對協程進行排程。當一個協程睡眠時,它將執行緒的執行權讓給其它的協程來執行。同一個執行緒內部最多隻會有一個協程正在執行。

建立Goroutine

建立goroutine很方便,建立個匿名函式並呼叫,然後在前面加上go關鍵字就可以了。

go func(msg string) {    fmt。Println(msg)}(“going”)

還可以把函式賦給一個變數,然後建立goroutine執行:

var myFunc = func(msg string) {    fmt。Println(msg)}go myFunc(“going”)

主協程和子協程

Go語言所有程式碼都執行在協程中,由Go執行時進行排程,其中main函式所在的協程叫做主協程,其餘的協程叫做子協程。

func main() {  for i := 0; i < 5; i++ {    go func(i int) {      fmt。Print(i, “ ”)    }(i)  }  time。Sleep(time。Second)}// 輸出:0 4 3 1 2

當主協程執行完而子協程未執行完時,所有子協程會隨著主協程的結束而退出執行。上面的程式碼輸出“0 4 3 1 2 ”,這是因為建立完協程後放入執行時佇列到真正執行有個過程,哪個協程先被放入執行時佇列,哪個就能先執行。

執行緒的排程是由作業系統負責的,排程演算法執行在核心態,而協程的呼叫是由 Go 語言的執行時負責的,排程演算法執行在使用者態。這就使得Go語言的協程比Java的執行緒更加輕量級。單一的Java應用建立幾十上百個執行緒並執行就已經非常消耗記憶體和CPU資源了,而單一的Go應用可以建立上千萬的goroutine還正常地執行。

GPM

先回顧一下Java中執行緒和作業系統核心執行緒的對應關係。Java中執行緒模型如下:

Go語言進階之路(五):通道和goroutine、GPM

Java執行緒的實現方式是透過作業系統提供的高階程式設計介面——LWP(輕量級程序)來對應到核心執行緒上的,一個Java執行緒對應到一個LWP,對應到一個KLT(核心執行緒)。當Java執行緒阻塞,需要排程時,由核心排程器來進行切換,把當前KLT-LWP對應的Java執行緒切換到另一個Java執行緒,這種切換操作在核心態中進行,這種切換是很耗時的(相對CPU來說)。Java的一個執行緒對應到一個核心執行緒,一般建立Java執行緒佔用的記憶體都大於1M。

Go語言使用了使用者級執行緒的實現方式,Go語言中的goroutine可以理解為使用者態的執行緒,排程切換goroutine直接在使用者態進行,不用切換到核心態。

Go 排程器模型我們通常叫做GPM 模型,包括 4 個重要結構:

G:Goroutine,每個 Goroutine 對應一個 G 結構體,我們使用go關鍵字建立goroutine,並非就一定建立了G結構體的例項,只有當沒有可用的G時,才會建立G來裝載我們建立的goroutine,否則,會複用現有可用的G來裝載goroutine。G 儲存 Goroutine 的執行堆疊、狀態以及任務函式,可重用。G 並非執行體,每個 G 需要繫結到 P 才能被排程執行。

P: Processor,表示邏輯處理器,對 G 來說,P 相當於 CPU 核心,G 只有繫結到 P 才能被排程。對 M 來說,P 提供了相關的執行環境(Context),如記憶體分配狀態(mcache),任務佇列(G)等。P 的數量決定了系統內最大可並行的 G 的數量(前提:物理 CPU 核數 >= P 的數量)。P 的數量由使用者設定的 GOMAXPROCS 決定,但是不論 GOMAXPROCS 設定為多大,P 的數量最大為 256。

M: Machine,OS 核心執行緒抽象,代表著真正執行計算的資源,在繫結有效的 P 後,進入 schedule 迴圈;而 schedule 迴圈的機制大致是從 Global 佇列、P 的 Local 佇列以及 wait 佇列中獲取。M 的數量是不定的,由 Go Runtime 調整,為了防止建立過多 OS 執行緒導致系統排程不過來,目前預設最大限制為 10000 個。M 並不保留 G 狀態,這是 G 可以跨 M 排程的基礎。

Sched:Go 排程器,它維護有儲存 M 和 G 的佇列以及排程器的一些狀態資訊等。

GPM和Sched的關係如下:

Go語言進階之路(五):通道和goroutine、GPM

​Go語言排程器Sched擁有全域性執行佇列(GPQ)和P的本地執行佇列(LRQ)。GRQ中維護著可以執行但是沒有繫結到P的所有goroutine,LRQ中維護著繫結到P的可執行的所有goroutine。可以看到,上圖中有一個繫結到P的goroutine正在M上執行,M對應到作業系統核心執行緒。

當正在M上執行的G在某些情況阻塞時,Sched可以直接把P上其他可執行的G排程進來,這種排程操作在使用者態進行,十分輕量級。而且,Go語言中建立的goroutine只需要大約佔用2KB的記憶體,比Java中執行緒佔用空間小多了。因此,Go語言程式可以輕鬆hold住百萬goroutine的排程執行。

GPM原始碼和Sched原始碼如下,從原始碼中,我們更能清楚GPM之間的關係:

G

G中會儲存goroutine執行函式,goroutine的id,還有被繫結執行的M的指標:

struct G { uintptr stackguard; // 分段棧的可用空間下界 uintptr stackbase; // 分段棧的棧基址 Gobuf sched; //程序切換時,利用sched域來儲存上下文 uintptr stack0; FuncVal* fnstart; // goroutine執行的函式 void* param; // 用於傳遞引數,睡眠時其它goroutine設定param,喚醒時此goroutine可以獲取 int16 status; // 狀態Gidle,Grunnable,Grunning,Gsyscall,Gwaiting,Gdead int64 goid; // goroutine的id號 G* schedlink; M* m; // for debuggers, but offset not hard-coded M* lockedm; // G被鎖定只能在這個m上執行 uintptr gopc; // 建立這個goroutine的go表示式的pc 。。。};

P

P中含有本地執行佇列LRQ,P當前的狀態:

struct P { Lock; uint32 status; // Pidle或Prunning等 P* link; uint32 schedtick; // 每次排程時將它加一 M* m; // 連結到它關聯的M (nil if idle) MCache* mcache; G* runq[256]; int32 runqhead; int32 runqtail; // Available G‘s (status == Gdead) G* gfree; int32 gfreecnt; byte pad[64];};

M

M中包含了繫結到M的P的指標,正在M上執行的G的指標,M的狀態等等:

struct M { G* g0; // 帶有排程棧的goroutine G* gsignal; // signal-handling G 處理訊號的goroutine void (*mstartfn)(void); G* curg; // M中當前執行的goroutine P* p; // 關聯P以執行Go程式碼 (如果沒有執行Go程式碼則P為nil) P* nextp; int32 id; int32 mallocing; //狀態 int32 throwing; int32 gcing; int32 locks; int32 helpgc; //不為0表示此m在做幫忙gc。helpgc等於n只是一個編號 bool blockingsyscall; bool spinning; Note park; M* alllink; // 這個域用於連結allm M* schedlink; MCache *mcache; G* lockedg; M* nextwaitm; // next M waiting for lock GCStats gcstats; 。。。};

Sched

Sched包含當前空閒的Md指標,當前空閒的P的指標,全域性執行佇列GRQ等等:

struct Sched { Lock; uint64 goidgen; M* midle; // idle m’s waiting for work int32 nmidle; // number of idle m‘s waiting for work int32 nmidlelocked; // number of locked m’s waiting for work int3 mcount; // number of m‘s that have been created int32 maxmcount; // maximum number of m’s allowed (or die) P* pidle; // idle P‘s uint32 npidle; //idle P的數量 uint32 nmspinning; // Global runnable queue。 G* runqhead; G* runqtail; int32 runqsize; // Global cache of dead G’s。 Lock gflock; G* gfree; int32 stopwait; Note stopnote; uint32 sysmonwait; Note sysmonnote; uint64 lastpoll; int32 profilehz; // cpu profiling rate}