「併發進階」Future

明天的你會感謝今天努力的你

舉手之勞,加個關注

Future框架概述

JDK中的Future框架實際就是Future 模式的實現,通常情況下我們會配合執行緒池使用,但也可以單獨使用;下面我們就單獨使用簡單舉例:

應用例項

// demo 片段FutureTask future = new FutureTask<>(()->{ log。info(“非同步任務執行。。。”); Thread。sleep(2000); log。info(“過來很久很久。。。”); return “非同步任務完成”;});log。info(“啟動非同步任務。。。”);new Thread(future)。start();log。info(“繼續其他任務。。。”);Thread。sleep(1000);log。info(“獲取非同步任務結果:{}”,future。get());

[15:38:03,231 INFO ] [main] - 啟動非同步任務。。。[15:38:03,231 INFO ] [main] - 繼續其他任務。。。[15:38:03,231 INFO ] [Thread-0] - 非同步任務執行。。。[15:38:05,232 INFO ] [Thread-0] - 過了很久很久。。。[15:38:05,236 INFO ] [main] - 獲取非同步任務結果:非同步任務完成

首先我們將要執行的任務包裝成

Callable

, 這裡如果不需要返回值也可以使用

Runnable

; 然後構建 FutureTask 由一個執行緒啟動,最後使用

Future。get()

獲取非同步任務結果;

Future執行邏輯

對於Future模式的流程圖如下

「併發進階」Future

「併發進階」Future

對比上面的例項程式碼, 大家可能會發現有些不一樣,因為在FutureTask 同事繼承了 Runnable 和 Future 介面, 所以在提交任務後沒有返回Future , 而是直接使用自身呼叫get; 下面我們就對原始碼進行分析;

public interface RunnableFuture extends Runnable, Future { /** * Sets this Future to the result of its computation * unless it has been cancelled。 */ void run();}public class FutureTask implements RunnableFuture { private volatile int state; // 任務執行狀態 private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callable callable; // 非同步任務 /** The result to return or exception to throw from get() */ private Object outcome; //返回結果 non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; // 非同步任務執行執行緒 /** Treiber stack of waiting threads */ private volatile WaitNode waiters; //等待非同步結果的執行緒棧(透過Treiber stack 演算法實現) public FutureTaks(Callable callable){ if(callable == null){ throw new NullPointerException(); } this。callable = callable ; this。state = NEW; } public FutureTask(Runnable runnable, V result){ this。callable = Exceutors。callable(runnable,result); this。state = NEW ; }}

另外在程式碼中還可以看見有很多地方都是用了

CAS

來更新變數, 而JDK1。6 甚至使用了

AQS

來實現,其原因就是同一個

FutureTask

可以多個執行緒同時提交, 也可以多個執行緒同時獲取; 所以程式碼中有很多的狀態變數;

// FutureTask。state 取值private static final int NEW = 0; // 初始化到結果返回前private static final int COMPLETING = 1; // 結果賦值private static final int NORMAL = 2; // 執行完畢private static final int EXCEPTIONAL = 3; // 執行異常private static final int CANCELLED = 4; // 任務取消private static final int INTERRUPTING = 5; // 設定中斷狀態private static final int INTERRUPTED = 6; // 任務中斷

同時原始碼的註釋中也詳細給出了可能出現的狀態轉換:

NEW -> COMPLETING -> NORMAL //任務正常執行

NEW-> COMPLETING -> EXCEPTION // 任務執行異常

NEW -> CANCELLED // 任務取消

NEW -> INITERRUPTING -> INTERRUPTED // 任務中斷

注意這裡的

COMPLETING

狀態是一個很微妙的狀態, 正因為有他的存在才能實現無鎖賦值; 大家先留意這個狀態,然後程式碼中應該能體會到; 另外這裡有一個變數需要注意, WaitNode ; 使用

Treiber stack

演算法實現的無鎖棧; 其中原理說明可以 參考下面的第三節:

public void run(){ if(state != NEW //確保任務執行完成後,不在重複執行 !UNSAFE。compareAndSwapObject(this, runnerOffset, null, Thread。currentThread())){// 確保只有一個執行緒執行 return ; }}

任務執行

public void run() { if (state != NEW || // 確保任務執行完成後,不再重複執行 !UNSAFE。compareAndSwapObject(this, runnerOffset, null, Thread。currentThread())) // 確保只有一個執行緒執行 return; try { Callable c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c。call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 設定異常結果 } if (ran) set(result); // 設定結果 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); // 確保中斷狀態已經設定 }}

// 設定非同步任務結果protected void set(V v) { if (UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設定一次 outcome = v; UNSAFE。putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); // 喚醒等待執行緒 }}

protected void setException(Throwable t) { if (UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設定一次 outcome = t; UNSAFE。putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); }}

任務取消

public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && // 只有在任務執行階段才能取消 UNSAFE。compareAndSwapInt(this, stateOffset, NEW, // 設定取消狀態 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t。interrupt(); } finally { // final state UNSAFE。putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true;}

注意

cancel(false)

也就是僅取消, 並沒有打斷;非同步任務會繼續執行,只是這裡首先設定了

FutureTask。state = CANCELLED

, 所以最後在設定結果的時候會失敗,

UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)

獲取結果

public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 阻塞等待 return report(s);}private V report(int s) throws ExecutionException { // 根據最後的狀態返回結果 Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x);}

private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System。nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread。interrupted()) { removeWaiter(q); // 移除等待節點 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任務已完成 if (q != null) q。thread = null; return s; } else if (s == COMPLETING) // 正在賦值,直接先出讓執行緒 Thread。yield(); else if (q == null) // 任務還未完成需要等待 q = new WaitNode(); else if (!queued) queued = UNSAFE。compareAndSwapObject(this, waitersOffset, q。next = waiters, q); // 使用 Treiber stack 演算法 else if (timed) { nanos = deadline - System。nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport。parkNanos(this, nanos); } else LockSupport。park(this); }}

Treiber stack

建立非阻塞演算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變數上, 同時還要維護資料的一致性

jcip-annotations。jar @ThreadSafe

@ThreadSafe public class ConcurrentStack { AtomicReference> top = new AtomicReference<>(); private static class Node { public final E item; public Node next; public Node(E item) { this。item = item; } } public void push(E item) { Node newHead = new Node<>(item); Node oldHead; do { oldHead = top。get(); newHead。next = oldHead; } while (!top。compareAndSet(oldHead, newHead)); } public E pop() { Node oldHead; Node newHead; do { oldHead = top。get(); if (oldHead == null) return null; newHead = oldHead。next; } while (!top。compareAndSet(oldHead, newHead)); return oldHead。item; }}

總結

總體來講原始碼比較簡單,因為本身只是一個Future 模式的實現

但是其中的狀態量的設定,還有很多無鎖的處理方式,才是 FutureTask 帶給我們的精華

「併發進階」Future