深入 Java 執行緒池:從設計思想到原始碼解讀

為什麼需要執行緒池

我們知道建立執行緒的常用方式就是

new Thread()

,而每一次

new Thread()

都會重新建立一個執行緒,而執行緒的建立和銷燬都需要耗時的,不僅會消耗系統資源,還會降低系統的穩定性。在 jdk1。5 的 JUC 包中有一個 Executors,他能使我們建立的執行緒得到複用,不會頻繁的建立和銷燬執行緒。

執行緒池首先建立一些執行緒,它們的集合稱為執行緒池。使用執行緒池可以很好地提高效能,執行緒池在系統啟動時即建立大量空閒的執行緒,程式將一個任務傳給執行緒池,執行緒池就會啟動一條執行緒來執行這個任務,執行結束以後,該執行緒並不會死亡,而是再次返回執行緒池中成為空閒狀態,等待執行下一個任務。

先不管它到底是個啥,先看看使用執行緒池和

new Thread()

的耗時情況:

public class ThreadPoolTest { static CountDownLatch latch = new CountDownLatch(100000); static ExecutorService es = Executors。newFixedThreadPool(4); public static void main(String[] args) throws InterruptedException { long timeStart = System。currentTimeMillis(); for (int i = 0; i < 100000; i++) { newThread(); //executors(); } latch。await(); System。out。println(System。currentTimeMillis() - timeStart); es。shutdown(); } /** * 使用執行緒池 */ public static void executors() { es。submit(() -> { latch。countDown(); }); } /** * 直接new */ public static void newThread() { new Thread(() -> { latch。countDown(); })。start(); }}

對於 10 萬個執行緒同時跑,如果使用 new 的方式耗時:

深入 Java 執行緒池:從設計思想到原始碼解讀

使用執行緒池耗時:

深入 Java 執行緒池:從設計思想到原始碼解讀

總得來說,合理的使用執行緒池可以帶來以下幾個好處:

降低資源消耗。透過重複利用已建立的執行緒,降低執行緒建立和銷燬造成的消耗。

提高響應速度。當任務到達時,任務可以不需要等到執行緒建立就能立即執行。

增加執行緒的可管理性。執行緒是稀缺資源,使用執行緒池可以進行統一分配,調優和監控。

執行緒池設計思路

我們先了解執行緒池的思路,哪怕你重來沒了解過什麼是執行緒池,所以不會一上來就給你講一堆執行緒池的引數。我嘗試多種想法來解釋它的設計思路,但都過於官方,但在查詢資料的時候在部落格上看到了非常通俗易懂的描述,它是這樣描述的,先假想一個工廠的生產流程:

深入 Java 執行緒池:從設計思想到原始碼解讀

工廠中有固定的一批工人,稱為正式工人,工廠接收的訂單由這些工人去完成。當訂單增加,正式工人已經忙不過來了,工廠會將生產原料暫時堆積在倉庫中,等有空閒的工人時再處理(因為工人空閒了也不會主動處理倉庫中的生產任務,所以需要排程員實時排程)。倉庫堆積滿了後,訂單還在增加怎麼辦?工廠只能臨時擴招一批工人來應對生產高峰,而這批工人高峰結束後是要清退的,所以稱為臨時工。當時臨時工也以招滿後(受限於工位限制,臨時工數量有上限),後面的訂單隻能忍痛拒絕了。

和執行緒池的對映如下:

工廠——執行緒池

訂單——任務(Runnable)

正式工人——核心執行緒

臨時工——普通執行緒

倉庫——任務佇列

排程員——getTask()

getTask()是一個方法,將任務佇列中的任務排程給空閒執行緒,原始碼分析再去了解。

對映後,形成執行緒池流程圖如下:

深入 Java 執行緒池:從設計思想到原始碼解讀

執行緒池的工作機制

瞭解了執行緒池設計思路,我們可以總結一下執行緒池的工作機制:

線上程池的程式設計模式下,任務是提交給整個執行緒池,而不是直接提交給某個執行緒,執行緒池在拿到任務後,

在內部尋找是否有空閒的執行緒

,如果有,則將任務交給某個空閒的執行緒。如果不存在空閒執行緒,即執行緒池中的執行緒數大於核心執行緒

corePoolSize

,則將任務新增到任務佇列中

workQueue

,如果任務佇列有界且滿了之後則會判斷執行緒池中的執行緒數是否大於最大執行緒數

maximumPoolSize

,如果小於則會建立新的執行緒來執行任務,否則在沒有空閒執行緒的情況下就會執行決絕策略

handler

深入 Java 執行緒池:從設計思想到原始碼解讀

注意:執行緒池中剛開始沒有執行緒,當一個任務提交給執行緒池後,執行緒池會建立一個新執行緒來執行任務。一個執行緒同時只能執行一個任務,但可以同時向一個執行緒池提交多個任務。

執行緒池的引數及使用

執行緒池的真正實現類是

ThreadPoolExecutor

,類的整合關係如下:

深入 Java 執行緒池:從設計思想到原始碼解讀

ThreadPoolExecutor的構造方法有幾個,掌握最主要的即可,其中包含 7 個引數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

複製程式碼

corePoolSize(必需),執行緒池中的核心執行緒數。

當提交一個任務時,執行緒池建立一個新執行緒執行任務,直到當前執行緒數等於 corePoolSize。

如果當前執行緒數小於 corePoolSize,此時存在

空閒執行緒

,提交的任務會建立一個新執行緒來執行該任務。

如果當前執行緒數等於 corePoolSize,則繼續提交的任務被儲存到阻塞佇列中,等待被執行。

如果執行了執行緒池

prestartAllCoreThreads()

方法,執行緒池會提前建立並啟動所有核心執行緒。

maximumPoolSize(必需),執行緒池中允許的最大執行緒數。

當佇列滿了,且

已建立的執行緒數小於 maximumPoolSize

,則執行緒池會建立新的執行緒來執行任務。另外,對於無界佇列,可忽略該引數。

keepAliveTime(必需),執行緒存活保持時間。

當執行緒沒有任務執行時,繼續存活的時間。預設情況下,該引數只在執行緒數大於 corePoolSize 時才有用,即當非核心執行緒處於空閒狀態的時間超過這個時間後,該執行緒將被回收。將

allowCoreThreadTimeOut

引數設定為

true

後,核心執行緒也會被回收。

unit(必需),keepAliveTime 的時間單位。

workQueue(必需),任務佇列。

用於儲存等待執行的任務的阻塞佇列。workQueue 必須是 BlockingQueue 阻塞佇列。當執行緒池中的執行緒數超過它的 corePoolSize 的時候,執行緒會進入阻塞佇列進行阻塞等待。

一般來說,我們應該儘量使用有界佇列,因為使用無界佇列作為工作佇列會對執行緒池帶來如下影響。

當執行緒池中的執行緒數達到 corePoolSize 後,新任務將在無界佇列中等待,因此執行緒池中的執行緒數不會超過 corePoolSize。

由於 1,使用無界佇列時 maximumPoolSize 將是一個無效引數。

由於 1 和 2,使用無界佇列時 keepAliveTime 將是一個無效引數。

更重要的,使用無界 queue 可能會耗盡系統資源,有界佇列則有助於防止資源耗盡,同時即使使用有界佇列,也要儘量控制佇列的大小在一個合適的範圍。一般使用,

ArrayBlockingQueue

LinkedBlockingQueue

SynchronousQueue

PriorityBlockingQueue

等。

threadFactory(可選),建立執行緒的工廠。

透過自定義的執行緒工廠可以給每個新建的執行緒設定一個具有識別度的

執行緒名

,threadFactory 建立的執行緒也是採用

new Thread()

方式,threadFactory 建立的執行緒名都具有統一的風格:

pool-m-thread-n

(m 為執行緒池的編號,n 為執行緒池內的執行緒編號)。

handler(可選),執行緒飽和策略。

當阻塞佇列滿了,且沒有空閒的工作執行緒,如果繼續提交任務,必須採取一種策略處理該任務,執行緒池提供了 四種策略:

AbortPolicy,直接丟擲異常,預設策略。

CallerRunsPolicy,用呼叫者所在的執行緒來執行任務。

DiscardOldestPolicy,丟棄阻塞佇列中靠最前的任務,並執行當前任務。

DiscardPolicy,直接丟棄任務。

當然也可以根據應用場景實現

RejectedExecutionHandler

介面,自定義飽和策略,如記錄日誌或持久化儲存不能處理的任務。

執行緒池的狀態

ThreadPoolExecutor 使用 int 的高 3 位來表示執行緒池狀態,低 29 位表示執行緒數量:

原始碼如下:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer。SIZE - 3;//29private static final int CAPACITY = (1 << COUNT_BITS) - 1;//約5億// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;

至於為什麼這麼設計,我覺得主要原因是為了避免額外的開銷,如果使用 2 個變數來分別表示狀態和執行緒數量,為了保證原子性必須進行額外的加鎖操作,而 ctl 則透過原子類就解決了該問題,在透過位運算就能得到狀態和執行緒數量。

提交任務

可以使用兩個方法向執行緒池提交任務,分別為

execute()

submit()

方法。

execute(),用於提交不需要返回值的任務,所以無法判斷任務是否被執行緒池執行成功。

submit(),用於提交需要返回值的任務。執行緒池會返回一個 future 型別的物件,透過這個 future 物件可以判斷任務是否執行成功,並且可以透過 future 的

get()

方法來獲取返回值,

get()

方法會阻塞當前執行緒直到任務完成,而使用

get(long timeout,TimeUnit unit)

方法則會阻塞當前執行緒一段時間後立即返回,這 時候有可能任務沒有執行完。

此外,

ExecutorService

還提供了兩個提交任務的方法,

invokeAny()

invokeAll()

invokeAny(),提交所有任務,哪個任務先成功執行完畢,返回此任務執行結果,其它任務取消。

invokeAll(),提交所有的任務且必須全部執行完成。

corePoolSize 和 maximumPoolSize

測試核心執行緒數為 1 ,最大執行緒數為 2,任務佇列為 1。

@Slf4j(topic = “ayue”)public class ThreadExecutorPoolTest1 { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit。SECONDS, new ArrayBlockingQueue<>(1)); for (int i = 1; i < 4; i++) { //執行任務 executor。execute(new MyTask(i)); } } //任務 static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this。taskNum = num; } @Override public void run() { log。debug(“執行緒名稱:{},正在執行task:{}”, Thread。currentThread()。getName(), taskNum); try { //模擬其他操作 Thread。currentThread()。sleep(1000); } catch (InterruptedException e) { e。printStackTrace(); } log。debug(“task{}執行完畢”, taskNum); } }}

輸出:

11:07:04。377 [pool-1-thread-2] DEBUG ayue - 執行緒名稱:pool-1-thread-2,正在執行task:311:07:04。377 [pool-1-thread-1] DEBUG ayue - 執行緒名稱:pool-1-thread-1,正在執行task:111:07:05。384 [pool-1-thread-2] DEBUG ayue - task3執行完畢11:07:05。384 [pool-1-thread-1] DEBUG ayue - task1執行完畢11:07:05。384 [pool-1-thread-2] DEBUG ayue - 執行緒名稱:pool-1-thread-2,正在執行task:211:07:06。397 [pool-1-thread-2] DEBUG ayue - task2執行完畢

當有 3 個執行緒透過執行緒池執行任務時,由於核心執行緒只有一個,且任務佇列為 1,所以當第 3 個執行緒到來的時候, 會重新開啟一個新的執行緒

pool-1-thread-2

來執行任務。

當然,這裡可能有人問核心執行緒會不會大於最大執行緒?當然不會,如果

corePoolSize > maximumPoolSize

,則程式啟動會直接報錯。

任務佇列

任務佇列是基於阻塞佇列實現的,即採用生產者消費者模式,在 Java 中需要實現

BlockingQueue

介面。但 Java 已經為我們提供了 7 種阻塞佇列的實現:

ArrayBlockingQueue:一個由陣列結構組成的有界阻塞佇列。

LinkedBlockingQueue: 一個由連結串列結構組成的有界阻塞佇列,在未指明容量時,容量預設為

Integer。MAX_VALUE

PriorityBlockingQueue: 一個支援優先順序排序的無界阻塞佇列,對元素沒有要求,可以實現 Comparable 介面也可以提供 Comparator 來對佇列中的元素進行比較。跟時間沒有任何關係,僅僅是

按照優先順序取任務

DelayQueue:類似於 PriorityBlockingQueue,是二叉堆實現的無界優先順序阻塞佇列。要求元素都實現 Delayed 介面,透過執行時延從佇列中提取任務,時間沒到任務取不出來。

SynchronousQueue: 一個不儲存元素的阻塞佇列,消費者執行緒呼叫

take()

方法的時候就會發生阻塞,直到有一個生產者執行緒生產了一個元素,消費者執行緒就可以拿到這個元素並返回;生產者執行緒呼叫

put()

方法的時候也會發生阻塞,直到有一個消費者執行緒消費了一個元素,生產者才會返回。

LinkedBlockingDeque: 使用雙向佇列實現的有界雙端阻塞佇列。雙端意味著可以像普通佇列一樣 FIFO(先進先出),也可以像棧一樣 FILO(先進後出)。

LinkedTransferQueue: 它是 ConcurrentLinkedQueue、LinkedBlockingQueue 和 SynchronousQueue 的結合體,但是把它用在 ThreadPoolExecutor 中,和 LinkedBlockingQueue 行為一致,但是是無界的阻塞佇列。

執行緒工廠

執行緒工廠預設建立的執行緒名:

pool-m-thread-n

,在

Executors。defaultThreadFactory()

可以看到:

static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System。getSecurityManager(); group = (s != null) ? s。getThreadGroup() : Thread。currentThread()。getThreadGroup(); namePrefix = “pool-” + poolNumber。getAndIncrement() + “-thread-”; } public Thread newThread(Runnable r) { //執行緒名:namePrefix + threadNumber。getAndIncrement() Thread t = new Thread(group, r, namePrefix + threadNumber。getAndIncrement(),0); if (t。isDaemon()) t。setDaemon(false); if (t。getPriority() != Thread。NORM_PRIORITY) t。setPriority(Thread。NORM_PRIORITY); return t; }}

我們也可以透過

ThreadPoolExecutor

自定義執行緒名:

@Slf4j(topic = “ayue”)public class ThreadExecutorPoolTest1 { public static void main(String[] args) { //自增執行緒id AtomicInteger threadNumber = new AtomicInteger(1); ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 2, 3, TimeUnit。SECONDS, new ArrayBlockingQueue<>(1), new ThreadFactory() { @Override public Thread newThread(Runnable r) { return new Thread(r, “javatv-” + threadNumber。getAndIncrement()); } }); for (int i = 1; i < 4; i++) { executor。execute(new MyTask(i)); } } static class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this。taskNum = num; } @Override public void run() { log。debug(“執行緒名稱:{},正在執行task:{}”, Thread。currentThread()。getName(), taskNum); try { //模擬其他操作 Thread。currentThread()。sleep(1000); } catch (InterruptedException e) { e。printStackTrace(); } log。debug(“task{}執行完畢”, taskNum); } }}

輸出:

14:08:07。166 [javatv-1] DEBUG ayue - 執行緒名稱:javatv-1,正在執行task:114:08:07。166 [javatv-2] DEBUG ayue - 執行緒名稱:javatv-2,正在執行task:314:08:08。170 [javatv-1] DEBUG ayue - task1執行完畢14:08:08。170 [javatv-2] DEBUG ayue - task3執行完畢14:08:08。170 [javatv-1] DEBUG ayue - 執行緒名稱:javatv-1,正在執行task:214:08:09。172 [javatv-1] DEBUG ayue - task2執行完畢

拒絕策略

執行緒池提供了 四種策略:

AbortPolicy,直接丟擲異常,預設策略。

CallerRunsPolicy,用呼叫者所在的執行緒來執行任務。

DiscardOldestPolicy,丟棄阻塞佇列中靠最前的任務,並執行當前任務。

DiscardPolicy,直接丟棄任務。、

把上面程式碼的迴圈次數改為 4 次,則會丟擲

java。util。concurrent。RejectedExecutionException

異常。

for (int i = 1; i < 5; i++) { executor。execute(new MyTask(i));}

關閉執行緒池

可以透過呼叫執行緒池的

shutdown

shutdownNow

方法來關閉執行緒池。它們的原理是遍歷執行緒池中的工作執行緒,然後逐個呼叫執行緒的

interrupt

方法來中斷執行緒,所以無法響應中斷的任務可能永遠無法終止。但是它們存在一定的區別,

shutdownNow

首先將執行緒池的狀態設定成

STOP

,然後嘗試停止所有的正在執行或暫停任務的執行緒,並返回等待執行任務的列表,而

shutdown

只是將執行緒池的狀態設定成

SHUTDOWN

狀態,然後中斷所有沒有正在執行任務的執行緒。 簡單來說:

shutdown():執行緒池狀態變為 SHUTDOWN,不會接收新任務,但已提交任務會執行完,不會阻塞呼叫執行緒的執行 。

shutdownNow():執行緒池狀態變為 STOP,會接收新任務,會將佇列中的任務返回,並用 interrupt 的方式中斷正在執行的任務。

只要呼叫了這兩個關閉方法中的任意一個,

isShutdown

方法就會返回 true。當所有的任務都已關閉後,才表示執行緒池關閉成功,這時呼叫

isTerminaed

方法會返回 true。至於應該呼叫哪一種方法來關閉執行緒池,應該由提交到執行緒池的任務特性決定,通常呼叫

shutdown

方法來關閉執行緒池,如果任務不一定要執行完,則可以呼叫

shutdownNow

方法。

Executors 靜態工廠

Executors,提供了一系列靜態工廠方法用於建立各種型別的執行緒池,基於 ThreadPoolExecutor。

FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue()); }

特點:核心執行緒數等於最大執行緒數,因此也無需超時時間,執行完立即回收,阻塞佇列是無界的,可以放任意數量的任務。

場景:適用於任務量已知,相對耗時的任務。

newCachedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue()); }

可根據需要建立新執行緒的執行緒池,如果現有執行緒沒有可用的,則建立一個新執行緒並新增到池中,如果有被使用完但是還沒銷燬的執行緒,就複用該執行緒。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。因此,長時間保持空閒的執行緒池不會使用任何資源。這種執行緒池比較靈活, 對於執行很多短期非同步任務的程式而言,這些執行緒池通常可提高程式效能 。

特點:核心執行緒數是 0, 最大執行緒數是

Integer。MAX_VALUE

,全部都是空閒執行緒 60s 後回收。

場景:執行大量、耗時少的任務。

newSingleThreadExecutor

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue())); }

