AbstratQueueSynchronizer (簡稱AQS) 剖析 head節點和tail節點 加鎖

AbstratQueueSynchronizer (簡稱AQS) 剖析 head節點和tail節點 加鎖

AQS定義了兩種訪問資源的方式

獨佔模式 ,也就是說只有

一個執行緒

可以訪問資源,如ReentranctLock

共享模式,表示可以有

多個執行緒

訪問資源,如Semaphore/CountDownLatch

AQS 關於head節點和tail節點

當第一個執行緒過來獲取鎖時,是不會初始化佇列的,只是將exclusiveOwnerThread這個變數變為當前執行緒(獨佔鎖的情況下),此時head和tail都是null;第一個執行緒沒執行完,第二個執行緒又來了,這時會初始化佇列,new一個空Node賦值給head和tail,然後在tail(也就是head)後面拼上這個要排隊的執行緒(詳見下面的enq方法),,此時形成了一個有兩個節點的雙向佇列,head是new Node(),tail是新加入需排隊的Node;阻塞佇列中,不包括Head節點。然後再獲取不到鎖就會將head的waitStatus置為-1,自身掛起,等待unparkSuccessor(head)來喚醒。

//頭結點,當前持有鎖的執行緒private transient volatile Node head;//尾節點,每次有新的節點進來,都要放在尾節點後面private transient volatile Node tail;//當前鎖的狀態//值為0的時候,表示共享資源沒有被佔用,//1的時候表示有一個執行緒佔用,如果大於1則表示重入private volatile int state;// 代表當前持有獨佔鎖的執行緒,舉個最重要的使用例子,因為鎖可以重入// reentrantLock。lock()可以巢狀呼叫多次,所以每次用這個來判斷當前執行緒是否已經擁有了鎖// if (currentThread == getExclusiveOwnerThread()) {state++}private transient Thread exclusiveOwnerThread;

以獨佔模式的ReentractLock的公平鎖為例:

用ReentranctLock只是為了方便舉例子,不同的同步器實現不同的方法而已。

其實在每個具體的同步類(獨佔模式)的操作資源的介面中,

最終呼叫的是AQS的acquire方法(比如說ReentractLock的公平鎖)

//arg = 1,表示同步器想要1個state資源public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node。EXCLUSIVE), arg)) selfInterrupt(); }// tryAcquire,顧名思義,就是先嚐試獲取一下,如果獲取成功,就返回true,// 那麼獲取資源也就結束了,//否則,就把當前執行緒設定為獨佔模式(EXCLUSIVE),壓到阻塞佇列中。// addWaiter就是把當前執行緒封裝成Node物件,//並且設定為獨佔模式(EXCLUSIVE),加入阻塞佇列

protected final boolean tryAcquire(int acquires) { final Thread current = Thread。currentThread(); //獲取資源 int c = getState(); //如果c為0,說明資源沒有執行緒佔用,則可以去搶 if (c == 0) { //既然是公平鎖,那麼肯定就講究先來後到 //hasQueuedPredecessors先看看前面有沒有Node節點在等待, //如果沒有,就透過CAS去獲取一下 //在這裡存在著執行緒競爭,所以有可能成功有可能失敗,如果成功獲得資源, //那麼compareAndSetState返回true,否則false if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { //到這裡說明前面沒有執行緒在等待並且成功搶佔到臨界資源 //所以就設定當前執行緒為佔有資源的執行緒,方便後面判斷重入 setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //到這裡說明是重入的問題 int nextc = c + acquires; if (nextc < 0) throw new Error(“Maximum lock count exceeded”); setState(nextc); return true; } //到這裡就是臨界資源被佔用,而且不是重入的情況,也就是說head節點都還沒釋放資源! return false; }

addWaiter()的功能是將Node入隊

private Node addWaiter(Node mode) { //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨佔)和SHARED(共享) Node node = new Node(Thread。currentThread(), mode); //嘗試快速方式直接放到隊尾。 Node pred = tail; if (pred != null) { node。prev = pred; //CAS,如果Tail是pred,則把Tail設定為node。 if (compareAndSetTail(pred, node)) { pred。next = node; return node; } } //上一步失敗則透過enq入隊。 enq(node); return node;}

在AQS類中的enq方法程式碼如下:

private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize // 初始化頭節點,這時head指標指向的就是new Node()物件 if (compareAndSetHead(new Node())) tail = head; // 將tail也一起初始化了,這時會繼續再走一遍for迴圈 } else { node。prev = t; // 先將node的prev指標指向區域性變數t; // 將tail所在記憶體地址的物件值換成node if (compareAndSetTail(t, node)) { // 將tail的next指標指向node節點,這時node節點就成了新的隊尾了 t。next = node; return t; // 返回舊的隊尾t } } } }

enq內部是個死迴圈

,透過CAS設定尾結點,不成功就一直重試。

很經典的CAS自旋的用法

這是一種

樂觀的併發策略