Go精妙的互斥鎖設計

Some people, when confronted with a problem, think, “I know, I’ll use threads,” and then two they hav erpoblesms。

競爭條件

多執行緒程式在多核CPU機器上訪問共享資源時,難免會遇到問題。我們可以來看一個例子

var Cnt intfunc Add(iter int) { for i := 0; i < iter; i++ { Cnt++ }}func main() { wg := &sync。WaitGroup{} for i := 0; i < 2; i++ { wg。Add(1) go func() { Add(100000) wg。Done() }() } wg。Wait() fmt。Println(Cnt)}

很明顯,程式的預期結果是200000,但實際的輸出卻是不可確定的,可能為100910、101364或者其他數值,這就是典型的多執行緒訪問衝突問題。

Go精妙的互斥鎖設計

利用

go tool trace

分析工具(需要在程式碼中加入

runtime/trace

包獲取程式執行資訊,此處省略),檢視該程式執行期間goroutine的執行情況如上圖所示。其中G20和G19就是執行

Add()

函式的兩個goroutine,它們在執行期間並行地訪問了共享變數

Cnt

類似這種情況,即兩個或者多個執行緒讀寫某些共享資料,而最後的結果取決於程式執行的精確時序,這就是

競爭條件

(race condition)

臨界區與互斥

怎樣避免競爭條件?實際上凡涉及共享記憶體、共享檔案以及共享任何資源的情況都會引發上文例子中類似的錯誤,要避免這種錯誤,關鍵是要找出某種途徑來阻止多執行緒同時讀寫共享的資料。換言之,我們需要的是

互斥

(mutual exclusion),即以某種手段確保當一個執行緒在使用一個共享變數或檔案時,其他執行緒不能做同樣的操作。

我們把對共享記憶體進行訪問的程式片段稱作

臨界區

(critical section),例如上例中的

Cnt++

片段。從抽象的角度看,我們希望的多執行緒行為如下圖所示。執行緒A在t1時刻進入臨界區,執行一段時間後,在t2時刻執行緒B試圖進入臨界區,但是這是不能被允許的,因為同一時刻只能執行一個執行緒在臨界區內,而此時已經有一個執行緒在臨界區內。我們透過某種互斥手段,將B暫時掛起直到執行緒A離開臨界區,即t3時刻B進入臨界區。最後,B執行完臨界區程式碼後,離開臨界區。

Go精妙的互斥鎖設計

如果我們能夠合理地安排,使得兩個執行緒不可能同時處於臨界區中,就能夠避免競爭條件。因此,我們將程式碼稍作調整如下

var ( Cnt int mu sync。Mutex)func Add(iter int) { mu。Lock() for i := 0; i < iter; i++ { Cnt++ } mu。Unlock()}

此時,程式執行得到了預期結果200000。

Go精妙的互斥鎖設計

程式執行期間的執行情況如上圖所示。其中G8和G7是執行

Add()

函式的兩個goroutine,透過加入

sync。Mutex

互斥鎖,G8和G7就不再存在競爭條件。

需要明確的是,只有在多核機器上才會發生競爭條件,只有多執行緒對共享資源做了寫操作時才有可能發生競態問題,只要資源沒有發生變化,多個執行緒讀取相同的資源就是安全的。

Go互斥鎖設計

互斥鎖是實現互斥功能的常見實現,Go中的互斥鎖即

sync。Mutex

。本文將基於Go 1。15。2版本,對互斥鎖的實現深入研究。

type Mutex struct { state int32 sema uint32}const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota // mutexWaiterShift值為3,透過右移3位的位運算,可計算waiter個數 starvationThresholdNs = 1e6 // 1ms,進入飢餓狀態的等待時間)

state

欄位表示當前互斥鎖的狀態資訊,它是

int32

型別,其低三位的二進位制位均有相應的狀態含義。

mutexLocked

state

中的低1位,用二進位制表示為

0001

(為了方便,這裡只描述後4位),它代表該互斥鎖是否被加鎖。

mutexWoken

是低2位,用二進位制表示為

0010

,它代表互斥鎖上是否有被喚醒的goroutine。

mutexStarving

是低3位,用二進位制表示為

