阻塞佇列—DelayQueue原始碼分析

阻塞佇列—DelayQueue原始碼分析

作者公眾號:一角錢技術(org_yijiaoqian)

前言

阻塞佇列—DelayQueue原始碼分析

DelayQueue

由優先順序支援的、基於時間的排程佇列,內部使用非執行緒安全的優先佇列(PriorityQueue)實現,而無界佇列基於陣列的擴容實現。在建立元素時,可以指定多久才能從佇列中獲取當前元素。只有延時期滿後才能從佇列中獲取元素。

佇列建立

BlockingQueue blockingQueue = new DelayQueue();

要求

:入隊的物件必須要實現 Delayed介面,而Delayed整合自 Comparable 介面。

Delayed

介面使物件成為延遲物件,它使存放在DelayQueue類中的物件具有了啟用日期。該介面強制實現下列兩個方法。

CompareTo(Delayed o):Delayed介面繼承了Comparable介面,因此有了這個方法。讓元素按啟用日期排隊

getDelay(TimeUnit unit):這個方法返回到啟用日期的剩餘時間,時間單位由單位引數指定。

應用場景

快取系統的設計

:可以用 DelayQueue 儲存快取元素的有效期,使用一個執行緒迴圈查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示快取有效期到了。

定時任務排程

。使用DelayQueue 儲存當天將會執行的任務和執行時間,一旦從DelayQueue 中獲取到的任務就開始執行,比如 TimerQueue 就是使用DelayQueue實現的。

我們來看具體的電影票/火車票支付兩個例子。

案例1:電影票

透過電影票這個簡單示例來掌握延遲佇列的使用。

定義電影票並存設計存放到延遲佇列的元素

