明天的你會感謝今天努力的你
舉手之勞,加個關注
Future框架概述
JDK中的Future框架實際就是Future 模式的實現,通常情況下我們會配合執行緒池使用,但也可以單獨使用;下面我們就單獨使用簡單舉例:
應用例項
// demo 片段FutureTask
[15:38:03,231 INFO ] [main] - 啟動非同步任務。。。[15:38:03,231 INFO ] [main] - 繼續其他任務。。。[15:38:03,231 INFO ] [Thread-0] - 非同步任務執行。。。[15:38:05,232 INFO ] [Thread-0] - 過了很久很久。。。[15:38:05,236 INFO ] [main] - 獲取非同步任務結果:非同步任務完成
首先我們將要執行的任務包裝成
Callable
, 這裡如果不需要返回值也可以使用
Runnable
; 然後構建 FutureTask 由一個執行緒啟動,最後使用
Future。get()
獲取非同步任務結果;
Future執行邏輯
對於Future模式的流程圖如下
對比上面的例項程式碼, 大家可能會發現有些不一樣,因為在FutureTask 同事繼承了 Runnable 和 Future 介面, 所以在提交任務後沒有返回Future , 而是直接使用自身呼叫get; 下面我們就對原始碼進行分析;
public interface RunnableFuture
另外在程式碼中還可以看見有很多地方都是用了
CAS
來更新變數, 而JDK1。6 甚至使用了
AQS
來實現,其原因就是同一個
FutureTask
可以多個執行緒同時提交, 也可以多個執行緒同時獲取; 所以程式碼中有很多的狀態變數;
// FutureTask。state 取值private static final int NEW = 0; // 初始化到結果返回前private static final int COMPLETING = 1; // 結果賦值private static final int NORMAL = 2; // 執行完畢private static final int EXCEPTIONAL = 3; // 執行異常private static final int CANCELLED = 4; // 任務取消private static final int INTERRUPTING = 5; // 設定中斷狀態private static final int INTERRUPTED = 6; // 任務中斷
同時原始碼的註釋中也詳細給出了可能出現的狀態轉換:
NEW -> COMPLETING -> NORMAL //任務正常執行
NEW-> COMPLETING -> EXCEPTION // 任務執行異常
NEW -> CANCELLED // 任務取消
NEW -> INITERRUPTING -> INTERRUPTED // 任務中斷
注意這裡的
COMPLETING
狀態是一個很微妙的狀態, 正因為有他的存在才能實現無鎖賦值; 大家先留意這個狀態,然後程式碼中應該能體會到; 另外這裡有一個變數需要注意, WaitNode ; 使用
Treiber stack
演算法實現的無鎖棧; 其中原理說明可以 參考下面的第三節:
public void run(){ if(state != NEW //確保任務執行完成後,不在重複執行 !UNSAFE。compareAndSwapObject(this, runnerOffset, null, Thread。currentThread())){// 確保只有一個執行緒執行 return ; }}
任務執行
public void run() { if (state != NEW || // 確保任務執行完成後,不再重複執行 !UNSAFE。compareAndSwapObject(this, runnerOffset, null, Thread。currentThread())) // 確保只有一個執行緒執行 return; try { Callable
// 設定非同步任務結果protected void set(V v) { if (UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設定一次 outcome = v; UNSAFE。putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); // 喚醒等待執行緒 }}
protected void setException(Throwable t) { if (UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設定一次 outcome = t; UNSAFE。putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}
任務取消
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && // 只有在任務執行階段才能取消 UNSAFE。compareAndSwapInt(this, stateOffset, NEW, // 設定取消狀態 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t。interrupt(); } finally { // final state UNSAFE。putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true;}
注意
cancel(false)
也就是僅取消, 並沒有打斷;非同步任務會繼續執行,只是這裡首先設定了
FutureTask。state = CANCELLED
, 所以最後在設定結果的時候會失敗,
UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
;
獲取結果
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 阻塞等待 return report(s);}private V report(int s) throws ExecutionException { // 根據最後的狀態返回結果 Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System。nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread。interrupted()) { removeWaiter(q); // 移除等待節點 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任務已完成 if (q != null) q。thread = null; return s; } else if (s == COMPLETING) // 正在賦值,直接先出讓執行緒 Thread。yield(); else if (q == null) // 任務還未完成需要等待 q = new WaitNode(); else if (!queued) queued = UNSAFE。compareAndSwapObject(this, waitersOffset, q。next = waiters, q); // 使用 Treiber stack 演算法 else if (timed) { nanos = deadline - System。nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport。parkNanos(this, nanos); } else LockSupport。park(this); }}
Treiber stack
建立非阻塞演算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變數上, 同時還要維護資料的一致性
jcip-annotations。jar @ThreadSafe
@ThreadSafe public class ConcurrentStack
總結
總體來講原始碼比較簡單,因為本身只是一個Future 模式的實現
但是其中的狀態量的設定,還有很多無鎖的處理方式,才是 FutureTask 帶給我們的精華