特點:單執行緒執行緒池。希望多個任務排隊執行,執行緒數固定為 1,任務數多於 1 時,會放入無界佇列排隊,任務執行完畢,這唯一的執行緒也不會被釋放。

場景:區別於自己建立一個單執行緒序列執行任務,如果使用

new Thread

任務執行失敗而終止那麼沒有任何補救措施,而執行緒池還會新建一個執行緒,保證池的正常工作。

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

ScheduledThreadPoolExecutor 繼承自 ThreadPoolExecutor。它主要用來在給定的延遲之後執行任務,或者定期執行任務。ScheduledThreadPoolExecuto 的功能與 Timer 類似,但 ScheduledThreadPoolExecutor 功能更強大、更靈活。Timer 對應的是單個後臺執行緒,而 ScheduledThreadPoolExecutor 可以在建構函式中指定多個對應的後臺執行緒數。

特點:核心執行緒數量固定,非核心執行緒數量無限,執行完閒置 10ms 後回收,任務佇列為延時阻塞佇列。

場景:執行定時或週期性的任務。

合理地配置執行緒池

需要針對具體情況而具體處理,不同的任務類別應採用不同規模的執行緒池,任務類別可劃分為 CPU 密集型任務、IO 密集型任務和混合型任務。

CPU 密集型任務:執行緒池中執行緒個數應儘量少,不應大於 CPU 核心數;