0100

,它代表當前互斥鎖是否處於飢餓模式。

state

剩下的29位用於統計在互斥鎖上的等待佇列中goroutine數目(

waiter

)。

預設的

state

欄位(無鎖狀態)如下圖所示。

Go精妙的互斥鎖設計

sema

欄位是訊號量,用於控制goroutine的阻塞與喚醒,下文中會有介紹到。

兩種模式

Go實現的互斥鎖有兩種模式,分別是

正常模式

飢餓模式

在正常模式下,waiter按照先進先出(

FIFO

)的方式獲取鎖,但是一個剛被喚醒的waiter與新到達的goroutine競爭鎖時,大機率是幹不過的。新來的goroutine有一個優勢:它已經在CPU上執行,並且有可能不止一個新來的,因此waiter極有可能失敗。在這種情況下,waiter還需要在等待佇列中排隊。為了避免waiter長時間搶不到鎖,當waiter超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換到飢餓模式,防止等待佇列中的waiter被餓死。

在飢餓模式下,鎖的所有權直接從解鎖(unlocking)的goroutine轉移到等待佇列中的隊頭waiter。新來的goroutine不會嘗試去獲取鎖,也不會自旋。它們將在等待佇列的隊尾排隊。

如果某waiter獲取到了鎖,並且滿足以下兩個條件之一,它就會將鎖從飢餓模式切換回正常模式。

它是等待佇列的最後一個goroutine

它等待獲取鎖的時間小於1ms

飢餓模式是在 Go 1。9版本引入的,它防止了佇列尾部waiter一直無法獲取鎖的問題。與飢餓模式相比,正常模式下的互斥鎖效能更好。因為相較於將鎖的所有權明確賦予給喚醒的waiter,直接競爭鎖能降低整體goroutine獲取鎖的延時開銷。

加鎖

既然被稱作鎖,那就存在加鎖和解鎖的操作。

sync。Mutex

的加鎖

Lock()

程式碼如下

func (m *Mutex) Lock() { if atomic。CompareAndSwapInt32(&m。state, 0, mutexLocked) { if race。Enabled { race。Acquire(unsafe。Pointer(m)) } return } m。lockSlow()}

程式碼非常簡潔,首先透過CAS判斷當前鎖的狀態(CAS的原理和實現可以參照小菜刀寫的《同步原語的基石》一文)。如果鎖是完全空閒的,即

m。state

為0,則對其加鎖,將

m。state

的值賦為1,此時加鎖後的

state

如下

Go精妙的互斥鎖設計

如果,當前鎖已經被其他goroutine加鎖,則進入

m。lockSlow()

邏輯。

lockSlow

函式比較長,這裡我們分段闡述。

初始化

