從併發模型看 Go 的語言設計

傳統的程式語言設計都不會將輸入輸出作為語言的核心,但 Tony Hoare 認為輸入輸出是基本的程式設計原語,且通訊順序程序(Communicating sequential processes,CSP)的並行組合(這裡可能用「併發」會更為準確)是基本的程式組織方法。Go 語言的併發設計就是基於 CSP 模型的。

在最初的 CSP 模型中,程式總由若干個可以相互通訊的程序構成,其中每一個程序內部是順序執行的(這也就是 CSP 名稱的含義)。注意這裡的「程序」並不一定指作業系統中的程序,也不一定是作業系統的執行緒,它是一個抽象的概念,代表一組計算的序列,例如 goroutine 這種在應用層排程的計算序列也算 CSP 中的「P」。與 Go 語言不同的地方在於,這個最初的設計中並沒有通訊管道的概念,每個程序是直接和另一個程序進行通訊的,但在 Go 語言中,goroutine 是匿名的,一個 goroutine 並沒有辦法直接將訊息發給另一個 goroutine,為了實現 goroutine 之間的通訊,Go 語言提供了 first class 的 channel,訊息透過 channel 來從一個 goroutine 發到另一個 goroutine。而且,Go 語言也不要求 goroutine 內部是順序執行的,goroutine 內部可以建立更多的 goroutine,併發地完成工作。

下面,我們透過例子說明基於 CSP 模型是如何組織程式的。

階乘計算

首先來看的是一個計算階乘的例子,階乘的一個簡單的遞迴實現可以是這樣的:

fact 0 = 1fact n = n * fact (n - 1)

而基於 CSP 組織程式,我們可以這樣做:

// 階乘計算的實體func FactCalc(in <-chan int, out chan<- int) { var subIn, subOut chan int for { n := <-in if n == 0 { out <- 1 } else { if subIn == nil { subIn, subOut = make(chan int), make(chan int) go FactCalc(subIn, subOut) } subIn <- n - 1 r := <-subOut out <- n * r } }}// 包裝一個階乘計算函式func MakeFactFunc() func(int) int { in, out := make(chan int), make(chan int) go FactCalc(in, out) return func(x int) int { in <- x return <-out }}

MakeFactFunc 就是簡單地封裝一下 FactCalc,獲取一個計算階乘的函式。主要的計算是由 FactCalc 進行的。

每一個 FactCalc 都會被作為一個獨立的 goroutine 來執行,對於第 i 個 goroutine 而言,它先從第 i - 1 個 goroutine 中讀入一個數字 n,然後,如果 n > 0,這個 goroutine 需要做 3 件事:

1。 向第 i + 1 個 goroutine 寫入一個 n - 1

2。 從第 i + 1個 goroutine 處讀回來一個數字 r

3。 將 n * r 寫入第 i - 1 個 goroutine

否則,則向第 i - 1 個 goroutine 處寫入一個 1。

如前所述,由於 Go 語言不支援直接向一個 goroutine 發訊息,所以這裡的訊息收發都要基於 channel 進行。我們可以看到,一旦 FactCalc 發現自己無法完成階乘問題的計算工作,它就會建立另一個 goroutine(只會建立一次),並將子問題傳送給這個 goroutine 進行處理,這會形成一個 FactCalc goroutine 的鏈條,鏈條上的每一個 goroutine 都與前一個和後一個 goroutine 進行通訊(這就是前文所說的「若干個可以相互通訊的程序」)。

我們又了這樣的階乘計算器後,我們可以這麼去使用它:

const limit = 5func main() { fact := MakeFactFunc() for i := 0; i < limit; i++ { fmt。Println(fact(i)) }}

執行程式,我們可以看到這樣的輸出:

112624

相比於直接使用遞迴函式呼叫,這個實現方式非常不直觀。下面這個圖可能能幫助理解:

從併發模型看 Go 的語言設計

這裡的圓形為呼叫者,每一個矩形都為一個 goroutine,當我們嘗試計算 fact(3) 時,會將 3 寫入最前面的 in channel 中,資料開始從第一個 goroutine 向後流動。第一個 goroutine 會從這個 channel 中讀到這個 3,它將 3 - 1 寫入下一個 in channel 中,然後開始阻塞等待 out channel 出現第二個 goroutine 計算的結果,第二個、第三個 goroutine 的計算是類似的,等到第 4 個 goroutine 從 in channel 中讀取輸入時,它發現這是一個 0,於是直接向 out channel 寫入一個 1,此時資料開始從最後一個 goroutine 往回流動,經過第三個和第二個 goroutine 的計算後,第一個 goroutine 會獲得 2,然後將 2 * 3 輸出。