IO 密集型任務:由於 IO 操作速度遠低於 CPU 速度,那麼在執行這類任務時,CPU 絕大多數時間處於空閒狀態,那麼執行緒池可以配置儘量多些的執行緒,以提高 CPU 利用率;

混合型任務:可以拆分為 CPU 密集型任務和 IO 密集型任務,當這兩類任務執行時間相差無幾時,透過拆分再執行的吞吐率高於序列執行的吞吐率,但若這兩類任務執行時間有資料級的差距,那麼沒有拆分的意義。

執行緒池的監控

如果在系統中大量使用執行緒池,則有必要對執行緒池進行監控,方便在出現問題時,可以根據執行緒池的使用狀況快速定位問題。利用執行緒池提供的引數進行監控,引數如下:

taskCount:執行緒池需要執行的任務數量。

completedTaskCount:執行緒池在執行過程中已完成的任務數量,小於或等於 taskCount。

largestPoolSize:執行緒池曾經建立過的最大執行緒數量,透過這個資料可以知道執行緒池是否滿過。如等於執行緒池的最大大小,則表示執行緒池曾經滿了。

getPoolSize:執行緒池的執行緒數量。如果執行緒池不銷燬的話,池裡的執行緒不會自動銷燬,所以這個大小隻增不減。

getActiveCount:獲取活動的執行緒數。