package com。niuh。queue。delayed。v1;import java。util。concurrent。Delayed;import java。util。concurrent。TimeUnit;/** * 電影票 */public class MovieTiket implements Delayed {    //延遲時間    private final long delay;    //到期時間    private final long expire;    //資料    private final String msg;    //建立時間    private final long now;    public long getDelay() {        return delay;    }    public long getExpire() {        return expire;    }    public String getMsg() {        return msg;    }    public long getNow() {        return now;    }    /**     * @param msg 訊息     * @param delay 延期時間     */    public MovieTiket(String msg , long delay) {        this。delay = delay;        this。msg = msg;        expire = System。currentTimeMillis() + delay;    //到期時間 = 當前時間+延遲時間        now = System。currentTimeMillis();    }    /**     * @param msg     */    public MovieTiket(String msg){        this(msg,1000);    }    public MovieTiket(){        this(null,1000);    }    /**     * 獲得延遲時間   用過期時間-當前時間,時間單位毫秒     * @param unit     * @return     */    public long getDelay(TimeUnit unit) {        return unit。convert(this。expire                - System。currentTimeMillis() , TimeUnit。MILLISECONDS);    }    /**     * 用於延遲佇列內部比較排序  當前時間的延遲時間 - 比較物件的延遲時間     * 越早過期的時間在佇列中越靠前     * @param delayed     * @return     */    public int compareTo(Delayed delayed) {        return (int) (this。getDelay(TimeUnit。MILLISECONDS)                - delayed。getDelay(TimeUnit。MILLISECONDS));    }    @Override    public String toString() {        return “MovieTiket{” +                “delay=” + delay +                “, expire=” + expire +                “, msg=‘” + msg + ’\‘’ +                “, now=” + now +                ‘}’;    }}

按電影票的過期時間進行入隊測試

package com。niuh。queue。delayed。v1;import java。util。concurrent。DelayQueue;/** * 延遲佇列 */public class DelayedQueueRun {    public static void main(String[] args) {        DelayQueue delayQueue = new DelayQueue();        MovieTiket tiket = new MovieTiket(“電影票0”, 10000);        delayQueue。put(tiket);        MovieTiket tiket1 = new MovieTiket(“電影票1”, 5000);        delayQueue。put(tiket1);        MovieTiket tiket2 = new MovieTiket(“電影票2”, 8000);        delayQueue。put(tiket2);        System。out。println(“message:——->入隊完畢”);        while (delayQueue。size() > 0) {            try {                tiket = delayQueue。take();                System。out。println(“電影票出隊:” + tiket。getMsg());            } catch (InterruptedException e) {                e。printStackTrace();            }        }    }}

案例2:訂單支付

再看一個訂單支付場景稍微複雜的場景。在12306搶到火車票之後,通常需要在30分鐘內付錢,否則訂單就會取消

阻塞佇列—DelayQueue原始碼分析

解決思路

火車票提交訂單的時候,首先儲存到資料庫,並同時將訂單資料儲存到 DelayQueue 中,開啟一個執行緒監控 DelayQueue,利用 DelayQueue 的特性,先過期的資料會被 take出來,若發現此時訂單未支付,那就是過期未支付,更改訂單狀態。

阻塞佇列—DelayQueue原始碼分析

實現程式碼

SaveOrder 訂單相關服務

package com。niuh。queue。delayed。v2。service。busi;import com。niuh。queue。delayed。v2。service。IDelayOrder;import com。niuh。queue。delayed。v2。dao。OrderExpDao;import com。niuh。queue。delayed。v2。model。OrderExp;import lombok。extern。slf4j。Slf4j;import org。springframework。beans。factory。annotation。Autowired;import org。springframework。beans。factory。annotation。Qualifier;import javax。annotation。PostConstruct;import java。util。Date;import java。util。List;import java。util。Random;/** * 

 * 訂單相關的服務 * 

 **/@Slf4jpublic class SaveOrder {    // 取消付款    public final static short UNPAY = 0;    // 付款    public final static short PAYED = 1;    // 過期    public final static short EXPIRED = -1;    @Autowired    private OrderExpDao orderExpDao;    @Autowired    @Qualifier(“dq”)    private IDelayOrder delayOrder;    /**     * 接收前端頁面引數,生成訂單     *     * @param orderNumber 訂單個數     */    public void insertOrders(int orderNumber) {        Random r = new Random();        OrderExp orderExp;        for (int i = 0; i < orderNumber; i++) {            //訂單的超時時長,單位秒            long expireTime = r。nextInt(20) + 5;            orderExp = new OrderExp();            String orderNo = “DD00_” + expireTime + “S”;            orderExp。setOrderNo(orderNo);            orderExp。setOrderNote(“火車票訂單——” + orderNo);            orderExp。setOrderStatus(UNPAY);            orderExpDao。insertDelayOrder(orderExp, expireTime);            log。info(“儲存訂單到DB:” + orderNo);            delayOrder。orderDelay(orderExp, expireTime);        }    }    /**     * 應用重啟帶來的問題:     * 1、儲存在Queue中的訂單會丟失,這些丟失的訂單會在什麼時候過期,因為佇列裡已經沒有這個訂單了,無法檢查了,這些訂單就得不到處理了。     * 2、已過期的訂單不會被處理,在應用的重啟階段,可能會有一部分訂單過期,這部分過期未支付的訂單同樣也得不到處理,會一直放在資料庫裡,     * 過期未支付訂單所對應的資源比如電影票所對應的座位,就不能被釋放出來,讓別的使用者來購買。     * 解決之道 :在系統啟動時另行處理     */    @PostConstruct    public void initDelayOrder() {        log。info(“系統啟動,掃描表中過期未支付的訂單並處理……。。。”);        int counts = orderExpDao。updateExpireOrders();        log。info(“系統啟動,處理了表中[” + counts + “]個過期未支付的訂單!”);        List orderList = orderExpDao。selectUnPayOrders();        log。info(“系統啟動,發現了表中還有[” + orderList。size() + “]個未到期未支付的訂單!推入檢查佇列準備到期檢查。。。。”);        for (OrderExp order : orderList) {            long expireTime = order。getExpireTime()。getTime() - (new Date()。getTime());            delayOrder。orderDelay(order, expireTime);        }    }}

IDelayOrder 延時處理訂單的介面

package com。niuh。queue。delayed。v2。service;import com。niuh。queue。delayed。v2。model。OrderExp;/** * 

 * 延時處理訂單的介面 * 

 * */public interface IDelayOrder {    /**     * 進行延時處理的方法     * @param order 要進行延時處理的訂單     * @param expireTime 延時時長,單位秒     */    void orderDelay(OrderExp order, long expireTime);}

DqMode 阻塞佇列的實現

package com。niuh。queue。delayed。v2。service。impl;import com。niuh。queue。delayed。v2。model。OrderExp;import com。niuh。queue。delayed。v2。service。busi。DlyOrderProcessor;import com。niuh。queue。delayed。v2。vo。ItemVo;import lombok。extern。slf4j。Slf4j;import org。springframework。beans。factory。annotation。Autowired;import org。springframework。beans。factory。annotation。Qualifier;import org。springframework。stereotype。Service;import javax。annotation。PostConstruct;import javax。annotation。PreDestroy;import java。util。concurrent。DelayQueue;/** * 

 * 阻塞佇列的實現 * 

 */@Service@Qualifier(“dq”)@Slf4jpublic class DqMode {    @Autowired    private DlyOrderProcessor processDelayOrder;    private Thread takeOrder;    private static DelayQueue> delayOrder = new DelayQueue>();    public void orderDelay(OrderExp order, long expireTime) {        ItemVo itemOrder = new ItemVo(expireTime*1000,order);        delayOrder。put(itemOrder);        log。info(“訂單[超時時長:”+expireTime+“秒]被推入檢查佇列,訂單詳情:”+order);    }    private class TakeOrder implements Runnable{        private DlyOrderProcessor processDelayOrder;        public TakeOrder(DlyOrderProcessor processDelayOrder) {            super();            this。processDelayOrder = processDelayOrder;        }        public void run() {            log。info(“處理到期訂單執行緒已經啟動!”);            while(!Thread。currentThread()。isInterrupted()) {                try {                    ItemVo itemOrder = delayOrder。take();                    if (itemOrder!=null) {                        processDelayOrder。checkDelayOrder(itemOrder。getData());                    }                } catch (Exception e) {                    log。error(“The thread :”,e);                }            }            log。info(“處理到期訂單執行緒準備關閉……”);        }    }    @PostConstruct    public void init() {        takeOrder = new Thread(new TakeOrder(processDelayOrder));        takeOrder。start();    }    @PreDestroy    public void close() {        takeOrder。interrupt();    }}

ItemVo 存放到延遲佇列的元素,對業務資料進行了包裝

package com。niuh。queue。delayed。v2。vo;import java。util。concurrent。Delayed;import java。util。concurrent。TimeUnit;/** * 

 * 存放到延遲佇列的元素,對業務資料進行了包裝 * 

 */public class ItemVo implements Delayed {    //到期時間,但傳入的數值代表過期的時長,傳入單位毫秒    private long activeTime;    private T data;//業務資料,泛型    public ItemVo(long activeTime, T data) {        super();        this。activeTime = activeTime + System。currentTimeMillis();        this。data = data;    }    public long getActiveTime() {        return activeTime;    }    public T getData() {        return data;    }    /**     * 這個方法返回到啟用日期的剩餘時間,時間單位由單位引數指定。     */    public long getDelay(TimeUnit unit) {        long d = unit。convert(this。activeTime - System。currentTimeMillis(), unit);        return d;    }    /**     *Delayed介面繼承了Comparable介面,按剩餘時間排序,實際計算考慮精度為納秒數     */    public int compareTo(Delayed o) {        long d = (getDelay(TimeUnit。NANOSECONDS) - o。getDelay(TimeUnit。NANOSECONDS));        return (d == 0) ? 0 : ((d < 0) ? -1 : 1);    }}

DlyOrderProcessor 處理延期訂單的服務

package com。niuh。queue。delayed。v2。service。busi;import com。niuh。queue。delayed。v2。dao。OrderExpDao;import com。niuh。queue。delayed。v2。model。OrderExp;import lombok。extern。slf4j。Slf4j;import org。springframework。beans。factory。annotation。Autowired;import org。springframework。stereotype。Service;/** * 

 *  處理延期訂單的服務 * 

 * */@Service@Slf4jpublic class DlyOrderProcessor {    @Autowired    private OrderExpDao orderExpDao;    /**檢查資料庫中指定id的訂單的狀態,如果為未支付,則修改為已過期*/    public void checkDelayOrder(OrderExp record) {        OrderExp dbOrder = orderExpDao。selectByPrimaryKey(record。getId());        if(dbOrder。getOrderStatus()== SaveOrder。UNPAY) {            log。info(“訂單【”+record+“】未支付已過期,需要更改為過期訂單!”);            orderExpDao。updateExpireOrder(record。getId());        }else {            log。info(“已支付訂單【”+record+“】,無需修改!”);        }    }}

另外也可以使用MQ來解決,例如 ActiveMQ支援的延遲和定時投遞 修改配置檔案(activemq。xml),增加延遲和定時投遞支援——-schedulerSupport=“true”

工作原理

DelayQueue的泛型引數需要實現Delayed介面,Delayed介面繼承了Comparable介面,DelayQueue內部使用非執行緒安全的優先佇列(PriorityQueue),並使用Leader/Followers模式,最小化不必要的等待時間。DelayQueue不允許包含null元素。

Leader/Followers模式:

有若干個執行緒(一般組成執行緒池)用來處理大量的事件

有一個執行緒作為領導者,等待事件的發生;其他的執行緒作為追隨者,僅僅是睡眠

假如有事件需要處理,領導者會從追隨者中指定一個新的領導者,自己去處理事件

喚醒的追隨者作為新的領導者等待事件的發生

處理事件的執行緒處理完畢以後,就會成為追隨者的一員,直到被喚醒成為領導者

假如需要處理的事件太多,而執行緒數量不夠(能夠動態建立執行緒處理另當別論),則有的事件可能會得不到處理。

所以執行緒會有三種身份中的一種:leader 和 follower,以及一個幹活中的狀態:processser。它的基本原則就是,永遠最多隻有一個 leader。而所有 follower 都在等待成為 leader。執行緒池啟動時會自動產生一個 Leader 負責等待網路 IO 事件,當有一個事件產生時,Leader 執行緒首先通知一個 Follower 執行緒將被其提拔為新的 Leader ,然後自己就去幹活了,去處理這個網路事件,處理完畢後加入 Follower 執行緒等待佇列,等待下次成為 Leader。這種方法可以增強 CPU快取記憶體相似性,以及消除動態記憶體分配和執行緒間的資料交換。

原始碼分析

定義

DelayQueue的類繼承關係如下:

阻塞佇列—DelayQueue原始碼分析

其包含的方法定義如下:

阻塞佇列—DelayQueue原始碼分析

成員屬性

DelayQueue 透過組合一個PriorityQueue 來實現元素的儲存以及優先順序維護,透過ReentrantLock 來保證執行緒安全,透過Condition 來判斷是否可以取資料,對於leader我們後面再來分析它的作用。

// 可重入鎖private final transient ReentrantLock lock = new ReentrantLock();// 儲存元素的優先順序佇列private final PriorityQueue q = new PriorityQueue();// 獲取資料 等待執行緒標識private Thread leader = null;// 條件控制,表示是否可以從佇列中取資料private final Condition available = lock。newCondition();

建構函式

DelayQueue 內部組合PriorityQueue,對元素的操作都是透過PriorityQueue 來實現的,DelayQueue 的構造方法很簡單,對於PriorityQueue 都是使用的預設引數,不能透過DelayQueue 來指定PriorityQueue的初始大小,也不能使用指定的Comparator,元素本身就需要實現Comparable ,因此不需要指定的Comparator。

/*** 無參建構函式*/public DelayQueue() {}/*** 透過集合初始化*/public DelayQueue(Collection<? extends E> c) {    this。addAll(c);}

入隊方法

雖然提供入隊的介面方式很多,實際都是呼叫的offer 方法,透過PriorityQueue 來進行入隊操作,入隊超時方法並沒有其超時功能。

add(E e),將指定的元素插入到此佇列中,在成功時返回 true

put(E e),將指定的元素插入此佇列中,佇列達到最大值,則拋oom異常

offer(E e),將指定的元素插入到此佇列中,在成功時返回 true

offer(E e, long timeout, TimeUnit unit),指定一個等待時間將元素放入佇列中並沒有意義

public boolean add(E e) {    return offer(e);}public void put(E e) {    offer(e);}public boolean offer(E e, long timeout, TimeUnit unit) {    return offer(e);}

offer(E e)

將指定的元素插入到此佇列中,在成功時返回 true,其他幾個方法內部都呼叫了offer 方法,我們也可以直接呼叫offer 方法來完成入隊操作。

peek並不一定是當前新增的元素,隊頭是當前新增元素,說明當前元素e的優先順序最小也就即將過期的,這時候啟用avaliable變數條件佇列裡面的一個執行緒,通知他們佇列裡面有元素了。

public boolean offer(E e) {    final ReentrantLock lock = this。lock;    // 獲取鎖    lock。lock();    try {        //透過PriorityQueue 來將元素入隊        q。offer(e);        //peek 是獲取的隊頭元素,喚醒阻塞在available 條件上的一個執行緒,表示可以從佇列中取資料了        if (q。peek() == e) {            leader = null;            // 喚醒通知            available。signal();        }        return true;    } finally {        // 解鎖        lock。unlock();    }}

出隊方法

poll(),獲取並移除此佇列的頭,如果此佇列為空,則返回 null

poll(long timeout, TimeUnit unit),獲取並移除此佇列的頭部,在指定的等待時間前等待

take(),獲取並移除此佇列的頭部,在元素變得可用之前一直等待

peek(),呼叫此方法,可以返回隊頭元素,但是元素並不出隊

poll()

獲取並移除此佇列的頭,如果此佇列為空,則返回 null

public E poll() {    final ReentrantLock lock = this。lock;    // 獲取同步鎖    lock。lock();    try {        // 獲取隊頭元素        E first = q。peek();        // 如果對頭為null 或者 延時還沒有到,則返回 null        if (first == null || first。getDelay(NANOSECONDS) > 0)            return null;        else            return q。poll(); // 否則元素出隊    } finally {        lock。unlock();    }}

poll(long timeout, TimeUnit unit)

獲取並移除此佇列的頭部,在指定的等待時間前等待。

public E poll(long timeout, TimeUnit unit) throws InterruptedException {    // 超時等待時間    long nanos = unit。toNanos(timeout);    final ReentrantLock lock = this。lock;    // 獲取可中斷鎖    lock。lockInterruptibly();    try         // 無限迴圈        for (;;) {            // 獲取隊頭元素            E first = q。peek            // 隊頭為空,也就是佇列為空            if (first == null) {                // 達到超時指定時間,返回null                if (nanos <= 0)                    return null;                else                    // 如果還沒有超時,那麼在available條件上進行等待nanos時間                    nanos = available。awaitNanos(nanos);            } else {                // 獲取元素延遲時間                long delay = first。getDelay(NANOSECONDS);                // 延時到期                if (delay <= 0)                    return q。poll(); // 返回出隊元素                // 延時未到期,超時到期,返回null                if (nanos <= 0)                    return null;                first = null; // don‘t retain ref while waiting                // 超時等待時間 < 延遲時間 或者 有其他執行緒再取資料                if (nanos < delay || leader != null)                    // 在available條件上進行等待nanos時間                    nanos = available。awaitNanos(nanos);                else {                    // 超時等待時間 > 延遲時間                     // 並且沒有其他執行緒在等待,那麼當前元素成為leader,表示 leader執行緒最早 正在等待獲取元素                    Thread thisThread = Thread。currentThread();                    leader = thisThread;                    try {                        // 等待 延遲時間 超時                        long timeLeft = available。awaitNanos(delay);                        // 還需要繼續等待 nanos                        nanos -= delay - timeLeft;                    } finally {                        // 清除 leader                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        // 喚醒阻塞在 available 的一個執行緒,表示可以取資料了        if (leader == null && q。peek() != null)            available。signal        // 釋放鎖        lock。unlock();    }}

來梳理一下這裡的邏輯:

如果佇列為空,如果超時時間未到,則進行等待,否則返回null

佇列不為空,取出對頭元素,如果延遲時間到來,則返回元素,否則如果超時時間到返回null

超時時間未到,並且超時時間 < 延遲時間 或者 有執行緒正在獲取元素,那麼進行等待

超時時間 > 延遲時間,那麼肯定可以取到元素,設定 leader為當前執行緒,等待延遲時間到期。

這裡需要注意的時Condition 條件在阻塞時會釋放鎖,在被喚醒時會再次獲取鎖,獲取成功才會返回。 當進行超時等待時,阻塞在Condition 上後會釋放鎖,一旦釋放了鎖,那麼其它執行緒就有可能參與競爭,某一個執行緒就可能會成為leader(參與競爭的時間早,並且能在等待時間內能獲取到隊頭元素那麼就可能成為leader) leader是用來減少不必要的競爭,如果leader不為空說明已經有執行緒在取了,設定當前執行緒等待即可。(leader 就是一個訊號,告訴其它執行緒:你們不要再去獲取元素了,它們延遲時間還沒到期,我都還沒有取到資料呢,你們要取資料,等我取了再說)

下面用流程圖來展示這一過程:

阻塞佇列—DelayQueue原始碼分析

take()

獲取並移除此佇列的頭部,在元素變得可用之前一直等待

public E take() throws InterruptedException {    final ReentrantLock lock = this。lock;    // 獲取可中斷鎖    lock。lockInterruptibly();    try {        for (;;) {            // 獲取隊頭元素            E first = q。peek();            // 隊頭元素為空,則阻塞等待            if (first == null)                available。await();            else {                // 獲取元素延遲時間                long delay = first。getDelay(NANOSECONDS);                // 延時到期                if (delay <= 0)                    return q。poll(); // 返回出隊元素                first = null; // don’t retain ref while waiting                // 如果有其它執行緒在等待獲取元素,則當前執行緒不用去競爭,直接等待                if (leader != null)                    available。await();                else {                    Thread thisThread = Thread。currentThread();                    leader = thisThread;                    try {                        // 等待延遲時間到期                        available。awaitNanos(delay);                    } finally {                        //清除 leader                        if (leader == thisThread)                            leader = null;                    }                }            }        }    } finally {        //喚醒阻塞在available 的一個執行緒,表示可以取資料了        if (leader == null && q。peek() != null)            available。signal();        // 釋放鎖        lock。unlock();    }}

該方法就是相當於在前面的超時等待中,把超時時間設定為無限大,那麼這樣只要佇列中有元素,要是元素延遲時間要求,那麼就可以取出元素,否則就直接等待元素延遲時間到期,再取出元素,最先參與等待的執行緒會成為leader。

peek()

呼叫此方法,可以返回隊頭元素,但是元素並不出隊。

public E peek() { final ReentrantLock lock = this。lock; lock。lock();    try {     //返回佇列頭部元素,元素不出隊        return q。peek();    } finally {        lock。unlock();    }}

總結

DelayQueue 內部透過組合PriorityQueue 來實現儲存和維護元素順序的;

DelayQueue 儲存元素必須實現Delayed 介面,透過實現Delayed 介面,可以獲取到元素延遲時間,以及可以比較元素大小(Delayed 繼承Comparable);

DelayQueue 透過一個可重入鎖來控制元素的入隊出隊行為;

DelayQueue 中leader 標識 用於減少執行緒的競爭,表示當前有其它執行緒正在獲取隊頭元素;

PriorityQueue 只是負責儲存資料以及維護元素的順序,對於延遲時間取資料則是在DelayQueue 中進行判斷控制的;

DelayQueue 沒有實現序列化介面。

PS

:以上程式碼提交在

Github

https://github。com/Niuh-Study/niuh-juc-final。git

文章持續更新,可以公眾號搜一搜「

一角錢技術

」第一時間閱讀, 本文 GitHub

org_hejianhui/JavaStudy

已經收錄,歡迎 Star。