最強執行緒池解析,執行緒池的這些細節你真的會了嗎

本文整體整體結構

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

ThreadPoolExecutor介紹

首先我們要思考一下為什麼要使用執行緒池。Java提供了多執行緒機制讓我們能夠同時執行多個任務,就像多個任務由多個人同時執行,而不是一個人依次執行這些任務。但是如果我們每次執行任務都建立一個執行緒,導致的問題有

每次建立銷燬執行緒都有一定的開銷

執行緒數量不好控制,過多的執行緒會導致記憶體佔用過多,也可能超過作業系統的限制導致異常

因此Java提供了預設的執行緒池,幫助大家解決這些問題,透過ThreadPoolExecutor,我們可以實現多種執行緒建立回收策略,以適應不同的需求場景。

執行緒池可以使用的場景有

批次處理,例如我們進行一個分表掃描任務,各個任務之間也沒有有依賴,則我們可以把每個分表的任務提交到執行緒池中,提高整體任務的執行速度。

最佳化介面耗時,假如我們的介面中會進行3個互相獨立的耗時的IO操作,則我們可以把這三個IO操作提交到執行緒池中,再等待這三個操作完成,從序列到並行,可以減少介面的耗時。

程序內非同步解耦,例如註冊流程中,使用者資訊寫完資料庫後會給使用者傳送一個郵件再返回,把傳送郵件放到執行緒池中執行,可以減少註冊介面耗時,還可以避免傳送郵件介面失敗影響註冊介面。當然這個也可以用kafka實現類似功能。

ThreadPoolExecutor引數介紹、使用示例

要正確使用執行緒池,我們需要理解其中的幾個重要引數,透過ThreadPoolExecutor的建構函式可以看到引數如下

