Some people, when confronted with a problem, think, “I know, I’ll use threads,” and then two they hav erpoblesms。
競爭條件
多執行緒程式在多核CPU機器上訪問共享資源時,難免會遇到問題。我們可以來看一個例子
var Cnt intfunc Add(iter int) { for i := 0; i < iter; i++ { Cnt++ }}func main() { wg := &sync。WaitGroup{} for i := 0; i < 2; i++ { wg。Add(1) go func() { Add(100000) wg。Done() }() } wg。Wait() fmt。Println(Cnt)}
很明顯,程式的預期結果是200000,但實際的輸出卻是不可確定的,可能為100910、101364或者其他數值,這就是典型的多執行緒訪問衝突問題。
利用
go tool trace
分析工具(需要在程式碼中加入
runtime/trace
包獲取程式執行資訊,此處省略),檢視該程式執行期間goroutine的執行情況如上圖所示。其中G20和G19就是執行
Add()
函式的兩個goroutine,它們在執行期間並行地訪問了共享變數
Cnt
。
類似這種情況,即兩個或者多個執行緒讀寫某些共享資料,而最後的結果取決於程式執行的精確時序,這就是
競爭條件
(race condition)
臨界區與互斥
怎樣避免競爭條件?實際上凡涉及共享記憶體、共享檔案以及共享任何資源的情況都會引發上文例子中類似的錯誤,要避免這種錯誤,關鍵是要找出某種途徑來阻止多執行緒同時讀寫共享的資料。換言之,我們需要的是
互斥
(mutual exclusion),即以某種手段確保當一個執行緒在使用一個共享變數或檔案時,其他執行緒不能做同樣的操作。
我們把對共享記憶體進行訪問的程式片段稱作
臨界區
(critical section),例如上例中的
Cnt++
片段。從抽象的角度看,我們希望的多執行緒行為如下圖所示。執行緒A在t1時刻進入臨界區,執行一段時間後,在t2時刻執行緒B試圖進入臨界區,但是這是不能被允許的,因為同一時刻只能執行一個執行緒在臨界區內,而此時已經有一個執行緒在臨界區內。我們透過某種互斥手段,將B暫時掛起直到執行緒A離開臨界區,即t3時刻B進入臨界區。最後,B執行完臨界區程式碼後,離開臨界區。
如果我們能夠合理地安排,使得兩個執行緒不可能同時處於臨界區中,就能夠避免競爭條件。因此,我們將程式碼稍作調整如下
var ( Cnt int mu sync。Mutex)func Add(iter int) { mu。Lock() for i := 0; i < iter; i++ { Cnt++ } mu。Unlock()}
此時,程式執行得到了預期結果200000。
程式執行期間的執行情況如上圖所示。其中G8和G7是執行
Add()
函式的兩個goroutine,透過加入
sync。Mutex
互斥鎖,G8和G7就不再存在競爭條件。
需要明確的是,只有在多核機器上才會發生競爭條件,只有多執行緒對共享資源做了寫操作時才有可能發生競態問題,只要資源沒有發生變化,多個執行緒讀取相同的資源就是安全的。
Go互斥鎖設計
互斥鎖是實現互斥功能的常見實現,Go中的互斥鎖即
sync。Mutex
。本文將基於Go 1。15。2版本,對互斥鎖的實現深入研究。
type Mutex struct { state int32 sema uint32}const ( mutexLocked = 1 << iota mutexWoken mutexStarving mutexWaiterShift = iota // mutexWaiterShift值為3,透過右移3位的位運算,可計算waiter個數 starvationThresholdNs = 1e6 // 1ms,進入飢餓狀態的等待時間)
state
欄位表示當前互斥鎖的狀態資訊,它是
int32
型別,其低三位的二進位制位均有相應的狀態含義。
mutexLocked
是
state
中的低1位,用二進位制表示為
0001
(為了方便,這裡只描述後4位),它代表該互斥鎖是否被加鎖。
mutexWoken
是低2位,用二進位制表示為
0010
,它代表互斥鎖上是否有被喚醒的goroutine。
mutexStarving
是低3位,用二進位制表示為
0100
,它代表當前互斥鎖是否處於飢餓模式。
state
剩下的29位用於統計在互斥鎖上的等待佇列中goroutine數目(
waiter
)。
預設的
state
欄位(無鎖狀態)如下圖所示。
sema
欄位是訊號量,用於控制goroutine的阻塞與喚醒,下文中會有介紹到。
兩種模式
Go實現的互斥鎖有兩種模式,分別是
正常模式
和
飢餓模式
。
在正常模式下,waiter按照先進先出(
FIFO
)的方式獲取鎖,但是一個剛被喚醒的waiter與新到達的goroutine競爭鎖時,大機率是幹不過的。新來的goroutine有一個優勢:它已經在CPU上執行,並且有可能不止一個新來的,因此waiter極有可能失敗。在這種情況下,waiter還需要在等待佇列中排隊。為了避免waiter長時間搶不到鎖,當waiter超過 1ms 沒有獲取到鎖,它就會將當前互斥鎖切換到飢餓模式,防止等待佇列中的waiter被餓死。
在飢餓模式下,鎖的所有權直接從解鎖(unlocking)的goroutine轉移到等待佇列中的隊頭waiter。新來的goroutine不會嘗試去獲取鎖,也不會自旋。它們將在等待佇列的隊尾排隊。
如果某waiter獲取到了鎖,並且滿足以下兩個條件之一,它就會將鎖從飢餓模式切換回正常模式。
它是等待佇列的最後一個goroutine
它等待獲取鎖的時間小於1ms
飢餓模式是在 Go 1。9版本引入的,它防止了佇列尾部waiter一直無法獲取鎖的問題。與飢餓模式相比,正常模式下的互斥鎖效能更好。因為相較於將鎖的所有權明確賦予給喚醒的waiter,直接競爭鎖能降低整體goroutine獲取鎖的延時開銷。
加鎖
既然被稱作鎖,那就存在加鎖和解鎖的操作。
sync。Mutex
的加鎖
Lock()
程式碼如下
func (m *Mutex) Lock() { if atomic。CompareAndSwapInt32(&m。state, 0, mutexLocked) { if race。Enabled { race。Acquire(unsafe。Pointer(m)) } return } m。lockSlow()}
程式碼非常簡潔,首先透過CAS判斷當前鎖的狀態(CAS的原理和實現可以參照小菜刀寫的《同步原語的基石》一文)。如果鎖是完全空閒的,即
m。state
為0,則對其加鎖,將
m。state
的值賦為1,此時加鎖後的
state
如下
如果,當前鎖已經被其他goroutine加鎖,則進入
m。lockSlow()
邏輯。
lockSlow
函式比較長,這裡我們分段闡述。
初始化
func (m *Mutex) lockSlow() { var waitStartTime int64 // 用於計算waiter的等待時間 starving := false // 飢餓模式標誌 awoke := false // 喚醒標誌 iter := 0 // 統計當前goroutine的自旋次數 old := m。state // 儲存當前鎖的狀態 。。。}
第一段程式是做一些初始化狀態、標誌的動作。
自旋
lockSlow
函式餘下的程式碼,就是一個大的
for
迴圈,首先看自旋部分。
for { // 判斷是否能進入自旋 if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) { // !awoke 判斷當前goroutine是不是在喚醒狀態 // old&mutexWoken == 0 表示沒有其他正在喚醒的goroutine // old>>mutexWaiterShift != 0 表示等待佇列中有正在等待的goroutine if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && // 嘗試將當前鎖的低2位的Woken狀態位設定為1,表示已被喚醒 // 這是為了通知在解鎖Unlock()中不要再喚醒其他的waiter了 atomic。CompareAndSwapInt32(&m。state, old, old|mutexWoken) { awoke = true } // 自旋 runtime_doSpin() iter++ old = m。state continue } 。。。}
關於自旋,這裡需要簡單闡述一下。自旋是自旋鎖的行為,它透過忙等待,讓執行緒在某段時間內一直保持執行,從而避免執行緒上下文的排程開銷。
自旋鎖對於執行緒只會阻塞很短時間的場景是非常合適的
。很顯然,單核CPU是不適合使用自旋鎖的,因為,在同一時間只有一個執行緒是處於執行狀態,假設執行執行緒A發現無法獲取鎖,只能等待解鎖,但因為A自身不掛起,所以那個持有鎖的執行緒B沒有辦法進入執行狀態,只能等到作業系統分給A的時間片用完,才能有機會被排程。這種情況下使用自旋鎖的代價很高。
在本場景中,之所以想讓當前goroutine進入自旋行為的依據是,我們樂觀地認為:
當前正在持有鎖的goroutine能在較短的時間內歸還鎖
。
runtime_canSpin()
函式的實現如下
//go:linkname sync_runtime_canSpin sync。runtime_canSpinfunc sync_runtime_canSpin(i int) bool { // active_spin = 4 if i >= active_spin || ncpu <= 1 || gomaxprocs <= int32(sched。npidle+sched。nmspinning)+1 { return false } if p := getg()。m。p。ptr(); !runqempty(p) { return false } return true}
由於自旋本身是空轉CPU的,所以如果使用不當,反倒會降低程式執行效能。結合函式中的判斷邏輯,這裡總結出來goroutine能進入自旋的條件如下
當前互斥鎖處於正常模式
當前執行的機器是多核CPU,且GOMAXPROCS>1
至少存在一個其他正在執行的處理器P,並且它的本地執行佇列(local runq)為空
當前goroutine進行自旋的次數小於4
前面說到,自旋行為就是讓當前goroutine並不掛起,佔用cpu資源。我們看一下
runtime_doSpin()
的實現。
//go:linkname sync_runtime_doSpin sync。runtime_doSpinfunc sync_runtime_doSpin() { procyield(active_spin_cnt) // active_spin_cnt = 30}
runtime_doSpin
呼叫了
procyield
,其實現如下(以amd64為例)
TEXT runtime·procyield(SB),NOSPLIT,$0-0 MOVL cycles+0(FP), AXagain: PAUSE SUBL $1, AX JNZ again RET
很明顯,所謂的忙等待就是執行 30 次
PAUSE
指令,透過該指令佔用 CPU 並消耗 CPU 時間。
計算期望狀態
前面說過,當前goroutine進入自旋是需要滿足相應條件的。如果不滿足自旋條件,則進入以下邏輯。
// old是鎖當前的狀態,new是期望的狀態,以期於在後面的CAS操作中更改鎖的狀態 new := old if old&mutexStarving == 0 { // 如果當前鎖不是飢餓模式,則將new的低1位的Locked狀態位設定為1,表示加鎖 new |= mutexLocked } if old&(mutexLocked|mutexStarving) != 0 { // 如果當前鎖已被加鎖或者處於飢餓模式,則將waiter數加1,表示當前goroutine將被作為waiter置於等待佇列隊尾 new += 1 << mutexWaiterShift } if starving && old&mutexLocked != 0 { // 如果當前鎖處於飢餓模式,並且已被加鎖,則將低3位的Starving狀態位設定為1,表示飢餓 new |= mutexStarving } // 當awoke為true,則表明當前goroutine在自旋邏輯中,成功修改鎖的Woken狀態位為1 if awoke { if new&mutexWoken == 0 { throw(“sync: inconsistent mutex state”) } // 將喚醒標誌位Woken置回為0 // 因為在後續的邏輯中,當前goroutine要麼是拿到鎖了,要麼是被掛起。 // 如果是掛起狀態,那就需要等待其他釋放鎖的goroutine來喚醒。 // 假如其他goroutine在unlock的時候發現Woken的位置不是0,則就不會去喚醒,那該goroutine就無法再醒來加鎖。 new &^= mutexWoken }
這裡需要重點理解一下位操作
A |= B
,它的含義就是在B的二進位制位為1的位,將A對應的二進位制位設為1,如下圖所示。因此,
new |= mutexLocked
的作用就是將
new
的最低一位設定為1。
更新期望狀態
在上一步,我們得到了鎖的期望狀態,接下來透過CAS將鎖的狀態進行更新。
// 嘗試將鎖的狀態更新為期望狀態 if atomic。CompareAndSwapInt32(&m。state, old, new) { // 如果鎖的原狀態既不是被獲取狀態,也不是處於飢餓模式 // 那就直接返回,表示當前goroutine已獲取到鎖 if old&(mutexLocked|mutexStarving) == 0 { break // locked the mutex with CAS } // 如果走到這裡,那就證明當前goroutine沒有獲取到鎖 // 這裡判斷waitStartTime != 0就證明當前goroutine之前已經等待過了,則需要將其放置在等待佇列隊頭 queueLifo := waitStartTime != 0 if waitStartTime == 0 { // 如果之前沒有等待過,就以現在的時間來初始化設定 waitStartTime = runtime_nanotime() } // 阻塞等待 runtime_SemacquireMutex(&m。sema, queueLifo, 1) // 被訊號量喚醒之後檢查當前goroutine是否應該表示為飢餓 // (這裡表示為飢餓之後,會在下一輪迴圈中嘗試將鎖的狀態更改為飢餓模式) // 1。 如果當前goroutine已經飢餓(在上一次迴圈中更改了starving為true) // 2。 如果當前goroutine已經等待了1ms以上 starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs // 再次獲取鎖狀態 old = m。state // 走到這裡,如果此時鎖仍然是飢餓模式 // 因為在飢餓模式下,鎖是直接交給喚醒的goroutine // 所以,即把鎖交給當前goroutine if old&mutexStarving != 0 { // 如果當前鎖既不是被獲取也不是被喚醒狀態,或者等待佇列為空 // 這代表鎖狀態產生了不一致的問題 if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 { throw(“sync: inconsistent mutex state”) } // 因為當前goroutine已經獲取了鎖,delta用於將等待佇列-1 delta := int32(mutexLocked - 1<
這裡需要理解一下
runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
函式,
它是用於同步庫的sleep原語
,它的實現是位於
src/runtime/sema。go
中的
semacquire1
函式,與它類似的還有
runtime_Semacquire(s *uint32)
函式。兩個睡眠原語需要等到
*s>0
(本場景中
m。sema>0
),然後原子遞減
*s
。
SemacquireMutex
用於分析競爭的互斥物件,如果
lifo
(本場景中
queueLifo
)為true,則將等待者排在等待佇列的隊頭。
skipframes
是從
SemacquireMutex
的呼叫方開始計數,表示在跟蹤期間要忽略的幀數。
所以,執行到
SemacquireMutex
就證明當前goroutine在前面的過程中獲取鎖失敗了,就需要sleep原語來阻塞當前goroutine,並透過訊號量來排隊獲取鎖:如果是新來的goroutine,就需要放在隊尾;如果是被喚醒的等待鎖的goroutine,就放在隊頭。
解鎖
前面說過,有加鎖就必然有解鎖。我們來看解鎖的過程
func (m *Mutex) Unlock() { if race。Enabled { _ = m。state race。Release(unsafe。Pointer(m)) } // new是解鎖的期望狀態 new := atomic。AddInt32(&m。state, -mutexLocked) if new != 0 { m。unlockSlow(new) }}
透過原子操作
AddInt32
想將鎖的低1位
Locked
狀態位置為0。然後判斷新的
m。state
值,如果值為0,則代表當前鎖已經完全空閒了,結束解鎖,否則進入
unlockSlow()
邏輯。
這裡需要注意的是,鎖空閒有兩種情況,第一種是完全空閒,它的狀態就是鎖的初始狀態。
第二種空閒,是指的當前鎖沒被佔有,但是會有等待拿鎖的goroutine,只是還未被喚醒,例如以下狀態的鎖也是空閒的,它有兩個等待拿鎖的goroutine(未喚醒狀態)。
以下是
unlockSlow
函式實現。
func (m *Mutex) unlockSlow(new int32) { // 1。 如果Unlock了一個沒有上鎖的鎖,則會發生panic。 if (new+mutexLocked)&mutexLocked == 0 { throw(“sync: unlock of unlocked mutex”) } // 2。 正常模式 if new&mutexStarving == 0 { old := new for { // 如果鎖沒有waiter,或者鎖有其他以下已發生的情況之一,則後面的工作就不用做了,直接返回 // 1。 鎖處於鎖定狀態,表示鎖已經被其他goroutine獲取了 // 2。 鎖處於被喚醒狀態,這表明有等待goroutine被喚醒,不用再嘗試喚醒其他goroutine // 3。 鎖處於飢餓模式,那麼鎖之後會被直接交給等待佇列隊頭goroutine if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return } // 如果能走到這,那就是上面的if判斷沒透過 // 說明當前鎖是空閒狀態,但是等待佇列中有waiter,且沒有goroutine被喚醒 // 所以,這裡我們想要把鎖的狀態設定為被喚醒,等待佇列waiter數-1 new = (old - 1< 在這裡,需要理解一下 runtime_Semrelease(s *uint32, handoff bool, skipframes int) 函式。 它是用於同步庫的wakeup原語 , Semrelease 原子增加 *s 值(本場景中 m。sema ),並通知阻塞在 Semacquire 中正在等待的goroutine。如果 handoff 為真,則跳過計數,直接喚醒隊頭waiter。 skipframes 是從 Semrelease 的呼叫方開始計數,表示在跟蹤期間要忽略的幀數。 總結 從程式碼量而言,go中互斥鎖的程式碼非常輕量簡潔,透過巧妙的位運算,僅僅採用state一個欄位就實現了四個欄位的效果,非常之精彩。 但是,程式碼量少並不代表邏輯簡單,相反,它很複雜。互斥鎖的設計中包含了大量的位運算,幷包括了兩種不同鎖模式、訊號量、自旋以及排程等內容,讀者要真正理解加解鎖的過程並不容易,這裡再做一個簡單回顧總結。 在正常模式下,waiter按照先進先出的方式獲取鎖;在飢餓模式下,鎖的所有權直接從解鎖的goroutine轉移到等待佇列中的隊頭waiter。 模式切換 如果當前 goroutine 等待鎖的時間超過了 1ms,互斥鎖就會切換到飢餓模式。 如果當前 goroutine 是互斥鎖最後一個waiter,或者等待的時間小於 1ms,互斥鎖切換回正常模式。 加鎖 如果鎖是完全空閒狀態,則透過CAS直接加鎖。 如果鎖處於正常模式,則會嘗試自旋,透過持有CPU等待鎖的釋放。 如果當前goroutine不再滿足自旋條件,則會計算鎖的期望狀態,並嘗試更新鎖狀態。 在更新鎖狀態成功後,會判斷當前goroutine是否能獲取到鎖,能獲取鎖則直接退出。 當前goroutine不能獲取到鎖時,則會由sleep原語 SemacquireMutex 陷入睡眠,等待解鎖的goroutine發出訊號進行喚醒。 喚醒之後的goroutine發現鎖處於飢餓模式,則能直接拿到鎖,否則重置自旋迭代次數並標記喚醒位,重新進入步驟2中。 解鎖 如果透過原子操作 AddInt32 後,鎖變為完全空閒狀態,則直接解鎖。 如果解鎖一個沒有上鎖的鎖,則直接丟擲異常。 如果鎖處於正常模式,且沒有goroutine等待鎖釋放,或者鎖被其他goroutine設定為了鎖定狀態、喚醒狀態、飢餓模式中的任一種(非空閒狀態),則會直接退出;否則,會透過wakeup原語 Semrelease 喚醒waiter。 如果鎖處於飢餓模式,會直接將鎖的所有權交給等待佇列隊頭waiter,喚醒的waiter會負責設定 Locked 標誌位。 另外,從Go的互斥鎖帶有自旋的設計而言,如果我們透過 sync。Mutex 只鎖定執行耗時很低的關鍵程式碼,例如鎖定某個變數的賦值,效能是非常不錯的(因為等待鎖的goroutine不用被掛起,持有鎖的goroutine會很快釋放鎖)。 所以,我們在使用互斥鎖時,應該只鎖定真正的臨界區 。 mu。Lock()defer mu。Unlock() 寫如上的程式碼,是很爽。但是,你有想過這會帶來沒必要的效能損耗嗎?