透過擴充套件執行緒池進行監控:繼承執行緒池並重寫執行緒池的

beforeExecute()

afterExecute()

terminated()

方法,可以在任務執行前、後和執行緒池關閉前自定義行為。如監控任務的平均執行時間,最大執行時間和最小執行時間等。

原始碼分析

在使用執行緒池的時候,我其實有一些問題也隨之而來,比如執行緒池的執行緒怎麼建立?任務怎麼執行?任務怎麼分配?執行緒執行完後怎麼辦?是存活還是死亡?什麼時候死亡?為什麼要使用阻塞佇列等等問題。帶著這些問題,我們去讀讀原始碼,讀原始碼怎麼入手?透過

ThreadPoolExecutor

execute()

方法。submit 底層也是呼叫了

execute()

execute

public void execute(Runnable command) { //如果沒有任務直接丟擲異常 if (command == null) throw new NullPointerException(); //獲取當前執行緒的狀態+執行緒個數 int c = ctl。get(); /** * workerCountOf,執行緒池當前執行緒數,並判斷是否小於核心執行緒數 */ if (workerCountOf(c) < corePoolSize) {//如果小於 if (addWorker(command, true)) return; c = ctl。get(); } if (isRunning(c) && workQueue。offer(command)) { // 這裡是向任務佇列投放任務成功,對執行緒池的執行中狀態做二次檢查 // 如果執行緒池二次檢查狀態是非執行中狀態,則從任務佇列移除當前的任務呼叫拒絕策略處理(也就是移除前面成功入隊的任務例項) int recheck = ctl。get(); if (! isRunning(recheck) && remove(command)) reject(command); /* 走到下面的else if分支,說明有以下的前提: * 1、待執行的任務已經成功加入任務佇列 * 2、執行緒池可能是RUNNING狀態 * 3、傳入的任務可能從任務佇列中移除失敗(移除失敗的唯一可能就是任務已經被執行了) * * 如果當前工作執行緒數量為0,則建立一個非核心執行緒並且傳入的任務物件為null - 返回 * 也就是建立的非核心執行緒不會馬上執行,而是等待獲取任務佇列的任務去執行 * 如果前工作執行緒數量不為0,原來應該是最後的else分支,但是可以什麼也不做, * 因為任務已經成功入佇列,總會有合適的時機分配其他空閒執行緒去執行它。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } /* 走到這裡說明有以下的前提: * 1、執行緒池中的工作執行緒總數已經大於等於corePoolSize(簡單來說就是核心執行緒已經全部懶建立完畢) * 2、執行緒池可能不是RUNNING狀態 * 3、執行緒池可能是RUNNING狀態同時任務佇列已經滿了 * * 如果向任務佇列投放任務失敗,則會嘗試建立非核心執行緒傳入任務執行 * 建立非核心執行緒失敗,此時需要拒絕執行任務 */ else if (!addWorker(command, false)) reject(command);}

addWorker

第一個 if 判斷執行緒池當前執行緒數是否小於核心執行緒數。

if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl。get();}

