在《Go精妙的互斥鎖設計》一文中,我們詳細地講解了互斥鎖的實現原理。互斥鎖為了避免競爭條件,它只允許一個執行緒進入程式碼臨界區,而由於鎖競爭的存在,程式的執行效率會被降低。同時我們知道,只有多執行緒在共享資源中有寫操作,才會引發競態問題,只要資源沒有發生變化,多執行緒讀取相同的資源就是安全的。因此,我們引申出更細粒度的鎖:讀寫鎖。
什麼是讀寫鎖
讀寫鎖是一種多讀單寫鎖,分讀和寫兩種鎖,多個執行緒可以同時加讀鎖,但是寫鎖和寫鎖、寫鎖與讀鎖之間是互斥的。
讀寫鎖對臨界區的處理如上圖所示。其中,t1時刻,由於執行緒1已加寫鎖,執行緒2被互斥等待寫鎖的釋放;t2時刻,執行緒2已加讀鎖,執行緒3可以對其繼續加讀鎖並進入臨界區;t3時刻,執行緒3加了讀鎖,執行緒4被互斥等待讀鎖的釋放。
飢餓問題
根據讀寫鎖的性質,讀者應該能猜到讀寫鎖適用於讀寫分明的場景。根據優先順序,可以分為讀優先鎖和寫優先鎖。讀優先鎖能允許最大併發,但是寫執行緒可能被餓死;同理,寫優先鎖是優先服務寫執行緒,這樣讀執行緒就可能被餓死。
相對而言,寫鎖飢餓的問題更為突出。因為讀鎖是共享的,如果當前臨界區已經加了讀鎖,後續的執行緒繼續加讀鎖是沒問題的,但是如果一直有讀鎖的執行緒加鎖,那嘗試加寫鎖的執行緒則會一直獲取不到鎖,這樣加寫鎖的執行緒一直被阻塞,導致了寫鎖飢餓。
同時,由於多讀鎖共享,可能會有讀者問:為什麼不直接去掉讀鎖,在寫操作執行緒進來時只加寫鎖就好了呢,這樣豈不是很好實現了。道理很簡單,如果當前臨界區加了寫鎖,在寫鎖解開之前又有新的寫操作執行緒進來,等到寫鎖釋放,新的寫操作執行緒又上了寫鎖。這種情況如果連續不斷,那整個程式就只能執行寫操作執行緒,讀操作執行緒就活活被餓死了。
所以,為了避免飢餓問題,通用的做法是實現公平讀寫鎖,它將請求鎖的執行緒用佇列進行排隊,保證了先入先出(FIFO)的原則進行加鎖,這樣就能有效地避免執行緒飢餓問題。
那Go語言的讀寫鎖,對於飢餓問題,它是如何處理的呢?
Go讀寫鎖設計
本文程式碼版本為Go 1。15。2。如下所示,
sync。RWMutex
結構體包含5個欄位。
type RWMutex struct { w Mutex writerSem uint32 readerSem uint32 readerCount int32 readerWait int32 }
w
互斥鎖
sync。Mutex
,用於互斥寫操作。
writerSem
寫操作goroutine阻塞等待訊號量。最後一個阻塞寫操作的讀操作goroutine釋放讀鎖時,會通知阻塞的寫鎖goroutine。
readerSem
讀操作goroutine阻塞等待訊號量。寫鎖goroutine釋放寫鎖後,會通知阻塞的讀鎖goroutine。
readerCount
讀操作goroutine數量。
readerWait
阻塞寫操作goroutine的讀操作goroutine數量。
對於互斥鎖的幾個欄位,現在可能不好理解,先不用著急,看完我們對
sync。RWMutex
對外提供的四個方法介面解析,就自然明白了。
RLock()
:加讀鎖
RUnlock()
:解讀鎖
Lock()
:加寫鎖
Unlock()
:解寫鎖
下面,我們來依次分析。
加讀鎖
這裡,需要說明一下的是,為了更好理解程式碼邏輯,本文所有的程式碼塊均去除了競態檢測的邏輯部分,即
if race。Enabled {}
方法塊。
func (rw *RWMutex) RLock() { if atomic。AddInt32(&rw。readerCount, 1) < 0 { runtime_SemacquireMutex(&rw。readerSem, false, 0) }}
atomic。AddInt32
是一個原子性操作,其底層透過硬體指令
LOCK
實現封裝(詳情可見文章《同步原語的基石》)。
rw。readerCount
代表讀操作goroutine數量,如果將其+1,還小於0,則透過用於同步庫的sleep原語
runtime_SemacquireMutex
阻塞等待寫鎖釋放。
簡單地說,如果當前有寫操作goroutine已經進來了,則新來的讀操作goroutine會被排隊阻塞等待。但是,讀者肯定會覺得判斷條件很奇怪,為什麼
rw。readerCount
會是負值?不要急,下文會有答案。
當然,如果此時沒有寫鎖,則僅僅將
rw。readerCount
數目加1,然後直接退出,代表加讀鎖成功。
解讀鎖
func (rw *RWMutex) RUnlock() { if r := atomic。AddInt32(&rw。readerCount, -1); r < 0 { rw。rUnlockSlow(r) }}
將讀操作goroutine數目-1,如果其數目
r
大於等於0,則直接退出,代表解讀鎖成功。否則,帶著當前處於負值的數目
r
進入以下
rUnlockSlow
邏輯
func (rw *RWMutex) rUnlockSlow(r int32) { if r+1 == 0 || r+1 == -rwmutexMaxReaders { race。Enable() throw(“sync: RUnlock of unlocked RWMutex”) } if atomic。AddInt32(&rw。readerWait, -1) == 0 { runtime_Semrelease(&rw。writerSem, false, 1) }}
如果
r+1==0
,則證明在解讀鎖時,其實並沒有讀goroutine加讀鎖;
rwmutexMaxReaders = 1 << 30
,這代表讀寫鎖所能接收的最大讀操作goroutine數量。至於這裡為什麼
r+1 == -rwmutexMaxReaders
也代表並沒有goroutine加讀鎖,同樣留在下文解答。在沒有加讀鎖的鎖上解讀鎖,會丟擲異常並panic。
rw。readerWait
代表寫操作被阻塞時,讀操作goroutine數量。如果該值為1,代表當前是最後一個阻塞寫操作的goroutine,則透過用於同步庫的wakeup原語
runtime_Semrelease
喚醒阻塞的寫操作goroutine。
讀者此時只看了加解讀鎖的程式碼,理解上會有困難,不要急,我們接著看加解寫鎖的邏輯。
加寫鎖
func (rw *RWMutex) Lock() { rw。w。Lock() r := atomic。AddInt32(&rw。readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders if r != 0 && atomic。AddInt32(&rw。readerWait, r) != 0 { runtime_SemacquireMutex(&rw。writerSem, false, 0) }}
在加寫鎖時,首先會透過互斥鎖加鎖,這保證只會有一個寫鎖加鎖成功。當互斥鎖加鎖成功之後,我們就能看到
寫操作是如何阻止讀操作,讀操作是如何感知到寫操作的。
我們已經知道
rw。readerCount
是代表讀操作goroutine數量,如果在不存在寫操作的情況下,每次加讀鎖,該值就會+1,每次解讀鎖該值就會-1,那麼我們可以合理地認為
rw。readerCount
的取值範圍是[0,rwmutexMaxReaders],即最大支援2^30個併發讀,最小是0個。
然而,在當前寫操作goroutine加互斥鎖成功後,會透過原子操作
atomic。AddInt32
將
readerCount
減去2^30,此時
readerCount
會變成負值,那麼如果之後再有讀操作goroutine加讀鎖時,能透過該負值知道當前已經有寫鎖了,從而阻塞等待。這裡也解釋了加讀鎖和解讀鎖兩小節中留下的問題。最後,為了持有真實的讀操作goroutine數目,再加回2^30即可。
這裡需要格外注意的是:互斥鎖加鎖成功並不意味著加寫鎖成功。我們需要知道
讀操作是如何阻止寫操作,寫操作是如何感知到讀操作的。
r != 0
即代表當前讀操作goroutine不為0,這意味著寫操作要等待排在前面的讀操作結束後才算是加上寫鎖。寫操作獲得互斥鎖後,透過
atomic。AddInt32
把
rw。readerCount
值複製到
rw。readerWait
中,用於標記排在寫操作goroutine前面的讀操作goroutine個數。透過用於同步庫的sleep原語
runtime_SemacquireMutex
阻塞等待這些讀操作結束。在解讀鎖小結中我們知道,讀操作結束時,除了會遞減
rw。readerCount
,同時需要遞減
rw。readerWait
值,當
rw。readerWait
值變為0時就會喚醒阻塞的寫操作goroutine。
解寫鎖
func (rw *RWMutex) Unlock() { r := atomic。AddInt32(&rw。readerCount, rwmutexMaxReaders) if r >= rwmutexMaxReaders { race。Enable() throw(“sync: Unlock of unlocked RWMutex”) } for i := 0; i < int(r); i++ { runtime_Semrelease(&rw。readerSem, false, 0) } rw。w。Unlock()}
在解寫鎖時,將負值的rw。readerCount變更為正值,解除對讀鎖的互斥,並喚醒r個因為寫鎖而阻塞的讀操作goroutine。最後,透過呼叫互斥鎖的Unlock方法,解除對寫鎖的互斥。
到這裡,我們可以圖解一下
Go是如何解決飢餓問題的
。
假設G1、G2、G3是正在共享讀的goroutine,
rw。readerCount
值為3。此時寫操作G4進來,把
rw。readerCount
值變為了負值,同時它發現
rw。readerCount
不為0,因此阻塞等待。但是在G4的等待期間又有新的讀操作G5、G6和寫操作G7進來。由於此時的
rw。readerCount
值為負,所以G5和G6是不能加讀鎖成功的,會陷入阻塞等待。G7由於G4加了互斥鎖也會陷入等待。當排在寫操作G4前面的最後一個讀操作G3結束,G3會喚醒G4。當G4結束時,它將G5、G6和G7均喚醒。但是G7需要等待G5和G6退出(因為它在試圖加寫鎖時,會發現
rw。readerCount
不為0,會再次陷入阻塞等待)才能加寫鎖成功。以此反覆,保證了讀寫鎖的相對公平,避免一方捱餓。
總結
讀寫鎖基於互斥鎖,提供了更細粒度的控制,它適用於讀寫分明的場景,準確而言是讀操作遠多於寫操作的情況。在多讀少寫的場景中,使用讀寫鎖替代互斥鎖能有效地提高程式執行效率。
讀讀共享、讀寫互斥和寫寫互斥。在優先順序方面,偏袒讀鎖或者寫鎖要分幾種情況。
鎖空閒,此時是完全公平的,誰先進來誰就可以上鎖。
如果沒有讀操作,均是寫操作,讀寫鎖會退化成互斥鎖,只有在互斥鎖處於飢餓模式下才會公平。
如果沒有寫操作,均是讀操作,讀操作均可以進來,讀寫鎖退化成無鎖設計(也並不是真正的無鎖,因為加解鎖均有原子操作
atomic。AddInt32
對讀操作goroutine的統計)。
被加讀鎖時,寫操作進來會被阻塞。在寫操作阻塞期間,如果有讀操作試圖進來,它們也會被阻塞。當阻塞寫操作的最後一個讀操作解讀鎖時,它只會喚醒被阻塞的寫操作,之後進來的讀操作需要該寫操作完成之後被喚醒。這些被喚醒的讀操作會比新的寫操作(可以是新來的,也可以是因互斥鎖而被阻塞的)先拿到鎖,等待這些讀操作完成,新的寫操作才能拿到寫鎖。
因為讀寫鎖是基於互斥鎖之上的設計,不可避免地多做了一些工作。因此,並不是說使用讀寫鎖的收益一定會比互斥鎖高。在選擇何種鎖時,需要綜合考量讀寫操作的比例,臨界區程式碼的耗時。效能比對的內容本文就不再討論,讀者可自行測試。