注意到這裡進行階乘計算的實體並不是遞迴的函式,而是併發的 goroutine,它們之間透過 channel 進行通訊, 每個 goroutine 都將計算拆分併發送給其他 goroutine 進行處理,直到計算變為 trivial 的情況。當然了,這個實現相比簡單的遞迴函式會顯得很囉嗦,我們在實際使用中也不會這麼做,但這個例子說明了如何在 CSP 模型下,利用資料的流動實現我們常見的遞迴。

素數篩

下面的一個例子中,我們使用篩法來計算素數。所謂素數篩,大概就是對正整數 2 ~ n 進行遍歷,然後對每一個數字都進行一次篩選,只留下是素數的部分,對於第 i 位的篩選,我們需要依賴前面已經曬出的 m 個素數,當且僅當這 m 個素數都無法整除第 i 位的數字時,這個數字可以透過這一位的篩選,也就是這樣:

primes = primesFilter [2。。] where primesFilter (p:xs) = p : primesFilter [x | x <- xs, x `mod` p /= 0]

上面這個實現利用了 Haskell 的惰性求值能力,但對於大多數語言而言,我們的實現都不可能這麼簡潔,基於傳統的順序計算的思路,程式都會比較囉嗦,而且關鍵是很不清晰。而在 CSP 模型下,我們可以這麼實現:

func Counter(out chan<- int) { for i := 2; ; i++ { out <- i }}func PrimeFilter(prime int, in <-chan int, out chan<- int) { for { i := <-in if i%prime != 0 { out <- i } }}func PrimeSieve(out chan<- int) { c := make(chan int) go Counter(c) for { prime := <-c out <- prime newC := make(chan int) go PrimeFilter(prime, c, newC) c = newC }}

可以看到,我們的素數篩由三個部分組成,首先,Counter 從 2 開始依次產生自然數。PrimeFilter 就是每一層素數的過濾器,每一層過濾器只持有一個輸入 channel 一個輸出 channel 和一個素數 prime,它將不斷從輸入 channel 中讀入數字,並將其中無法被 prime整除的部分輸出。PrimeSieve 則是一個完整的素數篩,它每獲得一個素數,都將素數輸出,並建立一個新一層的過濾器,因此整個過程大概是這樣的:

從併發模型看 Go 的語言設計

PrimeSieve 可以向 out channel 中依次輸出被篩出來的素數,這個過程是惰性的,直到我們從 out channel 中取出素數,下一個素數才會被計算。我們可以這樣去使用它:

func main() { primes := make(chan int) go PrimeSieve(primes) for i := 0; i < 5; i++ { fmt。Println(<-primes) }}

執行程式,我們可以看到這樣的輸出:

235711

從這兩個例子中我們可以看到 CSP 模型不一定是用於平行計算,至少在這兩個例子中,每一個 goroutine 在進行計算之後都在阻塞等待,同一時間事實上僅有一個活躍的 goroutine,但 Go 語言對 CSP 併發模型的支援能讓整個設計變得簡單清晰(「併發」和「並行」的區別可以參考這個影片)。這反映到 Go 語言設計上的要點有兩個:

Goroutine 之間可以透過 channel 來進行通訊,channel 是 first class value,可以被直接傳遞。在這種情況下,goroutine 之間很容易進行協作,共同完成一個計算工作。

Goroutine 十分輕量,可以在單機建立大量 goroutine 而不至於消耗過多效能。對於素數篩的例子,每計算多一個素數都需要多一個 goroutine。而階乘計算的例子,輸入引數 + 1 都需要多一個 goroutine。顯然,如果沒有系統層排程的「process」的支援,CSP 所能應用的範圍就非常侷限了。

下面我們再透過另外一個例子看一下 Go 語言的其他設計點。

訊號量

一個訊號量有兩個操作,分別稱為 V(signal())與P(wait())。其運作方式如下:

初始化,訊號標 S 一個非負數的整數值。

執行 P 操作(wait())時,訊號標 S 的值將嘗試被減少。當訊號標 S 非正數時,程序會阻塞等待;當訊號標 S 為正數時,S 被成功減少,程序可以繼續往下執行。

執行 V 操作(signal())時,訊號標 S 的值將會被增加。

在 CSP 模型下,我們可以這樣實現:

type Semaphore struct { inc chan struct{} dec chan struct{}}func (sem *Semaphore) Wait() { sem。dec <- struct{}{}}func (sem *Semaphore) Signal() { sem。inc <- struct{}{}}func MakeSemaphore(initVal int) *Semaphore { sem := Semaphore{ inc: make(chan struct{}), dec: make(chan struct{}), } go func(s int) { for { if s > 0 { select { case <-sem。inc: s = s + 1 case <-sem。dec: s = s - 1 } } else { <-sem。inc s = s + 1 } } }(initVal) return &sem}