如果小於,則進入

addWorker

方法:

private boolean addWorker(Runnable firstTask, boolean core) { retry: //外層迴圈:判斷執行緒池狀態 for (;;) { int c = ctl。get(); //獲取執行緒池狀態 int rs = runStateOf(c); // 檢查執行緒池的狀態是否存活。 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue。isEmpty())) return false; //內層迴圈:執行緒池新增核心執行緒並返回是否新增成功的結果 for (;;) { //執行緒數量 int wc = workerCountOf(c); //執行緒數量超過容量,返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS增加執行緒數量,若成功跳出外層迴圈 if (compareAndIncrementWorkerCount(c)) break retry; //否則失敗,並更新c c = ctl。get(); // Re-read ctl //如果這時的執行緒池狀態發生變化,重新對外層迴圈進行自旋 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //如果CAS成功了,則繼續往下走 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //建立一個Worker,這個Worker實現了Runable,把它看成一個任務單元 w = new Worker(firstTask); //這個Thread就是當前的任務單元Worker,即this final Thread t = w。thread; if (t != null) { //加鎖,因為可能有多個執行緒來呼叫 final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { // 再次檢查執行緒池的狀態,避免在獲取鎖前呼叫shutdown方法 int rs = runStateOf(ctl。get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { //如果t執行緒已經啟動尚未終止,則丟擲異常 if (t。isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //否則,加入執行緒池 workers。add(w); int s = workers。size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock。unlock(); } //加入執行緒池後,啟動該執行緒,上面已經設定為true if (workerAdded) { t。start(); workerStarted = true; } } } finally { //如果執行緒啟動失敗,則呼叫addWorkerFailed,回滾操作 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

Worker

Worker 是 ThreadPoolExecutor 的內部類,繼承了 AQS 並且實現了 Runnable。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ final Thread thread; /** Initial task to run。 Possibly null。 */ Runnable firstTask; //構造方法 Worker(Runnable firstTask) { //在呼叫runWorker前禁止中斷 //當其它執行緒呼叫了執行緒池的 shutdownNow 時候,如果 worker 狀態 >= 0 則會中斷該執行緒 //具體方法在 interruptIfStarted() 中可以看到 setState(-1); // inhibit interrupts until runWorker this。firstTask = firstTask; this。thread = getThreadFactory()。newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } //省略其他程式碼。。。}

可以看到,在 Worker 的構造方法可以知道,其中的 thread 屬性就是透過 this 去建立的,所以執行緒池核心執行緒的建立主要是 run 方法中的

runWorker

方法:

runWorker

runWorker 核心執行緒執行邏輯。

final void runWorker(Worker w) { Thread wt = Thread。currentThread(); Runnable task = w。firstTask; w。firstTask = null; // 呼叫unlock()是為了讓外部可以中斷 w。unlock(); // allow interrupts // 執行緒退出的原因,true是任務導致,false是執行緒正常退出 boolean completedAbruptly = true; try { // 1。 如果firstTask不為null,則執行firstTask // 2。 如果firstTask為null,則呼叫getTask()從佇列獲取任務 // 3。 阻塞佇列的特性就是:當佇列為空時,當前執行緒會被阻塞等待 while (task != null || (task = getTask()) != null) { w。lock(); // 判斷執行緒池的狀態,如果執行緒池正在停止,則對當前執行緒進行中斷操作 if ((runStateAtLeast(ctl。get(), STOP) || (Thread。interrupted() && runStateAtLeast(ctl。get(), STOP))) && !wt。isInterrupted()) wt。interrupt();//中斷 try { //該方法裡面沒有內容,可以自己擴充套件實現,比如上面提到的執行緒池的監控 beforeExecute(wt, task); Throwable thrown = null; try { //執行具體的任務 task。run(); } catch (RuntimeException x) {//執行緒異常後操作 thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //同 beforeExecute() afterExecute(task, thrown); } } finally { task = null;//help gc //統計當前worker完成了多少個任務 w。completedTasks++; //釋放鎖 w。unlock(); } } completedAbruptly = false; } finally { // 處理執行緒退出,completedAbruptly為true說明由於任務異常導致執行緒非正常退出 processWorkerExit(w, completedAbruptly); }}

getTask

而對於其中的

getTask()

方法,任務佇列中的任務排程給空閒執行緒,該方法是非常重要的,為什麼重要?其中就涉及到面試官常問的 執行緒池如何保證核心執行緒不會被銷燬,而空閒執行緒會被銷燬?

private Runnable getTask() { //判斷最新一次的poll是否超時 //poll:取走BlockingQueue裡排在首位的物件 boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl。get(); int rs = runStateOf(c); // Check if queue empty only if necessary。 /** * 條件1:執行緒池狀態SHUTDOWN、STOP、TERMINATED狀態 * 條件2:執行緒池STOP、TERMINATED狀態或workQueue為空 * 條件1與條件2同時為true,則workerCount-1,並且返回null * 注:條件2是考慮到SHUTDOWN狀態的執行緒池不會接受任務,但仍會處理任務(前面也講到了) */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue。isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? /* * 該屬性的作用是判斷當前執行緒是否允許超時: * 1。allowCoreThreadTimeOut * 如果為 false(預設),核心執行緒即使在空閒時也保持活動狀態。 * 如果為 true,則核心執行緒使用 keepAliveTime 超時等待工作。 * 2。wc > corePoolSize * 當前執行緒是否已經超過核心執行緒數量。 */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /* * 判斷當前執行緒是否可以退出: * 1。wc > maximumPoolSize || (timed && timedOut) * wc > maximumPoolSize = true,說明當前的工作執行緒總數大於執行緒池最大執行緒數。 * timed && timedOut = true,說明當前執行緒允許超時並且已經超時。 * 2。wc > 1 || workQueue。isEmpty() * 工作執行緒總數大於1或者任務佇列為空,則透過CAS把執行緒數減去1,同時返回null */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue。isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { /* * 1。poll(long timeout, TimeUnit unit):從BlockingQueue取出一個隊首的物件, * 如果在指定時間內,佇列一旦有資料可取,則立即返回佇列中的資料。否則直到時間超時還沒有資料可取,返回失敗。 * * 2。take():取走BlockingQueue裡排在首位的物件,若BlockingQueue為空,阻斷進入等待狀態直到BlockingQueue有新的資料被加入。 * * * 如果timed為true,透過poll()方法做超時拉取,keepAliveTime時間內沒有等待到有效的任務,則返回null。 * * 如果timed為false,透過take()做阻塞拉取,會阻塞到有下一個有效的任務時候再返回(一般不會是null)。 */ Runnable r = timed ? workQueue。poll(keepAliveTime, TimeUnit。NANOSECONDS) : workQueue。take(); if (r != null) return r; //透過poll()方法從任務佇列中拉取任務為null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }}

① 對於

getTask()

下面的這段程式碼,這段邏輯大多數情況下是針對非核心執行緒:

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue。isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue;}

② 我們這樣來閱讀這段程式碼,當工作執行緒數大於核心執行緒

corePoolSize

,此時進入

execute()

方法中的第二個 if 語句:

if (isRunning(c) && workQueue。offer(command)) { int recheck = ctl。get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);}

此時執行緒池總數已經超過了

corePoolSize

但小於

maximumPoolSize

,當任務佇列已經滿了的時候,會透過

addWorker(task,false)

新增非核心執行緒。

而在高併發的情況下,肯定會產生多餘的執行緒,也就是出現 ① 中的情況

wc > maximumPoolSize

,而這些多餘的執行緒怎麼辦,是不是會被回收?如果

workQueue。poll

沒有獲取到有效的任務,那麼①中的邏輯剛好與

addWorker(task,false)

相反,透過 CAS 減少非核心執行緒,使得工作執行緒總數趨向於

corePoolSize

如果對於非核心執行緒,上一輪迴圈獲取任務物件為

null

,在預設情況下

allowCoreThreadTimeOut = false

,因此,

getTask()

timed = true

,如果沒有獲取到任務,此時

timedOut = true

,這一輪迴圈很容易滿足

timed && timedOut

為 true,這個時候

getTask()

返回 null 會導致

Worker#runWorker()

方法跳出死迴圈,之後執行

processWorkerExit()

方法處理後續工作,而該非核心執行緒對應的

Worker

則變成

遊離物件

,等待被 JVM 回收。

allowCoreThreadTimeOut

設定為 true 的時候,這裡分析的非核心執行緒的生命週期終結邏輯同時會適用於核心執行緒。

由此推出一個面試題:

執行緒池有多個執行緒同時沒取到任務,會全部回收嗎?

舉個例子:執行緒池核心執行緒數是 5,最大執行緒數為 5,當前工作執行緒數為 6(6>5,意味著當前可以觸發執行緒回收),如果此時有 3 個執行緒同時超時沒有獲取到任務,這 3 個執行緒會都被回收銷燬嗎?

思路:這道題的核心點在於有多個執行緒同時超時獲取不到任務。正常情況下,此時會觸發執行緒回收的流程。但是我們知道,正常不設定 allowCoreThreadTimeOut 變數時,執行緒池即使沒有任務處理,也會保持核心執行緒數的執行緒。如果這邊 3 個執行緒被全部回收,那此時執行緒數就變成了 3 個,不符合核心執行緒數 5 個,所以這邊我們可以首先得出答案:不會被全部回收。這個時候面試官肯定會問為什麼?

根據答案不難推測,為了防止本題的這種併發回收問題的出現,執行緒回收的流程必然會有併發控制。compareAndDecrementWorkerCount(c) 用的是 CAS 方法,如果 CAS 失敗就 continue,進入下一輪迴圈,重新判斷。

像上述例子,其中一條執行緒會 CAS 失敗,然後重新進入迴圈,發現工作執行緒數已經只有 5 了,

timed = false

, 這條執行緒就不會被銷燬,可以一直阻塞了,此時就會呼叫

workQueue。take()

阻塞等待下一次的任務,也就是說核心執行緒並不會死亡。

從這裡也可以看出,雖然有核心執行緒數,但執行緒並沒有區分是核心還是非核心,並不是先建立的就是核心,超過核心執行緒數後建立的就是非核心,最終保留哪些執行緒,完全隨機。

然後可以回答出前面的問題,執行緒池如何保證核心執行緒不會被銷燬,而空閒執行緒會被銷燬?

核心執行緒是因為呼叫了阻塞方法而不會被銷燬,空閒執行緒呼叫了超時方法在下次執行時獲取不到任務而死亡。

這樣回答其實是可以的,但是這可能顯示出你是背得八股文,所以你應該回答核心執行緒不僅僅是因為呼叫了阻塞方法而不會被銷燬,同時利用了 CAS 來保證。

還可以得出

getTask()

返回 null 的情況

執行緒池的狀態已經為 STOP,TIDYING, TERMINATED,或者是 SHUTDOWN 且工作佇列為空。

工作執行緒數大於最大執行緒數或當前工作執行緒已超時,且,其存在工作執行緒或任務佇列為空。

runWorker 的流程:

深入 Java 執行緒池:從設計思想到原始碼解讀

processWorkerExit

在 runWorker 的 finally 塊中,當任務執行之後,要對其做處理,作執行緒在執行完

processWorkerExit()

方法才算真正的終結,該方法如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) { // 因為丟擲使用者異常導致執行緒終結,直接使工作執行緒數減1即可 // 如果沒有任何異常丟擲的情況下是透過getTask()返回null引導執行緒正常跳出runWorker()方法的while死迴圈從而正常終結,這種情況下,在getTask()中已經把執行緒數減1 if (completedAbruptly) // If abrupt, then workerCount wasn‘t adjusted decrementWorkerCount(); final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { // 全域性的已完成任務記錄數加上此將要終結的Worker中的已完成任務數 completedTaskCount += w。completedTasks; // 工作執行緒集合中移除此將要終結的Worker workers。remove(w); } finally { mainLock。unlock(); } // 見下一小節分析,用於根據當前執行緒池的狀態判斷是否需要進行執行緒池terminate處理 tryTerminate(); int c = ctl。get(); // 如果執行緒池的狀態小於STOP,也就是處於RUNNING或者SHUTDOWN狀態的前提下: // 1。如果執行緒不是由於丟擲使用者異常終結,如果允許核心執行緒超時,則保持執行緒池中至少存在一個工作執行緒 // 2。如果執行緒由於丟擲使用者異常終結,或者當前工作執行緒數,那麼直接新增一個新的非核心執行緒 if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // 如果允許核心執行緒超時,最小值為0,否則為corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果最小值為0,同時任務佇列不空,則更新最小值為1 if (min == 0 && ! workQueue。isEmpty()) min = 1; // 工作執行緒數大於等於最小值,直接返回不新增非核心執行緒 if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}

程式碼的後面部分割槽域,會判斷執行緒池的狀態,如果執行緒池是

RUNNING

或者

SHUTDOWN

狀態的前提下,如果當前的工作執行緒由於丟擲異常被終結,那麼會新建立一個非核心執行緒。如果當前的工作執行緒並不是丟擲使用者異常被終結(正常情況下的終結),那麼會這樣處理:

allowCoreThreadTimeOut

為 true,也就是允許核心執行緒超時的前提下,如果任務佇列空,則會透過建立一個非核心執行緒保持執行緒池中至少有一個工作執行緒。

allowCoreThreadTimeOut

為 false,如果工作執行緒總數大於

corePoolSize

則直接返回,否則建立一個非核心執行緒,也就是會趨向於保持執行緒池中的工作執行緒數量趨向於

corePoolSize

processWorkerExit()

執行完畢之後,意味著該工作執行緒的生命週期已經完結。