JUC執行緒池-ThreadPoolExecutor

下圖是ThreadPoolExecutor的繼承體系:

JUC執行緒池-ThreadPoolExecutor

ThreadPoolExecutor繼承了AbstractExecutorService,擁有由該抽象類實現的submit、invokeAny、invokeAll方法,具體實現在上一節中已經分析過了。使用執行緒池提高了大量非同步任務的執行效能,減少了每個任務的呼叫開銷。並且提供了一種限制和管理資源(包括執行緒)的方法,還維護了一些基本的統計資訊,提供了豐富的引數來建立和調節執行緒池。

ThreadPoolExecutor是一種最常用的執行緒池,開發者建立好執行緒池,只需準備好待執行的任務並提交,執行緒池會給任務分配執行緒,執行任務,最後可獲取到任務的執行結果。

任務:Callable、Runable、FutureTask

ThreadPoolExecutor對於執行的任務,涉及到三個類:Callable、Runable、FutureTask,其中Runable和Callable應該是比較熟悉了,下面來介紹一下它們在ThreadPoolExecutor中的作用。

Runnable的執行緒執行體run方法,沒有返回值,不能向上丟擲異常;

Callable的執行緒執行體call方法,支援泛型,有返回值,可以向上丟擲異常。

可以直接將Runnable或Callable任務直接丟進執行緒池以待執行,如果是以需要獲取返回值的形式提交任務,兩者都會被包裝成FutureTask再作為任務執行,因為是需要透過FutureTask獲取執行後的返回值。

JUC執行緒池-ThreadPoolExecutor

FutureTask實現了Runnable和Future介面,FutureTask是一個可取消的非同步計算任務,提供了實現於Future的開始和取消任務的方法,可以查詢任務是否完成,以及透過get方法阻塞式等待任務結束獲取任務結果。

FutureTask

該任務有以下幾種狀態:

/* * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */private volatile int state;private static final int NEW          = 0;// 新建private static final int COMPLETING   = 1;// 即將完成private static final int NORMAL       = 2;// 正常完成private static final int EXCEPTIONAL  = 3;// 異常private static final int CANCELLED    = 4;// 被取消private static final int INTERRUPTING = 5;// 中斷private static final int INTERRUPTED  = 6;// 已中斷

/** The underlying callable; nulled out after running */// 真正執行的執行緒體回撥private Callable callable;/** The result to return or exception to throw from get() */// 執行結果private Object outcome; // non-volatile, protected by state reads/writes/** The thread running the callable; CASed during run() */// 執行執行緒private volatile Thread runner;/** Treiber stack of waiting threads */// 等待執行的執行緒節點private volatile WaitNode waiters;

FutureTask有兩個構造方法,分別對Runable和Callable進行包裝:

/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Callable}。 * * 直接將callable包裝成FutureTask,設定初始狀態NEW * * @param callable the callable task * @throws NullPointerException if the callable is null */public FutureTask(Callable callable) {    if (callable == null)        throw new NullPointerException();    this。callable = callable;    this。state = NEW;       // ensure visibility of callable}/** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion。 * * 需要將Runnable包裝成Callable物件,在包裝成FutureTask物件,設定初始狀態NEW * * @param runnable the runnable task * @param result the result to return on successful completion。 If * you don‘t need a particular result, consider using * constructions of the form: * {@code Future<?> f = new FutureTask(runnable, null)} * @throws NullPointerException if the runnable is null */public FutureTask(Runnable runnable, V result) {    this。callable = Executors。callable(runnable, result);    this。state = NEW;       // ensure visibility of callable}

將Runnable包裝成Callable是透過Executors的callable方法:

/** * Returns a {@link Callable} object that, when * called, runs the given task and returns the given result。 This * can be useful when applying methods requiring a * {@code Callable} to an otherwise resultless action。 * @param task the task to run * @param result the result to return * @param the type of the result * @return a callable object * @throws NullPointerException if task null */public static Callable callable(Runnable task, T result) {    if (task == null)        throw new NullPointerException();    return new RunnableAdapter(task, result);}/** * Returns a {@link Callable} object that, when * called, runs the given task and returns {@code null}。 * @param task the task to run * @return a callable object * @throws NullPointerException if task null */public static Callable callable(Runnable task) {    if (task == null)        throw new NullPointerException();    return new RunnableAdapter(task, null);}/** * A callable that runs given task and returns given result */static final class RunnableAdapter implements Callable {    final Runnable task;    final T result;    RunnableAdapter(Runnable task, T result) {        this。task = task;        this。result = result;   }    public T call() {        task。run();        return result;   }}

Runnable任務沒有返回值,但可以手動指定一個返回值,如果不指定,其返回值為null。使用RunnableAdapter將Runnable包裝成Callable,那麼任務就變成了Callable,任務體就是call方法,執行完直接返回給定的結果。所以無論是Callable任務還是Runnable任務,執行緒執行的任務體就是call方法。

包裝:

Callable -> FutureTask

Runnable -> Callable ->FutureTask

執行緒體:call()

FutureTask實現了Runnable介面,那麼執行緒執行的就是FutureTask的run方法了

futureTask.run()

該方法是執行緒執行任務的任務體,實際call方法才是真正的任務體,只不過已經做了層層包裝。該方法主要是執行任務,並將任務的執行結果儲存起來。

public void run() {    // 只有state==NEW才可執行    if (state != NEW ||        !UNSAFE。compareAndSwapObject(this, runnerOffset,                                     null, Thread。currentThread()))        return;    try {        Callable c = callable;        if (c != null && state == NEW) {            V result;            boolean ran;            try {                // 執行任務,並獲取返回值                result = c。call();                ran = true;           } catch (Throwable ex) {                result = null;                ran = false;                setException(ex);           }            // 儲存返回值            if (ran)                set(result);       }   } finally {        // runner must be non-null until state is settled to        // prevent concurrent calls to run()        runner = null;        // state must be re-read after nulling runner to prevent        // leaked interrupts        int s = state;        if (s >= INTERRUPTING)            handlePossibleCancellationInterrupt(s);   }}

確保任務在NEW的狀態下才可正常繼續執行,執行完畢儲存返回值。

/** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled。 * *

This method is invoked internally by the {@link #run} method * upon successful completion of the computation。 * * @param v the value */protected void set(V v) {    if (UNSAFE。compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {        // 結果儲存在FutureTask的outcome        outcome = v;        UNSAFE。putOrderedInt(this, stateOffset, NORMAL); // final state        finishCompletion();   }}

set方法是儲存任務執行結果。透過CAS的方式更改任務執行狀態,並設定返回值。任務狀態:NEW -> COMPLETING -> NORMAL。所以任務狀態在NORMAL時才可獲取返回值。

futureTask.get()

透過submit提交的任務會被打包成FutureTask物件交由執行緒池執行,並且submit方法會返回該FutureTask物件,那麼透過該物件呼叫get方法就會阻塞式等待任務執行完成,然後取得任務的執行結果。

/** * @throws CancellationException {@inheritDoc} */public V get() throws InterruptedException, ExecutionException {    int s = state;    if (s <= COMPLETING)        // 如果任務還未執行完成,則阻塞式等待        s = awaitDone(false, 0L);    // 任務被執行完成,則獲取執行結果並返回    return report(s);}

任務阻塞佇列:BlockingQueue

BlockingQueue是存放任務的阻塞佇列,分為有界阻塞佇列,和無界阻塞佇列。當核心執行緒都被佔用,那麼新來的任務就會被存放到阻塞佇列中等待被執行。內建的有以下三種佇列,也可以提供BlockingQueue介面下的其他佇列。

LinkedBlockingQueue

LinkedBlockingQueue是連結串列實現的無界阻塞佇列,可指定容量,預設不指定容量上限為Integer。MAX_VALUE。

ArrayBlockingQueue

ArrayBlockingQueue是陣列實現的有界阻塞佇列,需指定容量。

SynchronousQueue

SynchronousQueue是隻有一個只能容納一個元素的阻塞佇列。

任務執行者:Worker

worker是用來執行任務的執行緒包裝類,下面在對ThreadPoolExecutor類進行分析的時候再對它展開分析。

任務執行者容器:HashSet<Worker>

存放任務執行的容器是透過HashSet實現的,儲存執行緒池中所有的Worker。對該容器的併發控制是透過ReentrantLock和Condition實現的。

拒絕策略:RejectedExecutionHandler

拒絕策略是在使用有界阻塞佇列情況下,針對新提交的任務而言的,其觸發場景有:

執行緒池被關閉

執行緒數達到上限且阻塞佇列飽和

有四大拒絕策略,它們都實現了RejectedExecutionHandler介面:

AbortPolicy:丟擲RejectedExecutionException異常

CallerRunsPolicy:直接在該任務加入執行緒池的所線上程中執行此任務

DiscardPolicy:直接拋棄該任務

DiscardOldestPolicy:丟棄阻塞佇列中最老的未處理任務,然後重試(嘗試將新任務重新加入)

以上,就是執行緒池的主要涉及相關物件簡單介紹。接下來我們分析ThreadPoolExecutor得原始碼。

ThreadPoolExecutor

下面是Doug Lea對於ThreadPoolExecutor相關屬性的描述:

核心執行緒數corePoolSize:當一個新任務提交到執行緒池,如果執行緒池中的執行緒數小於corePoolSize,會建立一個新的執行緒去處這個新任務。

最大執行緒數maximumPoolSize:當執行緒池中的執行緒數大於corePoolSize,小於maximumPoolSize,且(有界)任務佇列飽時,會新建立物件。如果設定的corePoolSize和maximumPoolSize一樣,這就是一個固定大小的執行緒池;如果在

按需建立:在預設的情況下,每一個核心執行緒只在有新任務到臨時才建立,但是可以透過呼叫prestartCoreThread或者pestartAllCoreThreads來動態修改。

執行緒建立:執行緒是由ThreadFactory建立,預設是透過DefaultThreadFactory建立,這樣建立的執行緒在同一個執行緒組中,擁有相同的優先順序,都是非守護執行緒。可以透過使用其他的ThreadFactory,實現更改執行緒名稱、執行緒組、優先順序、守護執行緒狀態。如果ThreadFactory建立執行緒失敗,會從建立執行緒的newThread方法返回null,執行緒池可以繼續執行,但是可能無法執行任何任務

存活時長keepAliveTime:如果當前執行緒池中的執行緒數大於corePoolSize,超過核心corePoolSize的執行緒,如果空閒超過keepActiveTime的時長會被終止,線上程池活躍度降低的情況下能減少資源消耗,但是如果執行緒池比之前的還要活躍,那麼將建立新的執行緒。keepAliveTime只作用於超過核心執行緒數的執行緒,可以透過allowCoreThreadOut(boolean)方法設定keepActiveTime是否也適用於核心執行緒,

BlockingQueue:被用來傳遞和儲存被提交的任務。如果任務提交時執行緒數小於corePoolSize,則會新建立執行緒,而不是將任務放入隊裡;如果提交任務時執行緒數大於大於等於corePoolSize,新提交的任務會放入阻塞佇列;如果新任務無法入隊,則會建立新的執行緒去執行這個任務,直到執行緒數超過maximumPoolSize,就會響應拒絕策略。提供了三種佇列策略:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue。

佇列維護Queue maintenance:可以透過getQueue()對執行者佇列監控或除錯。

拒絕任務:ThreadPoolExecutor。AbortPolicy、ThreadPoolExecutor。CallerRunsPolicy\ThreadPoolExecutor。DiscardPolicy,ThreadPoolExecutor。DiscardOldestPolicy,上面已經介紹過這四種拒絕策略了。支援自定義拒絕策略。

執行緒池完成:如果執行緒池不再被引用,沒有剩餘的執行緒會自動關閉。如果想要確保在沒有呼叫shutdown的情況下沒有被引用的執行緒池被回收,必須透過設定keepAliveTime,使用0核心執行緒的下限或者設定allowCoreThreadTimeOut,來讓空閒的執行緒終止。

執行緒池的狀態

RUNNING:可以接受新任務,處理任務佇列中的任務

SHUTDOWN:不會接受新任務,但是會處理掉任務佇列中的任務

STOP:不會接受新的任務,也不會處理任務佇列中的任務,還會中斷每一個正在執行的任務

TIDYING:沒有待執行任務,執行任務的Worker為0,執行緒會變成空閒狀態,並呼叫terminated()方法

TERMINATED:已完成

執行緒池狀態轉換

RUNNING -> SHUTDOWN:呼叫shutdown()方法,ThreadPoolService重寫了finalize()方法,內部也呼叫了shutdown()方法;

(RUNNING/SHUTDOWN) -> STOP:呼叫shutdownNOw()方法;

SHUTDOWN -> TIDYING:當任務佇列和執行緒池為空;

STOP -> TIDYING:當執行緒池為空

TIDYING -> TERMINATED:terminated()執行完成;

屬性

對執行緒池的狀態控制和監測是透過一個原子變數ctl來實現的,這個原子變數維護了兩個概念:

工作者執行緒數

執行狀態

使用原子變數ctl的高3位表示狀態,低29位表示執行緒數。

// 表示執行緒池狀態和執行緒數的原子變數,初始值為RUNNING,以後稱之為控制變數private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 記錄執行緒數的位數 29private static final int COUNT_BITS = Integer。SIZE - 3;// 執行緒池的最大容量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 111 0 0000 0000 0000 0000 0000 0000 0000private static final int RUNNING    = -1 << COUNT_BITS;// 000 0 0000 0000 0000 0000 0000 0000 0000private static final int SHUTDOWN   =  0 << COUNT_BITS;// 001 0 0000 0000 0000 0000 0000 0000 0000private static final int STOP       =  1 << COUNT_BITS;// 010 0 0000 0000 0000 0000 0000 0000 0000private static final int TIDYING    =  2 << COUNT_BITS;// 011 0 0000 0000 0000 0000 0000 0000 0000private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctl// 獲取執行緒池狀態private static int runStateOf(int c)     { return c & ~CAPACITY; }// 獲取執行緒數private static int workerCountOf(int c) { return c & CAPACITY; }// private static int ctlOf(int rs, int wc) { return rs | wc; }// 任務等待佇列private final BlockingQueue workQueue;// 執行緒池的鎖private final ReentrantLock mainLock = new ReentrantLock();// 執行者佇列,也就是包裝的執行緒佇列private final HashSet workers = new HashSet();// 等待中止的條件private final Condition termination = mainLock。newCondition();// 記錄池中執行緒達到過的最大值private int largestPoolSize;// 執行緒池已完成的任務數private long completedTaskCount;// 建立執行緒的工廠private volatile ThreadFactory threadFactory;// 執行緒池拒絕策略,會線上程池飽和或關閉後執行private volatile RejectedExecutionHandler handler;// 空閒執行緒池存活時長private volatile long keepAliveTime;// 是否對核心執行緒應用keepAliveTime,預設falseprivate volatile boolean allowCoreThreadTimeOut;// 執行緒池核心大小,核心執行緒數private volatile int corePoolSize;// 限定執行緒池的最大容量private volatile int maximumPoolSize;/** * Set containing all worker threads in pool。 Accessed only when * holding mainLock。 */private final HashSet workers = new HashSet();

建立執行緒池

下面是引數最全的構造方法:

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters。 * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating。 * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed。 This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method。 * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this。acc = System。getSecurityManager() == null ? null : AccessController。getContext(); this。corePoolSize = corePoolSize; this。maximumPoolSize = maximumPoolSize; this。workQueue = workQueue; this。keepAliveTime = unit。toNanos(keepAliveTime); this。threadFactory = threadFactory; this。handler = handler; }

建立執行緒池需指定7個有效的引數:

核心執行緒數

執行緒池最大容量

空閒執行緒存活時長

時長單位

任務阻塞佇列

執行緒工廠

拒絕策略

下面是其他過載構造:

// 預設執行緒工廠public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors。defaultThreadFactory(), handler);}// 預設拒絕策略public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler);}// 預設執行緒工廠,預設拒絕策略public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors。defaultThreadFactory(), defaultHandler);}

ThreadPoolExecutor有七個配置引數,四個構造方法。四個構造就是指定或不指定執行緒工廠或拒絕策略,必須要指定核心執行緒數、最大執行緒數、執行緒空閒時長、時長單位、任務等待佇列。

Worker

worker是ThreadPoolExecutor類中的內部類,Worker是任務的執行者,繼承了AQS,實現了Runbable。它被建立時可以接受一個初始任務,同時用執行緒工廠建立一個執行緒,等待被執行。Worker繼承了AQS,它是一個獨佔鎖,當對它的執行緒和任務進行操作時,會對其加鎖。

/** * Class Worker mainly maintains interrupt control state for * threads running tasks, along with other minor bookkeeping。 * This class opportunistically extends AbstractQueuedSynchronizer * to simplify acquiring and releasing a lock surrounding each * task execution。 This protects against interrupts that are * intended to wake up a worker thread waiting for a task from * instead interrupting a task being run。 We implement a simple * non-reentrant mutual exclusion lock rather than use * ReentrantLock because we do not want worker tasks to be able to * reacquire the lock when they invoke pool control methods like * setCorePoolSize。 Additionally, to suppress interrupts until * the thread actually starts running tasks, we initialize lock * state to a negative value, and clear it upon start (in * runWorker)。 */private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning。 */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in。 Null if factory fails。 */ // 執行任務的執行緒,一個Worker繫結一個執行緒 final Thread thread; /** Initial task to run。 Possibly null。 */ // 執行緒任務 Runnable firstTask; /** Per-thread task counter */ // 該執行緒累計執行完成的任務數 volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory。 * @param firstTask the first task (null if none) * 新建執行緒執行任務 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker // 待執行任務 this。firstTask = firstTask; // 建立執行緒,呼叫該thread的start方法,就會執行下面的run方法 this。thread = getThreadFactory()。newThread(this); } /** Delegates main run loop to outer runWorker */ // 重寫Runnable的run方法,執行緒執行體 public void run() { runWorker(this); } // Lock methods // // The value 0 represents the unlocked state。 執行緒空閒 state=0 // The value 1 represents the locked state。 執行緒執行中 state=1 protected boolean isHeldExclusively() { return getState() != 0; } // 嘗試獲取執行權 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread。currentThread()); return true; } return false; } // 放棄執行權 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } // 中斷正在執行的執行緒 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t。isInterrupted()) { try { t。interrupt(); } catch (SecurityException ignore) { } } }}

Worker類封裝了執行緒物件,任務,以及該執行緒累計執行完的任務數。透過AQS狀態管理執行緒的執行狀態,支援中斷響應。

RejectedExecutionHandler

上面已經介紹了拒絕策略,再來回顧一下吧。拒絕策略是在

使用有界阻塞佇列

情況下,針對新提交的任務而言的,其觸發場景有:

執行緒池被關閉

執行緒數達到上限且阻塞佇列飽和

ThreadPoolExecutor中已實現有四大拒絕策略,它們都實現了RejectedExecutionHandler介面:

AbortPolicy:丟擲RejectedExecutionException異常

CallerRunsPolicy:直接在該任務加入執行緒池的所線上程中執行此任務

DiscardPolicy:直接拋棄該任務

DiscardOldestPolicy:丟棄阻塞佇列中最老的未處理任務,然後再次提交(嘗試將新任務重新加入)

/* Predefined RejectedExecutionHandlers *//** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded。 */public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}。 */ public CallerRunsPolicy() { } /** * Executes task r in the caller’s thread, unless the executor * has been shut down, in which case the task is discarded。 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 如果執行緒池沒有被關閉,直接在當前執行緒執行新任務 if (!e。isShutdown()) { r。run(); } }}/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}。 */public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}。 */ public AbortPolicy() { } /** * Always throws RejectedExecutionException。 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接丟擲拒絕執行異常 throw new RejectedExecutionException(“Task ” + r。toString() + “ rejected from ” + e。toString()); }}/** * A handler for rejected tasks that silently discards the * rejected task。 */public static class DiscardPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardPolicy}。 */ public DiscardPolicy() { } /** * Does nothing, which has the effect of discarding task r。 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 並未做任何事,也就是不處理,新任務會被丟棄 }}/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded。 */public static class DiscardOldestPolicy implements RejectedExecutionHandler { /** * Creates a {@code DiscardOldestPolicy} for the given executor。 */ public DiscardOldestPolicy() { } /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded。 * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { // 直接丟棄等待佇列中等待最久的任務,也就是佇列頭的任務,給新任務騰地兒,然後把新任務加入執行緒池。 if (!e。isShutdown()) { e。getQueue()。poll(); e。execute(r); } }}

提交任務submit、execute

提交的執行緒任務可分為Runnable任務和Callable任務,兩種任務都會被打包成FutureTask,上面已經詳細分析過了。提交的執行緒任務可分為submit方式提交和execute方式提交,Runnable和Callable任務都可以透過submit方式提交以獲得執行結果。execute只能提交Runnable任務,且沒有返回結果。

在上一節中已經對AbstractExecutorService中的submit方法做了分析,其中可以發現,submit提交的任務也是交給execute來執行的。submit將Runnable和Callable任務打包成FutureTask然後交給execute去執行,如下:

/** * @throws RejectedExecutionException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */public Future submit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task); execute(ftask); return ftask;}

所以不管是哪種任務、哪種提交方式,最終都是透過execute去執行。下面隆重請execute出場:

execute(Runnable command)

/** * Executes the given task sometime in the future。 The task * may execute in a new thread or in an existing pooled thread。 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}。 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */public void execute(Runnable command) { if (command == null) throw new NullPointerException(); /* * Proceed in 3 steps: * * 1。 If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task。 The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn‘t, by returning false。 * 1。 如果執行的執行緒比核心執行緒數少,則會試圖建立一個新的執行緒來執行這個新提交的任務,然後會呼叫addWorker檢查檢查執行緒池執行狀態和執行緒數,新增新執行緒失敗會返回false * 2。 If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method。 So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none。 * 2。 如果任務能成功進入等待佇列,仍然會做雙重檢查是否應該新增新的執行緒,因為可能在上一次檢查後有執行緒終止了,或者剛進入這個方法的時候執行緒池關閉了。所以重複檢查狀態,如果執行緒停止是否必要退出佇列,或者沒有執行緒時新建執行緒。 * 3。 If we cannot queue task, then we try to add a new * thread。 If it fails, we know we are shut down or saturated * and so reject the task。 * 如果任務不能入隊,則會嘗試新建一個執行緒。如果新建失敗,則表示執行緒池被關閉或者飽和,就會拒絕該任務 */ // 獲取控制變數 int c = ctl。get(); // 當前池中執行緒數小於核心執行緒數 if (workerCountOf(c) < corePoolSize) { // 新建執行緒,並執行該任務 if (addWorker(command, true)) return; c = ctl。get(); } // 執行緒池狀態正常,且能將任務放入佇列 if (isRunning(c) && workQueue。offer(command)) { // 再次獲取控制變數 int recheck = ctl。get(); // 執行緒池沒有執行,將此任務出隊,並執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 否則檢查執行執行緒的個數,沒有執行緒開啟新執行緒 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 執行緒池狀態正常,將任務放入佇列失敗,則執行緒池飽和(即執行緒佇列飽和)會新建執行緒去執行新提交的任務,如果新建執行緒失敗則會執行拒絕策略 else if (!addWorker(command, false)) reject(command);}

在execute方法中對任務提交有以下流程

直接用addworker新建執行緒來執行這個任務

將任務入隊

執行拒絕策略

addWorker(Runnable firstTask, boolean core)

private boolean addWorker(Runnable firstTask, boolean core) { retry: // 判斷執行緒池狀態及容量 for (;;) { // 獲取控制變數 int c = ctl。get(); // 獲取執行狀態 int rs = runStateOf(c); // Check if queue empty only if necessary。 判斷執行緒池狀態 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue。isEmpty())) return false; for (;;) { // 當前執行緒數 int wc = workerCountOf(c); // 執行緒數大於最大容量(對於沒有指定佇列容量而言) 或者 // 判斷是否被指定為核心執行緒 // 若為核心執行緒,則判斷當前執行緒數是否大於核心執行緒數,大於返回false,接下來會進行入隊操作 // 不為核心執行緒,則判斷當前執行緒數是否大於池指定最大容量,大於返回false,接下來會執行拒絕操作 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) // 以上情況都不能建立新執行緒 return false; // 增加一個執行緒數 if (compareAndIncrementWorkerCount(c)) // 跳出迴圈 break retry; c = ctl。get(); // Re-read ctl // 執行緒池狀態被改變 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } // 下面是新增執行緒 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 透過工廠新建執行緒,傳入當前任務作Worker中執行緒的第一個任務 w = new Worker(firstTask); final Thread t = w。thread; if (t != null) { final ReentrantLock mainLock = this。mainLock; // 對執行緒集合操作加鎖 mainLock。lock(); try { // Recheck while holding lock。 // Back out on ThreadFactory failure or if // shut down before lock acquired。 int rs = runStateOf(ctl。get()); // 判斷執行緒池狀態 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 檢查新建執行緒的狀態,剛剛新建的執行緒,都沒有加入執行緒集合,就被start運行了? if (t。isAlive()) // precheck that t is startable // 這裡面有問題,拋異常吧 throw new IllegalThreadStateException(); // 將新建的執行緒放入執行緒集合中,統一管理 workers。add(w); int s = workers。size(); // 更新執行緒池達到的容量峰值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { // 釋放鎖 mainLock。unlock(); } // 用新建執行緒執行新新增的任務 if (workerAdded) { t。start(); workerStarted = true; } } } finally { // 執行緒啟動失敗,則執行相應的移出執行緒集等操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

addWorker的主要操作就是檢查執行緒池狀態,檢查執行緒池容量,加鎖,新建核心或非核心執行緒,加入執行緒集合,可立即執行傳入的任務,然後釋放鎖。

總的來說,對於新任務提交到執行緒池:

當池中執行緒數小於核心執行緒數時,新增新任務時不會進行入隊操作,會新建Worker執行緒,然後執行該任務。

如果池中執行緒數大於等於核心執行緒數,新增的新任務就會嘗試被加入等待佇列,等待被空閒執行緒執行。

如果新任務入隊失敗(即任務佇列飽和),則會新建執行緒來執行這個任務,但是如果池中匯流排程數超過了上限(無邊界佇列上限為1<<29-1,有邊界佇列上限為指定的maximumPoolSize),則會執行拒絕策略。

根據上面分析的Worker原始碼可以知道,Worker裡的執行緒執行的Runnable就是Worker本身,Worker實現了Runnable介面,也就是

t.start();

執行的是Worker的run方法。Worker裡的執行緒第一個任務就是透過構造方法傳入的,也就是上面的firstTask。

但是,Worker中run並沒有給出具體實現,而是呼叫了ThreadPoolExecutor中的runWorker方法:

/** Delegates main run loop to outer runWorker */public void run() { runWorker(this);}

runWorker(Worker w)

/** * Main worker run loop。 Repeatedly gets tasks from queue and * executes them, while coping with a number of issues: * 1。 We may start out with an initial task, in which case we * don’t need to get the first one。 Otherwise, as long as pool is * running, we get tasks from getTask。 If it returns null then the * worker exits due to changed pool state or configuration * parameters。 Other exits result from exception throws in * external code, in which case completedAbruptly holds, which * usually leads processWorkerExit to replace this thread。 * 2。 Before running any task, the lock is acquired to prevent * other pool interrupts while the task is executing, and then we * ensure that unless pool is stopping, this thread does not have * its interrupt set。 * * 3。 Each task run is preceded by a call to beforeExecute, which * might throw an exception, in which case we cause thread to die * (breaking loop with completedAbruptly true) without processing * the task。 * * 4。 Assuming beforeExecute completes normally, we run the task, * gathering any of its thrown exceptions to send to afterExecute。 * We separately handle RuntimeException, Error (both of which the * specs guarantee that we trap) and arbitrary Throwables。 * Because we cannot rethrow Throwables within Runnable。run, we * wrap them within Errors on the way out (to the thread‘s * UncaughtExceptionHandler)。 Any thrown exception also * conservatively causes thread to die。 * * 5。 After task。run completes, we call afterExecute, which may * also throw an exception, which will also cause thread to * die。 According to JLS Sec 14。20, this exception is the one that * will be in effect even if task。run throws。 * * The net effect of the exception mechanics is that afterExecute * and the thread’s UncaughtExceptionHandler have as accurate * information as we can provide about any problems encountered by * user code。 * * @param w the worker */final void runWorker(Worker w) { // 獲取當前執行緒,也就是w中的新建執行緒 Thread wt = Thread。currentThread(); // 獲取第一個任務 Runnable task = w。firstTask; w。firstTask = null; w。unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果task為空,則表明不是第一個任務,那麼需要呼叫getTask從佇列中獲取任務,這是一個阻塞式的新式 while (task != null || (task = getTask()) != null) { // 對w物件加獨佔鎖,當前執行緒獲取鎖資源 w。lock(); // 當執行緒池停止執行,要確保執行緒被中斷 // If pool is stopping, ensure thread is interrupted; // 如果沒有停止,則要確保執行緒沒有被中斷。需要進行重複檢查來處理清除中斷時呼叫shutdownNow // 只判斷了STOP狀態 // if not, ensure thread is not interrupted。 This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl。get(), STOP) || (Thread。interrupted() && runStateAtLeast(ctl。get(), STOP))) && !wt。isInterrupted()) wt。interrupt(); try { // 執行任務前的操作 beforeExecute(wt, task); Throwable thrown = null; try { // 現在已經執行緒在運行了,執行緒執行執行的是Worker中的run方法,然後呼叫的該方法,當前就線上程體中,但還需要執行任務,所以手動呼叫Runnable任務中的run方法來執行任務 task。run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 一個任務執行結束的後序操作 afterExecute(task, thrown); } } finally { // 任務執行結束,任務變數置為null task = null; // 更新該執行緒執行任務的總數 w。completedTasks++; // 釋放鎖 w。unlock(); } } // 正常執行結束為false completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }}

這個runWorker方法就是Worker中透過執行緒工廠建立的執行緒的執行體,它建立時就會攜帶一個任務,然後執行這個任務,第一個任務就不會從佇列中獲取。

runWorker會啟動一個while迴圈,要麼是第一個任務,要麼從線任務佇列中獲取任務,然後對當前執行緒加鎖,檢查執行緒池狀態,執行任務,更新統計資料,釋放鎖資源,繼續從佇列中獲取下一個任務。這個任務並不是提交給執行緒直接執行的,而是讓執行緒執行Worker,在被執行緒執行的Worker中的run方法中再去手動呼叫Runnable真正的任務體run方法。執行完一個任務後這個執行緒並不會立即結束,因為是在while中,那麼還會呼叫getTask從任務佇列中獲取下一個待執行任務,直到getTask返回null,該執行緒就會被銷燬,方法做該執行緒的後序操作並呼叫processWorkerExit方法做該執行緒的後序操作。

processWorkerExit(Worker w, boolean completedAbruptly)

/** * Performs cleanup and bookkeeping for a dying worker。 Called * only from worker threads。 Unless completedAbruptly is set, * assumes that workerCount has already been adjusted to account * for exit。 This method removes thread from worker set, and * possibly terminates the pool or replaces the worker if either * it exited due to user task exception or if fewer than * corePoolSize workers are running or queue is non-empty but * there are no workers。 * * @param w the worker * @param completedAbruptly if the worker died due to user exception */private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted // 執行緒非正常結束,執行緒數-1 decrementWorkerCount(); final ReentrantLock mainLock = this。mainLock; // 需要更新執行緒池完成任務總數、從執行緒集合中移除該執行緒,所以需要對執行緒池加鎖 mainLock。lock(); try { completedTaskCount += w。completedTasks; workers。remove(w); } finally { mainLock。unlock(); } tryTerminate(); int c = ctl。get(); // 執行緒池狀態 if (runStateLessThan(c, STOP)) { // 執行緒由於沒有獲取到任務而正常結束 if (!completedAbruptly) { // 如果設定了核心執行緒的空閒時長 // 執行緒池中允許的最小執行緒數為0,如果沒設定,則最小執行緒數為核心執行緒數 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 設定核心執行緒空閒時長,且任務佇列不為空 if (min == 0 && ! workQueue。isEmpty()) min = 1; // 執行緒池中執行緒數大於最低執行緒數,直接返回 if (workerCountOf(c) >= min) return; // replacement not needed } // 執行緒異常結束,則會再新建執行緒 // 正常結束: // 1。設定核心執行緒空閒等待時長,任務佇列為空,則不會新建執行緒 // 2。設定核心執行緒空閒等待時長,任務佇列不為空,池中執行緒數還有,則不會新建執行緒 // 3。沒有設定核心執行緒空閒等待時長,執行緒數大於核心執行緒數,表明死掉的是非核心執行緒,則不會新建執行緒 // 4。沒有設定核心空閒等待時長,執行緒數小於核心執行緒數,表明死掉的是核心執行緒,新建執行緒 addWorker(null, false); }}

processWorkerExit方法就是在一個執行緒正常或異常結束後的首未操作。首先會更新執行緒池的總完成任務。然後根據執行緒是否正常結束、是否設定核心執行緒等待時長、佇列是否為空來判斷是否要新建一個替換執行緒:

執行緒異常結束,則會再新建一個執行緒

執行緒正常結束,設定了核心執行緒空閒等待時長,不管任務佇列有沒有,都不會再新建執行緒。既然是核心執行緒正常結束,表明執行緒池很空閒了。

執行緒正常結束,沒有設定核心執行緒空閒等待時長,任務佇列為空,如果池中執行緒數大於核心執行緒數,則表明死的是非核心執行緒,不會新建執行緒。如果池中執行緒數小於核心執行緒數,死的就是核心執行緒,那就得維持核心執行緒數,則新建執行緒。

。。。剩下的慢慢推吧

反正就是執行緒異常結束,會新建執行緒來填坑;執行緒正常結束,池未設定核心執行緒等待時長則不會新建執行緒;執行緒正常結束,池設定了核心執行緒空閒等待時長,如果池中執行緒低於核心執行緒數,就會新建執行緒來維持池中核心執行緒數。

關閉執行緒池 shutdown、shutdownNow

shutdown()

/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted。 * Invocation has no additional effect if already shut down。 *

This method does not wait for previously submitted tasks to * complete execution。 Use {@link #awaitTermination awaitTermination} * to do that。 * * @throws SecurityException {@inheritDoc} */public void shutdown() { final ReentrantLock mainLock = this。mainLock; // 獲取執行緒池鎖 mainLock。lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); // 中斷執行緒 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock。unlock(); } tryTerminate();}

執行緒池被呼叫shutdown方法後,會將池狀態置為SHUTDOWN,並不會再接受新的任務,新任務提交時會執行拒絕策略。但是會等待呼叫shutdown之前已提交的全部任務執行完畢才關閉執行緒池。

shutdownNow()

/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution。 These tasks are drained (removed) * from the task queue upon return from this method。 * *

This method does not wait for actively executing tasks to * terminate。 Use {@link #awaitTermination awaitTermination} to * do that。 * *

There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks。 This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate。 * * @throws SecurityException {@inheritDoc} */public List shutdownNow() { List tasks; final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock。unlock(); } tryTerminate(); return tasks;}

執行緒池呼叫shutdownNow方法,池狀態置為STOP,會透過interrupt的方式將執行緒置為中斷狀態,以此試圖立即去停止正在執行的任務,停止正在佇列中等待的任務,不會接受新的任務,並返回一個等待被執行的任務佇列,這些任務也會從等待佇列刪除。

也就是說只會觸發執行緒的中斷,如sleep,await,Condition等會響應中斷,然後根據任務的具體實現做出響應後序操作,可能執行緒會立即停止執行,也可能不會。不會影響不響應中斷的任務,或者對中斷響應做了處理的,執行緒池依舊會等待全部任務執行結束才會關閉。

awaitTermination(long timeout, TimeUnit unit)

public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit。toNanos(timeout); final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { for (;;) { if (runStateAtLeast(ctl。get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination。awaitNanos(nanos); } } finally { mainLock。unlock(); }}

awaitTermination是檢測執行緒池的狀態是否為TERMINATED,支援設定等待時長阻塞等待再判斷,並不會影響池狀態。池已終止返回ture,否則返回false。

至此我們分析了執行緒池從建立的可操作的狀態。執行緒池的狀態:RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED。執行緒池被建立時狀態為RUNNING,當呼叫shutdown時執行緒池狀態為SHUTDOWN,當呼叫shuwdownNow時執行緒池狀態為STOP,另外的TIDYING、TERMINATED不需要我們進行控制,當池和佇列為空時執行緒池就處於TIDYING狀態,當執行緒池終止執行的時候狀態為TERMINATED。

動態配置

上面介紹了透過構造引數執行緒池,執行緒池也支援透過方法來動態配置來進行最佳化。

prestartCoreThread()

執行緒池建立後,預設是沒有執行緒的,只有新增任務才會動態新建核心執行緒,直到達到核心執行緒數。

prestartCoreThread是啟動一個空閒的核心執行緒,等待執行任務

/** * Starts a core thread, causing it to idly wait for work。 This * overrides the default policy of starting core threads only when * new tasks are executed。 This method will return {@code false} * if all core threads have already been started。 * * @return {@code true} if a thread was started */public boolean prestartCoreThread() { return workerCountOf(ctl。get()) < corePoolSize && addWorker(null, true);}

prestartAllCoreThreads

prestartAllCoreThreads把設定數量的核心執行緒都啟動起來,等待執行任務

/** * Starts all core threads, causing them to idly wait for work。 This * overrides the default policy of starting core threads only when * new tasks are executed。 * * @return the number of threads started */public int prestartAllCoreThreads() { int n = 0; while (addWorker(null, true)) ++n; return n;}

setCorePoolSize(int corePoolSize)

設定核心執行緒數,會覆蓋透過構造方法設定的核心執行緒數,如果新值小於已配置的核心執行緒數,則會把超出新核心執行緒數的執行緒在它們空閒時終止掉。如果新值更大,則會再需要的時候新建然後去執行新任務

/** * Sets the core number of threads。 This overrides any value set * in the constructor。 If the new value is smaller than the * current value, excess existing threads will be terminated when * they next become idle。 If larger, new threads will, if needed, * be started to execute any queued tasks。 * * @param corePoolSize the new core size * @throws IllegalArgumentException if {@code corePoolSize < 0} * @see #getCorePoolSize */public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0) throw new IllegalArgumentException(); int delta = corePoolSize - this。corePoolSize; this。corePoolSize = corePoolSize; if (workerCountOf(ctl。get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don’t really know how many new threads are “needed”。 // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so。 int k = Math。min(delta, workQueue。size()); while (k—— > 0 && addWorker(null, true)) { if (workQueue。isEmpty()) break; } }}

setMaximumPoolSize(int maximumPoolSize)

設定池中允許的最大執行緒數,會覆蓋透過構造方法設定的最大執行緒數。如果新值小於已配置的值,則會把超出新最大執行緒數的執行緒在它們空閒時終止掉。

/** * Sets the maximum allowed number of threads。 This overrides any * value set in the constructor。 If the new value is smaller than * the current value, excess existing threads will be * terminated when they next become idle。 * * @param maximumPoolSize the new maximum * @throws IllegalArgumentException if the new maximum is * less than or equal to zero, or * less than the {@linkplain #getCorePoolSize core pool size} * @see #getMaximumPoolSize */public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this。maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl。get()) > maximumPoolSize) interruptIdleWorkers();}

setKeepAliveTime(long time, TimeUnit unit)

設定執行緒終止前的空閒時長,如果是非核心執行緒,在空閒等待time後依舊沒有獲得任務執行,則會被終止。會覆蓋透過構造方法設定的值

/** * Sets the time limit for which threads may remain idle before * being terminated。 If there are more than the core number of * threads currently in the pool, after waiting this amount of * time without processing a task, excess threads will be * terminated。 This overrides any value set in the constructor。 * * @param time the time to wait。 A time value of zero will cause * excess threads to terminate immediately after executing tasks。 * @param unit the time unit of the {@code time} argument * @throws IllegalArgumentException if {@code time} less than zero or * if {@code time} is zero and {@code allowsCoreThreadTimeOut} * @see #getKeepAliveTime(TimeUnit) */public void setKeepAliveTime(long time, TimeUnit unit) { if (time < 0) throw new IllegalArgumentException(); if (time == 0 && allowsCoreThreadTimeOut()) throw new IllegalArgumentException(“Core threads must have nonzero keep alive times”); long keepAliveTime = unit。toNanos(time); long delta = keepAliveTime - this。keepAliveTime; this。keepAliveTime = keepAliveTime; if (delta < 0) interruptIdleWorkers();}

setThreadFactory(ThreadFactory threadFactory)

設定執行緒池建立執行緒的執行緒工廠

/** * Sets the thread factory used to create new threads。 * * @param threadFactory the new thread factory * @throws NullPointerException if threadFactory is null * @see #getThreadFactory */public void setThreadFactory(ThreadFactory threadFactory) { if (threadFactory == null) throw new NullPointerException(); this。threadFactory = threadFactory;}

setRejectedExecutionHandler(RejectedExecutionHandler handler)

設定對新任務的拒絕策略

/** * Sets a new handler for unexecutable tasks。 * * @param handler the new handler * @throws NullPointerException if handler is null * @see #getRejectedExecutionHandler */public void setRejectedExecutionHandler(RejectedExecutionHandler handler) { if (handler == null) throw new NullPointerException(); this。handler = handler;}

allowCoreThreadTimeOut(boolean value)

設定是否將keepAliveTime適用於核心執行緒,核心執行緒若空閒超時也會被終止

/** * Sets the policy governing whether core threads may time out and * terminate if no tasks arrive within the keep-alive time, being * replaced if needed when new tasks arrive。 When false, core * threads are never terminated due to lack of incoming * tasks。 When true, the same keep-alive policy applying to * non-core threads applies also to core threads。 To avoid * continual thread replacement, the keep-alive time must be * greater than zero when setting {@code true}。 This method * should in general be called before the pool is actively used。 * * @param value {@code true} if should time out, else {@code false} * @throws IllegalArgumentException if value is {@code true} * and the current keep-alive time is not greater than zero * * @since 1。6 */public void allowCoreThreadTimeOut(boolean value) { if (value && keepAliveTime <= 0) throw new IllegalArgumentException(“Core threads must have nonzero keep alive times”); if (value != allowCoreThreadTimeOut) { allowCoreThreadTimeOut = value; if (value) interruptIdleWorkers(); }}

finalize()

ThreadPoolExecutor重寫了Object的finalize方法,當執行緒池物件不再被引用時,會先呼叫shutdown方法,結束掉池中的任務,再執行銷燬。

/** * Invokes {@code shutdown} when this executor is no longer * referenced and it has no threads。 */protected void finalize() { SecurityManager sm = System。getSecurityManager(); if (sm == null || acc == null) { shutdown(); } else { PrivilegedAction pa = () -> { shutdown(); return null; }; AccessController。doPrivileged(pa, acc); }}