(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

corePoolSize: 執行緒池核心執行緒數,後面執行緒的建立回收機制中會詳細介紹

maximumPoolSize: 執行緒池最大執行緒,後面執行緒的建立回收機制中會詳細介紹數

keepAliveTime: 執行緒可回收時,空閒時間超過這個時間會回收執行緒。預設情況下,非核心執行緒會進行回收,也可以透過allowCoreThreadTimeOut來控制讓核心執行緒也能夠被回收。

workQueue: 任務佇列,需要是BlockingQueue,常見的有SynchronousQueue(無佇列直接遞交的佇列),ArrayBlockingQueue, LinkedBlockingQueue(有界無界兩種),DelayQueue(延遲佇列), PriorityBlockingQueue(優先順序佇列)等

threadFactory: 建立執行緒的工廠,可以控制執行緒名稱、priority等

handler: 拒絕策略,當佇列已滿,並且執行緒數量已經達到maximumPoolSize時,再提交的任務會交給RejectedExecutionHandler來處理,常見的拒絕策略有AbortPolicy, CallerRunsPoicy, DiscardPolicy, DiscardOldestPolicy

瞭解了這些引數之後我們就可以建立一個執行緒池並使用了,透過下面的註釋先對執行緒池的使用和機制有一個初步認識,後面會進行詳細分析。

// 透過建構函式建立一個核心執行緒數為1,最大執行緒數為4,keepAliveTime為1分鐘,任務佇列是容量為10的陣列阻塞佇列// 拒絕策略是CallerRunsPolicy,即會由呼叫執行緒來執行任務ExecutorService executorService = new ThreadPoolExecutor(1, 4, 1, TimeUnit。MINUTES, new ArrayBlockingQueue<>(10), new ThreadPoolExecutor。CallerRunsPolicy());// 透過submit介面提交一個Callable任務,返回一個字串,這個任務會先sleep 1秒Future task1Result = executorService。submit(() -> { Thread。sleep(1000); return “hello”;});// 再透過submit介面提交一個Callable任務,返回一個字串,這個任務會先sleep 1秒Future task2Result = executorService。submit(() -> { Thread。sleep(1000); return “hello”;});// 透過Future。get獲取第一個任務的結果System。out。println(task1Result。get());// 透過Future。get獲取第二個任務的結果System。out。println(task2Result。get());// 執行緒池使用完了,我們需要關閉,不然JVM不會退出,因為JVM退出的條件是當前沒有非daemon狀態的執行緒了// 呼叫完shutdown之後再提交的執行緒會被reject,由拒絕策略處理。執行緒池會繼續處理執行任務佇列中的任務executorService。shutdown();// 等待執行緒池結束if (!executorService。awaitTermination(1, TimeUnit。MINUTES)) { // 如果執行時間內還沒結束,呼叫shutdownNow情況任務佇列 executorService。shutdownNow(); // 再等一分鐘 executorService。awaitTermination(1, TimeUnit。MINUTES);}

ThreadPoolExecutor實現機制分析

瞭解了ThreadPoolExecutor的使用之後,我們透過一張圖看一下執行緒池的內部大體架構

透過上圖可以看到ThreadPoolExecutor比較重要的元件是workerPool(工作執行緒池)、workQueue(任務佇列)、rejectionExecutionHandler(拒絕策略)。

workerPool中包含了Worker物件,每個Worker物件中有一個執行緒負責執行提交的任務,並且不斷到workQueue中獲取新任務來執行。

workQueue是存放緩衝任務的佇列,當corePool也就是核心執行緒滿了之後,會優先把任務放到workQueue中,workQueue滿了則會嘗試新增非核心執行緒

當非核心執行緒也滿了,或者線上程池SHUTDOWN關閉後仍然提交的任務,會透過拒絕策略來執行。

整個ThreadPoolExecutor比較重要的部分有

任務接收、執行流程

worker建立回收機制

關閉流程

執行緒池狀態

先看一下執行緒池的狀態,執行緒池一共有5個狀態 每個狀態的描述為

RUNNING: 執行緒池建立後正常執行

SHUTDOWN: 呼叫shutdown後變成SHUTDOWN狀態,拒絕新的execute提交任務,佇列和Worker中已有的任務會繼續執行

STOP: 呼叫shutdownNow方法後進入STOP狀態,會清空任務佇列,並中斷Worker

TIDYING: SHUTDOWN階段完成或STOP階段完成

TERMINATED: 呼叫完terminated hook方法後,執行緒池已經完成關閉

狀態機如下

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

任務提交處理流程(execute方法邏輯)

首先看一下ThreadPoolExecutor的execute的執行邏輯,流程圖如下 核心邏輯為

如果worker數量小於核心執行緒數,則優先建立一個核心執行緒來處理該任務

如果worker數量大於等於核心執行緒數,則嘗試入隊,如果入隊失敗,則建立非核心執行緒來處理該任務

如果執行緒池處於關閉中狀態或入隊失敗,則使用rejectHandler拒絕策略來處理該任務

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

更多細節可以透過程式碼中的註釋檢視。

public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // 先讀取control state int c = ctl。get(); // 如果當前的worker數量比corePoolSize核心執行緒數少 if (workerCountOf(c) < corePoolSize) { // 則嘗試新增一個core worker,並且傳入command作為firstTask執行 if (addWorker(command, true)) // 新增成功,則直接返回,沒有新增成功,說明可能其他execute執行緒觸發了addWorker並爭搶成功或者 return; // 再重新判斷下狀態,這段時間內,執行緒池狀態可能出現變化 c = ctl。get(); } // 這時候說明workerCount已經大於等於corePoolSize了,則需要新增到workQueue中,如果新增不了 // 則需要嘗試增加非核心執行緒worker if (isRunning(c) && workQueue。offer(command)) { // 再檢查下執行緒池是不是關閉了 int recheck = ctl。get(); // 如果已經在關閉,則重workQueue裡刪掉,並呼叫reject拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果現線上程池的worker數量為0了,說明核心執行緒回收了,則新增一個worker來執行,避免出現任務沒有worker執行的情況 else if (workerCountOf(recheck) == 0) // 新增一個worker,core引數為false addWorker(null, false); } // 如果執行緒池關閉或佇列已滿,都會走到這裡 // 1。 關閉的情況在addWorker的時候會失敗,交由rejectHandler處理 // 2。 如果佇列已滿,則會嘗試新增非核心執行緒worker,新增失敗交由rejectHandler處理 else if (!addWorker(command, false)) reject(command);}

Worker類

addWorker方法負責建立Worker物件。 首先看一下Worker類的內容。 Worker類需要區分當前是在等待獲取任務還是在執行任務中,Worker透過一個不可重入的鎖來實現的,先獲取到鎖才能執行任務。

這是為了把等待任務和執行任務的interrupt區分開。

為了防止worker處理的task中呼叫corePoolSize的時候會加鎖後去interrupt各個worker,如果能重入,則也會把自己的執行緒中斷狀態設定成interrupted導致執行中的任務後面可能被中斷。 只有在執行緒池處於STOP之後的狀態,才能夠interrupt在執行任務中的worker執行緒。

看一下Worker類的定義

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

private final class Worker extends AbstractQueuedSynchronizer implements Runnable{ // 負責執行任務的執行緒,如果ThreadFactory失敗則會是null final Thread thread; // 透過execute方法建立時,可能會傳入初始的任務 Runnable firstTask; Worker(Runnable firstTask) { // 防止被其他執行緒設定interrupt狀態影響任務執行 setState(-1); // inhibit interrupts until runWorker this。firstTask = firstTask; this。thread = getThreadFactory()。newThread(this); } // worker的執行邏輯,也是執行緒start之後呼叫的run方法 public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } // 實現AQS的tryAcquire來實現加鎖功能。把state 從0cas到1說明加鎖成功 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread。currentThread()); return true; } return false; } // 實現AQS的tryRelease來實現釋放鎖功能,釋放鎖實現為把鎖狀態改為0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 加鎖方法 public void lock() { acquire(1); } // 嘗試加鎖方法 public boolean tryLock() { return tryAcquire(1); } // 釋放鎖方法 public void unlock() { release(1); } // 判斷是否在加鎖中 public boolean isLocked() { return isHeldExclusively(); } // 這個是給shutdownNow方法用的,可以在不獲取鎖的情況下interrupt執行緒 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t。isInterrupted()) { try { t。interrupt(); } catch (SecurityException ignore) { } } }}

addWorker方法的流程

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

private boolean addWorker(Runnable firstTask, boolean core) { retry: // 不斷迴圈重試 for (int c = ctl。get();;) { // SHUTDOWN狀態時,如果任務佇列已經為空了,則不需要新增worker,並且也不能建立Worker執行firstTask // 如果是STOP狀態,則肯定返回false,不建立Worker if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || firstTask != null || workQueue。isEmpty())) return false; // 下面這個cas重試搶佔新增worker的機會,區分建立核心還是非核心執行緒 for (;;) { if (workerCountOf(c) >= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK)) // 已經超過對應的執行緒的數量,直接返回 return false; // cas失敗的重試 if (compareAndIncrementWorkerCount(c)) break retry; // 如果SHUTDOWN,退出到外層的迴圈重試 c = ctl。get(); // Re-read ctl if (runStateAtLeast(c, SHUTDOWN)) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 建立Worker物件 w = new Worker(firstTask); final Thread t = w。thread; if (t != null) { // 加鎖,works這個HashSet透過mainLock加鎖實現執行緒安全 final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { // Recheck while holding lock。 // Back out on ThreadFactory failure or if // shut down before lock acquired。 int c = ctl。get(); // 再次check執行緒池狀態 if (isRunning(c) || (runStateLessThan(c, STOP) && firstTask == null)) { if (t。getState() != Thread。State。NEW) throw new IllegalThreadStateException(); workers。add(w); workerAdded = true; int s = workers。size(); if (s > largestPoolSize) largestPoolSize = s; } } finally { // 釋放鎖 mainLock。unlock(); } if (workerAdded) { // 啟動worker中的執行緒,開始執行run方法,也就是runWorker t。start(); workerStarted = true; } } } finally { // 如果啟動失敗,比如出現OOM,回滾workerCount等 if (! workerStarted) addWorkerFailed(w); } return workerStarted;}

再看一下runWorker方法,裡面包含的就是worker的任務獲取、執行邏輯。

最強執行緒池解析,執行緒池的這些細節你真的會了嗎

final void runWorker(Worker w) { Thread wt = Thread。currentThread(); Runnable task = w。firstTask; w。firstTask = null; // 為什麼在這之前不能interrupt? w。unlock(); // allow interrupts boolean completedAbruptly = true; try { // 如果有firstTask,直接執行firstTask,否則透過getTask從任務佇列中阻塞等待獲取新任務,如果從佇列中獲取的是null說明被interrupt了,worker需要退出 while (task != null || (task = getTask()) != null) { // 執行任務之前,先加鎖 w。lock(); // 如果現在在STOP狀態,則任務需要interrupt // 如果不是,則可能是因為調整引數導致的interrupt需要呼叫Thread。interrupted方法清理掉中斷狀態,避免影響任務執行 // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted。 This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl。get(), STOP) || (Thread。interrupted() && runStateAtLeast(ctl。get(), STOP))) && !wt。isInterrupted()) wt。interrupt(); try { // 任務執行前的回撥方法 beforeExecute(wt, task); try { // 傳入的Runnable的run方法被執行。 task。run(); // 任務執行後的回撥方法 afterExecute(task, null); } catch (Throwable ex) { // 任務執行後的回撥方法 afterExecute(task, ex); throw ex; } } finally { task = null; w。completedTasks++; // 執行完當前任務後或任務異常退出後,釋放鎖 w。unlock(); } } // 如果執行到這裡,說明是從while迴圈條件中退出的 completedAbruptly = false; } finally { // 呼叫processWorkerExit,如果是異常退出會導致worker執行緒掛掉,會重新建立一個新的worker代替當前worker processWorkerExit(w, completedAbruptly); }}

getTask實現

getTask方法負責從任務佇列中不斷獲取任務,其中可以看到當執行緒能回收時,會使用keepAliveTime時間進行阻塞佇列poll等待來實現的Worker執行緒超過一定idle空閒時間後回收功能。

private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl。get(); // 如果執行緒池處於SHUTDOWN狀態,並且任務佇列空了,或者處於STOP狀態,則當前worker需要退出,因此會返回null if (runStateAtLeast(c, SHUTDOWN) && (runStateAtLeast(c, STOP) || workQueue。isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // allowCoreThreadTimeOut為true說明核心執行緒也可以回收,否則只回收非核心執行緒 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue。isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從任務阻塞佇列中poll任務,可以回收時加上等待時間,否則無限期等待。 Runnable r = timed ? workQueue。poll(keepAliveTime, TimeUnit。NANOSECONDS) : workQueue。take(); if (r != null) return r; // timedOut = true; } catch (InterruptedException retry) { // 執行緒池關閉或者調整執行緒池配置的時候會被interrupt, // 關閉的情況下次迴圈中會退出,調整配置則不會影響worker下次獲取task timedOut = false; } }}

shutdown實現

已有的task會繼續執行,但是不會接受新的task

public void shutdown() { final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { checkShutdownAccess(); // 把執行緒池狀態改為SHUTDOWN advanceRunState(SHUTDOWN); // 對各個idle worker也就是沒有在執行任務的worker的執行緒呼叫interrupt方法 interruptIdleWorkers(); // shutdown 回撥 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock。unlock(); } tryTerminate();}

shutdownNow方法,和shutdown的區別是會修改狀態為STOP,並且把佇列中的task drain出來

public List shutdownNow() { List tasks; final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { checkShutdownAccess(); // 修改執行緒池狀態為STOP advanceRunState(STOP); // interrupt所有的worker interruptWorkers(); // 情況佇列 tasks = drainQueue(); } finally { mainLock。unlock(); } tryTerminate(); return tasks;}

tryTerminate方法會嘗試關閉執行緒池

final void tryTerminate() { for (;;) { int c = ctl。get(); // 如果是RUNNING狀態,不需要關閉 if (isRunning(c) || // 如果是TIDYING,說明有其他執行緒在terminate,當前執行緒不需要處理,也return runStateAtLeast(c, TIDYING) || // 如果是SHUTDOWN狀態,並且任務佇列中還有任務,還需要等待任務執行完 (runStateLessThan(c, STOP) && ! workQueue。isEmpty())) return; // 走到這裡,說明是如下兩種情況中的一種 // 1。 SHUTDOWN狀態並且佇列已經為空 // 2。 STOP狀態 // 判斷如果還有worker存在,則嘗試interrupt if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } // 走到這裡,說明worker和任務佇列都空了,則需要修改狀態為TIDYING並呼叫terminated回撥方法。 final ReentrantLock mainLock = this。mainLock; mainLock。lock(); try { // cas修改狀態,保證terminated方法不會重複呼叫 if (ctl。compareAndSet(c, ctlOf(TIDYING, 0))) { try { terminated(); } finally { // 修改成TERMINATED狀態 ctl。set(ctlOf(TERMINATED, 0)); // awaitTerminate會await這個condition termination。signalAll(); } return; } } finally { mainLock。unlock(); } // else retry on failed CAS }}

BlockingQueue選擇

佇列型別

特性

ArrayBlockingQueue

基於陣列的阻塞佇列,有界佇列

LinkedBlockingQueue

基於連結串列的阻塞佇列,和ArrayBlockingQueue功能上的區別在於可以建立一個無界佇列,例如Executors。newFixedThreadPool(int)建立的執行緒池的佇列就是無界的,這種情況下可能出現佇列堆積導致OOM的問題

SynchronizedQueue

同步阻塞佇列,這個佇列是一個沒有長度的佇列,可以保證任務最快被處理,減少在佇列中的停留時間

PriorityBlockingQueue

帶有優先順序的阻塞佇列

DelayQueue

延遲佇列,ScheduledThreadPoolExecutor就是使用這個佇列實現定時執行和延遲執行功能的

RejectedExecutionHandler選擇

這裡介紹一下常見的RejectedExecutionHandler

RejectedExecutionHandler

AbortPolicy

拒絕時丟擲RejectedExecutionException異常,這是預設的拒絕策略

DiscardPolicy

會忽略任務,提交時沒有異常

DiscardOldestPolicy

會從任務佇列中移除最早的任務並重試提交當前任務

CallerRunsPolicy

使用提交任務的執行緒也就是呼叫execute方法的執行緒去執行這個任務

當然我們也可以自定義自己的拒絕策略,例如實現一個阻塞提交執行緒的拒絕策略,這個和CallerRunsPolicy一樣都能讓提交者慢下來,但是不會用提交執行緒去執行任務。

class BlockSubmitRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { executor。getQueue()。put(r); } catch (InterruptedException e) { e。printStackTrace(); } }}

動態修改引數

ThreadPoolExecutor還提供了修改corePoolSize和maximumPoolSize等引數的方法,使得我們可以動態調整執行緒池的引數。

public void setCorePoolSize(int corePoolSize) { if (corePoolSize < 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); int delta = corePoolSize - this。corePoolSize; this。corePoolSize = corePoolSize; if (workerCountOf(ctl。get()) > corePoolSize) interruptIdleWorkers(); else if (delta > 0) { // We don‘t really know how many new threads are “needed”。 // As a heuristic, prestart enough new workers (up to new // core size) to handle the current number of tasks in // queue, but stop if queue becomes empty while doing so。 int k = Math。min(delta, workQueue。size()); while (k—— > 0 && addWorker(null, true)) { if (workQueue。isEmpty()) break; } }}public void setMaximumPoolSize(int maximumPoolSize) { if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize) throw new IllegalArgumentException(); this。maximumPoolSize = maximumPoolSize; if (workerCountOf(ctl。get()) > maximumPoolSize) interruptIdleWorkers();}

如何修改佇列長度呢?我們可以實現一個可變長度的阻塞佇列即可,透過在LinkedBlockingQueue基礎上增加一個加鎖修改capacity的佇列比較容易實現,因為LinkedBlockingQueue中capacity只作為一個int欄位儲存沒有像ArrayBlockingQueue那樣會影響陣列長度。所以我們加鎖修改capacity後呼叫notFull。signalAll即可。

原文連結:https://bytejava。cn/java/2021/08/01/threadpoolexecutor。html?hmsr=toutiao。io&utm_campaign=toutiao。io&utm_medium=toutiao。io&utm_source=toutiao。io