func (m *Mutex) lockSlow() { var waitStartTime int64 // 用於計算waiter的等待時間 starving := false // 飢餓模式標誌 awoke := false // 喚醒標誌 iter := 0 // 統計當前goroutine的自旋次數 old := m。state // 儲存當前鎖的狀態 。。。}

第一段程式是做一些初始化狀態、標誌的動作。

自旋

lockSlow

函式餘下的程式碼,就是一個大的

for

迴圈,首先看自旋部分。

for { // 判斷是否能進入自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 判斷當前goroutine是不是在喚醒狀態 // old&mutexWoken == 0 表示沒有其他正在喚醒的goroutine // old>>mutexWaiterShift != 0 表示等待佇列中有正在等待的goroutine if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && // 嘗試將當前鎖的低2位的Woken狀態位設定為1,表示已被喚醒 // 這是為了通知在解鎖Unlock()中不要再喚醒其他的waiter了 atomic。CompareAndSwapInt32(&m。state, old, old|mutexWoken) { awoke = true } // 自旋 runtime_doSpin() iter++ old = m。state continue } 。。。}

關於自旋,這裡需要簡單闡述一下。自旋是自旋鎖的行為,它透過忙等待,讓執行緒在某段時間內一直保持執行,從而避免執行緒上下文的排程開銷。

自旋鎖對於執行緒只會阻塞很短時間的場景是非常合適的

。很顯然,單核CPU是不適合使用自旋鎖的,因為,在同一時間只有一個執行緒是處於執行狀態,假設執行執行緒A發現無法獲取鎖,只能等待解鎖,但因為A自身不掛起,所以那個持有鎖的執行緒B沒有辦法進入執行狀態,只能等到作業系統分給A的時間片用完,才能有機會被排程。這種情況下使用自旋鎖的代價很高。

在本場景中,之所以想讓當前goroutine進入自旋行為的依據是,我們樂觀地認為:

當前正在持有鎖的goroutine能在較短的時間內歸還鎖

runtime_canSpin()

函式的實現如下

//go:linkname sync_runtime_canSpin sync。runtime_canSpinfunc sync_runtime_canSpin(i int) bool { // active_spin = 4 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched。npidle+sched。nmspinning)+1 { return false } if p := getg()。m。p。ptr(); !runqempty(p) { return false } return true}

由於自旋本身是空轉CPU的,所以如果使用不當,反倒會降低程式執行效能。結合函式中的判斷邏輯,這裡總結出來goroutine能進入自旋的條件如下

當前互斥鎖處於正常模式

當前執行的機器是多核CPU,且GOMAXPROCS>1

至少存在一個其他正在執行的處理器P,並且它的本地執行佇列(local runq)為空

當前goroutine進行自旋的次數小於4

前面說到,自旋行為就是讓當前goroutine並不掛起,佔用cpu資源。我們看一下

runtime_doSpin()

的實現。

//go:linkname sync_runtime_doSpin sync。runtime_doSpinfunc sync_runtime_doSpin() { procyield(active_spin_cnt) // active_spin_cnt = 30}

runtime_doSpin

呼叫了

procyield

,其實現如下(以amd64為例)

TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AXagain: PAUSE SUBL $1, AX JNZ again RET

很明顯,所謂的忙等待就是執行 30 次

PAUSE

指令,透過該指令佔用 CPU 並消耗 CPU 時間。

計算期望狀態

前面說過,當前goroutine進入自旋是需要滿足相應條件的。如果不滿足自旋條件,則進入以下邏輯。

// old是鎖當前的狀態,new是期望的狀態,以期於在後面的CAS操作中更改鎖的狀態 new := old if old&mutexStarving == 0 { // 如果當前鎖不是飢餓模式,則將new的低1位的Locked狀態位設定為1,表示加鎖 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 如果當前鎖已被加鎖或者處於飢餓模式,則將waiter數加1,表示當前goroutine將被作為waiter置於等待佇列隊尾 new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { // 如果當前鎖處於飢餓模式,並且已被加鎖,則將低3位的Starving狀態位設定為1,表示飢餓 new |= mutexStarving } // 當awoke為true,則表明當前goroutine在自旋邏輯中,成功修改鎖的Woken狀態位為1 if awoke { if new&mutexWoken == 0 { throw(“sync: inconsistent mutex state”) } // 將喚醒標誌位Woken置回為0 // 因為在後續的邏輯中,當前goroutine要麼是拿到鎖了,要麼是被掛起。 // 如果是掛起狀態,那就需要等待其他釋放鎖的goroutine來喚醒。 // 假如其他goroutine在unlock的時候發現Woken的位置不是0,則就不會去喚醒,那該goroutine就無法再醒來加鎖。 new &^= mutexWoken }

這裡需要重點理解一下位操作

A |= B

,它的含義就是在B的二進位制位為1的位,將A對應的二進位制位設為1,如下圖所示。因此,

new |= mutexLocked

的作用就是將

new

的最低一位設定為1。

Go精妙的互斥鎖設計

更新期望狀態

在上一步,我們得到了鎖的期望狀態,接下來透過CAS將鎖的狀態進行更新。

// 嘗試將鎖的狀態更新為期望狀態 if atomic。CompareAndSwapInt32(&m。state, old, new) { // 如果鎖的原狀態既不是被獲取狀態,也不是處於飢餓模式 // 那就直接返回,表示當前goroutine已獲取到鎖 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 如果走到這裡,那就證明當前goroutine沒有獲取到鎖 // 這裡判斷waitStartTime != 0就證明當前goroutine之前已經等待過了,則需要將其放置在等待佇列隊頭 queueLifo := waitStartTime != 0 if waitStartTime == 0 { // 如果之前沒有等待過,就以現在的時間來初始化設定 waitStartTime = runtime_nanotime() } // 阻塞等待 runtime_SemacquireMutex(&m。sema, queueLifo, 1) // 被訊號量喚醒之後檢查當前goroutine是否應該表示為飢餓 // (這裡表示為飢餓之後,會在下一輪迴圈中嘗試將鎖的狀態更改為飢餓模式) // 1。 如果當前goroutine已經飢餓(在上一次迴圈中更改了starving為true) // 2。 如果當前goroutine已經等待了1ms以上 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 再次獲取鎖狀態 old = m。state // 走到這裡,如果此時鎖仍然是飢餓模式 // 因為在飢餓模式下,鎖是直接交給喚醒的goroutine // 所以,即把鎖交給當前goroutine if old&mutexStarving != 0 { // 如果當前鎖既不是被獲取也不是被喚醒狀態,或者等待佇列為空 // 這代表鎖狀態產生了不一致的問題 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw(“sync: inconsistent mutex state”) } // 因為當前goroutine已經獲取了鎖,delta用於將等待佇列-1 delta := int32(mutexLocked - 1<>mutexWaiterShift == 1 { delta -= mutexStarving } atomic。AddInt32(&m。state, delta) // 拿到鎖退出,業務邏輯處理完之後,需要呼叫Mutex。Unlock()方法釋放鎖 break } // 如果鎖不是飢餓狀態 // 因為當前goroutine已經被訊號量喚醒了 // 那就將表示當前goroutine狀態的awoke設定為true // 並且將自旋次數的計數iter重置為0,如果能滿足自旋條件,重新自旋等待 awoke = true iter = 0 } else { // 如果CAS未成功,更新鎖狀態,重新一個大迴圈 old = m。state }

這裡需要理解一下

runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

函式,

它是用於同步庫的sleep原語

,它的實現是位於

src/runtime/sema。go

中的

semacquire1

函式,與它類似的還有

runtime_Semacquire(s *uint32)

函式。兩個睡眠原語需要等到

*s>0

(本場景中

m。sema>0

),然後原子遞減

*s

SemacquireMutex

用於分析競爭的互斥物件,如果

lifo

(本場景中

queueLifo

)為true,則將等待者排在等待佇列的隊頭。

skipframes

是從

SemacquireMutex

的呼叫方開始計數,表示在跟蹤期間要忽略的幀數。

所以,執行到

SemacquireMutex

就證明當前goroutine在前面的過程中獲取鎖失敗了,就需要sleep原語來阻塞當前goroutine,並透過訊號量來排隊獲取鎖:如果是新來的goroutine,就需要放在隊尾;如果是被喚醒的等待鎖的goroutine,就放在隊頭。

解鎖

前面說過,有加鎖就必然有解鎖。我們來看解鎖的過程

func (m *Mutex) Unlock() { if race。Enabled { _ = m。state race。Release(unsafe。Pointer(m)) } // new是解鎖的期望狀態 new := atomic。AddInt32(&m。state, -mutexLocked) if new != 0 { m。unlockSlow(new) }}

透過原子操作

AddInt32

想將鎖的低1位

Locked

狀態位置為0。然後判斷新的

m。state

值,如果值為0,則代表當前鎖已經完全空閒了,結束解鎖,否則進入

unlockSlow()

邏輯。

這裡需要注意的是,鎖空閒有兩種情況,第一種是完全空閒,它的狀態就是鎖的初始狀態。

Go精妙的互斥鎖設計

第二種空閒,是指的當前鎖沒被佔有,但是會有等待拿鎖的goroutine,只是還未被喚醒,例如以下狀態的鎖也是空閒的,它有兩個等待拿鎖的goroutine(未喚醒狀態)。

Go精妙的互斥鎖設計

以下是

unlockSlow

函式實現。

func (m *Mutex) unlockSlow(new int32) { // 1。 如果Unlock了一個沒有上鎖的鎖,則會發生panic。 if (new+mutexLocked)&mutexLocked == 0 { throw(“sync: unlock of unlocked mutex”) } // 2。 正常模式 if new&mutexStarving == 0 { old := new for { // 如果鎖沒有waiter,或者鎖有其他以下已發生的情況之一,則後面的工作就不用做了,直接返回 // 1。 鎖處於鎖定狀態,表示鎖已經被其他goroutine獲取了 // 2。 鎖處於被喚醒狀態,這表明有等待goroutine被喚醒,不用再嘗試喚醒其他goroutine // 3。 鎖處於飢餓模式,那麼鎖之後會被直接交給等待佇列隊頭goroutine if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 如果能走到這,那就是上面的if判斷沒透過 // 說明當前鎖是空閒狀態,但是等待佇列中有waiter,且沒有goroutine被喚醒 // 所以,這裡我們想要把鎖的狀態設定為被喚醒,等待佇列waiter數-1 new = (old - 1<

在這裡,需要理解一下

runtime_Semrelease(s *uint32, handoff bool, skipframes int)

函式。

它是用於同步庫的wakeup原語

Semrelease

原子增加

*s

值(本場景中

m。sema

),並通知阻塞在

Semacquire

中正在等待的goroutine。如果

handoff

為真,則跳過計數,直接喚醒隊頭waiter。

skipframes

是從

Semrelease

的呼叫方開始計數,表示在跟蹤期間要忽略的幀數。

總結

從程式碼量而言,go中互斥鎖的程式碼非常輕量簡潔,透過巧妙的位運算,僅僅採用state一個欄位就實現了四個欄位的效果,非常之精彩。

但是,程式碼量少並不代表邏輯簡單,相反,它很複雜。互斥鎖的設計中包含了大量的位運算,幷包括了兩種不同鎖模式、訊號量、自旋以及排程等內容,讀者要真正理解加解鎖的過程並不容易,這裡再做一個簡單回顧總結。

在正常模式下,waiter按照先進先出的方式獲取鎖;在飢餓模式下,鎖的所有權直接從解鎖的goroutine轉移到等待佇列中的隊頭waiter。

模式切換

如果當前 goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到飢餓模式。

如果當前 goroutine 是互斥鎖最後一個waiter,或者等待的時間小於 1ms,互斥鎖切換回正常模式。

加鎖

如果鎖是完全空閒狀態,則透過CAS直接加鎖。

如果鎖處於正常模式,則會嘗試自旋,透過持有CPU等待鎖的釋放。

如果當前goroutine不再滿足自旋條件,則會計算鎖的期望狀態,並嘗試更新鎖狀態。

在更新鎖狀態成功後,會判斷當前goroutine是否能獲取到鎖,能獲取鎖則直接退出。

當前goroutine不能獲取到鎖時,則會由sleep原語

SemacquireMutex

陷入睡眠,等待解鎖的goroutine發出訊號進行喚醒。

喚醒之後的goroutine發現鎖處於飢餓模式,則能直接拿到鎖,否則重置自旋迭代次數並標記喚醒位,重新進入步驟2中。

解鎖

如果透過原子操作

AddInt32

後,鎖變為完全空閒狀態,則直接解鎖。

如果解鎖一個沒有上鎖的鎖,則直接丟擲異常。

如果鎖處於正常模式,且沒有goroutine等待鎖釋放,或者鎖被其他goroutine設定為了鎖定狀態、喚醒狀態、飢餓模式中的任一種(非空閒狀態),則會直接退出;否則,會透過wakeup原語

Semrelease

喚醒waiter。

如果鎖處於飢餓模式,會直接將鎖的所有權交給等待佇列隊頭waiter,喚醒的waiter會負責設定

Locked

標誌位。

另外,從Go的互斥鎖帶有自旋的設計而言,如果我們透過

sync。Mutex

只鎖定執行耗時很低的關鍵程式碼,例如鎖定某個變數的賦值,效能是非常不錯的(因為等待鎖的goroutine不用被掛起,持有鎖的goroutine會很快釋放鎖)。

所以,我們在使用互斥鎖時,應該只鎖定真正的臨界區

mu。Lock()defer mu。Unlock()

寫如上的程式碼,是很爽。但是,你有想過這會帶來沒必要的效能損耗嗎?