*Semaphore 有兩個操作,分別是 Wait 和 Signal,它們分別向 dec channel 和 incchannel 發訊息。而 MakeSemaphore 中建立的 goroutine 則會根據 s int 狀態的不同選擇不同的操作,如果 s > 0,則從 inc channel 或 dec channel 中隨機讀取一個值,並將 s 的值進行增加/減少 1,否則,從 inc channel 中讀取一個值,並將 s 的值增加 1。注意這裡的「隨機」是非常重要的,如果 inc 和 dec 同時都有資料可讀,則實際從哪個 channel 中讀出資料是不確定的,正是因為 Go 語言的 select 是隨機的,我們才可以在這裡用它來進行排程。顯然,在大多數語言中,如果要實現 channel 這樣的型別,一般是以庫的形式進行實現,而 Go 語言將其上升到了語言層面實現,這樣雖然顯的不夠純粹乾淨,但這樣可以透過更方便的語法實現 select 這樣強大的功能,如果實現為庫的形式,是難以做到這個程度的。

一個簡單的服務模板

在這個例子中,我們將基於 Go 語言實現一個極簡單的服務模板,程式碼如下:

type Input struct { Req interface{} State interface{}}type Output struct { Rsp interface{} State interface{}}type Handler = func(input Input) (Output, error)type Response struct { Result interface{} Error error}type InMessage struct { Req interface{} OutChan chan<- Response}type Service struct { inChan chan<- InMessage}func (service *Service) RpcCall(request interface{}) (interface{}, error) { outChan := make(chan Response) service。inChan <- InMessage{request, outChan} rsp := <-outChan if rsp。Error != nil { return nil, rsp。Error } return rsp。Result, nil}func MakeService(handler Handler, initState interface{}) *Service { inChan := make(chan InMessage) go func(state interface{}) { for { in := <-inChan out, err := handler(Input{in。Req, state}) if err != nil { in。OutChan <- Response{nil, err} } else { state = out。State in。OutChan <- Response{out。Rsp, nil} } } }(initState) return &Service{inChan}}

這裡的 Service 是一個服務模板,我們透過 MakeService 來建立它。在建立服務模板的時候,我們要求呼叫者傳入一個請求處理函式 handler func(input Input) (Output, error),從型別可以知道,它接受一個請求,然後進行處理,並返回響應。請求和響應中都帶有狀態,handler 可以藉此儲存和修改狀態,由於模板並不知道狀態是什麼,因此,MakeService 還要求呼叫者傳入一個初始的狀態 initState。然後,MakeService 會啟動一個 goroutine,這個 goroutine 不斷從 inChan 讀入請求,並呼叫 handler 進行處理,最終將響應透過 outChan 發回給呼叫方。RpcCall 簡單封裝了一下從 inChan 輸入請求,從 outChan 讀取響應的過程。我們可以使用這個模板這樣實現一個簡單的電話本服務:

type Query struct { Name string}type Insert struct { Name string Phone int}type PhoneBookService = Servicefunc (s *PhoneBookService) Insert(name string, phone int) { s。RpcCall(Insert{name, phone})}func (s *PhoneBookService) Query(name string) (int, error) { phone, err := s。RpcCall(Query{“Tom”}) if err != nil { return 0, err } return phone。(int), nil}func MakePhoneBookService() *PhoneBookService { return MakeService(func(i Input) (Output, error) { st := i。State。(map[string]int) switch req := i。Req。(type) { case Query: x, ok := st[req。Name] if !ok { return Output{nil, nil}, fmt。Errorf(“%v no found”, req。Name) } return Output{x, st}, nil case Insert: st[req。Name] = req。Phone return Output{nil, st}, nil default: return Output{nil, nil}, fmt。Errorf(“unknonw input: %v”, req) } }, make(map[string]int))}func main() { service := MakePhoneBookService() phone, err := service。Query(“Tom”) if err != nil { fmt。Println(“query err:”, err) } else { fmt。Println(“query succ:”, phone) } service。Insert(“Tom”, 123456) phone, err = service。Query(“Tom”) if err != nil { fmt。Println(“query err:”, err) } else { fmt。Println(“query succ:”, phone) }}

這個電話本功能很簡單,只有 Insert 和 Query 兩種方法。Service 模板的作用是將整個 Go 語言的併發模型封裝在函式呼叫內,從 PhoneBookService 的實現中,我們可以發現,這裡沒有任何 goroutine 的產生程式碼,也沒有 channel 的使用,僅僅出現了簡單的函式呼叫。對於 handler 的實現,裡面也是一個簡單的迴圈。這樣一來,具體服務的實現者就不需要接觸 Go 語言的併發模型,也可以實現簡單的服務了。

執行程式,我們可以看到如下的輸出:

query err: Tom no foundquery succ: 123456

在這裡,我們可以注意到 Go 語言的另外兩點設計,一個是使用錯誤返回值的錯誤的處理方式,另一個是隻有介面沒有泛型。

首先說錯誤處理。

Go 語言的錯誤處理方式有很大爭議,支持者認為,Go 的錯誤返回值方式讓錯誤的出現更加明確,不會擾亂讓開發者的邏輯,更清晰地表達了意圖。而反對者則認為異常丟擲的缺失導致 Go 程式碼的錯誤處理非常冗長,且頻繁打斷主要邏輯。顯然,這兩個觀點都有各自的道理,且在不同的語言裡我們也看到了這兩種錯誤處理方式的廣泛應用,但是我認為在 Go 的併發模型的限制下,使用錯誤返回值的方式是一個合理正確的選擇。如前所述,Go 語言每當建立一個 goroutine 之後,這個 goroutine 就和建立者沒有什麼關係了,它甚至不能像執行緒一樣直接被等待執行結束。goroutine 和 goroutine 唯一進行關聯的方式就是透過 channel 的訊息傳遞。假設 Go 語言支援了丟擲異常,那麼,一個 goroutine 中丟擲了一個沒有被捕獲的異常,這個異常將會導致什麼呢?由於沒有任何實體有責任捕獲並處理這個異常,因此這裡唯一正確的處理方式就是 panic 了,這個處理顯然是很不可靠的,一個 goroutine 中的異常導致整個系統的 panic 無法讓人接受。當然,有人會說,那在每個 goroutine 的最頂層都 try-catch 一下就可以了。那問題又來了,try-catch 之後呢?如果出現了一個已經被拋到頂層的異常,說明這個異常應該無法被這個 goroutine 自身處理了,應該交由其監視者來處理,例如上面的例子中,呼叫者就應該負責處理 Service goroutine 中產生的錯誤。那麼,在這個時候,唯一正確的做法就是將丟擲的異常以錯誤值的形式透過 channel 傳送給監視者,以期待上層能夠正確處理這個異常。那麼這樣一來,開發者就必須頻繁混合使用兩種錯誤處理方式,這樣的開發方式是極其混亂且易錯的。所以,使用錯誤返回值的方式應該是更加合理統一的方式了。

第二點,關於泛型的問題。

Go 語言只有介面沒有泛型,這導致了很多麻煩,例如我們無法實現帶有靜態檢查的自定義容器,泛型演算法也難以實現。許多 Go 語言的開發者對於泛型的看法是:你不需要這個。我承認在實際工程中泛型的使用場合遠少於介面,但是,即便從服務開發這個 Go 語言的主戰場來看,泛型的必要性也依然很高。從上面的例子中我們可以看到,程式碼中大量充斥著 interface{} 和對 interface{} 的型別轉換。其原因就是我們在實現這樣一個服務模板時,我們並不知道模板的使用者需要處理怎樣的 request,返回怎樣的 response,也不知道這裡的 state 是什麼。由於泛型的缺失,我們的程式碼相當於失去了靜態的型別檢查,將靜態的型別錯誤變為了執行時錯誤,這樣一來,Go 語言的靜態能力就缺失了很多,甚至我們可以說,Go 語言泛型的缺失使得 Go 語言在型別安全性上不如帶有泛型能力的靜態型別語言,卻比這些語言在使用上還要更囉嗦(各種型別轉換和錯誤判斷)。

總結

Go 語言是一個原生支援併發的語言,其併發模型基於 CSP 模型。透過使用 Go 語言的併發能力,我們可以設計出非常直觀易懂的程式碼。經過上面幾個例子的分析中我們可以看出,從併發模型和併發程式設計的角度來看,Go 在語言設計上的優勢在於:

擁有輕量的應用層程序 goroutine,允許開發者基於大量 goroutine 來設計併發程式

First class channel 的支援,使得 goroutine 之間能夠很輕易地相互合作

select 關鍵字的隨機能力使得開發者可以基於 channel 來對程式實現排程

使用返回值的形式處理錯誤,很好地契合了 goroutine + channel 的併發模型

而 Go 在語言設計上的劣勢在於:

泛型的缺失導致許多程式設計變得脆弱,增加程式碼量且失去了安全性