作為java開發工作人員必備的高併發程式設計知識體系(二)

7.執行緒互動

7.1互動方式

執行緒互動也就是執行緒直接的通訊,最直接的辦法就是執行緒直接直接通訊傳值,而間接方式則是透過共享變數來達到彼此的互動。

等待:釋放物件鎖,允許其他執行緒進入同步塊

通知:重新獲取物件鎖,繼續執行

中斷:狀態互動,通知其他執行緒進入中斷

織入:合併執行緒,多個執行緒合併為一個

7.2執行緒安全

我們最關注的還是透過共享變數來達到互動的方式。執行緒如果都各自幹活互不搭理的話自然相安無事,但多數情況下執行緒直接需要打交道,而且需要分享共享資源,那麼這個時候最核心的就是執行緒安全了。

什麼是執行緒安全?

當多個執行緒訪問同一個物件時,如果不用考慮這些執行緒在執行時環境下的排程和交替執行,也不需要進行額外的同步,或者在呼叫方進行任何其他的協調操作,呼叫這個物件的行為都可以獲取正確的結果,那這個物件是執行緒安全的。(摘自《深入Java虛擬機器》)

如何保證執行緒安全?

我們最早接觸執行緒安全可能是JDK提供的一些號稱執行緒安全的容器,比如Vetor較ArrayList是執行緒安全,HashTable較HashMap是執行緒安全?其實執行緒安全類並不代表也不等同執行緒安全的程式,而執行緒不安全的類同樣可以完成執行緒安全的程式。我們關注的也就是寫出執行緒安全的程式,那麼如何寫出執行緒安全的程式碼呢?下面列舉了執行緒安全的主要設計技術:

無狀態

這個有點函數語言程式設計的味道,下文併發模式會介紹到,總之就是執行緒只有入參和區域性變數,如果變數是引用的話,確保變數的建立和呼叫生命週期都發生線上程棧內,就可以確保執行緒安全。

無共享狀態

完全要求執行緒無狀態比較難實現,必要的狀態是無法避免的,那麼我們就必須維護不同執行緒之間的不同狀態,這可是個麻煩事。幸好我們有ThreadLocal這個神器,該物件跟當前執行緒繫結,而且只對當前執行緒可見,完美解決了無共享狀態的問題。

不可變狀態

最後實在沒辦法避免狀態共享,線上程之間共享狀態,最怕的就是無法確保能維護好正確的讀寫順序,而且多執行緒確實也無法正確維護好這個共享變數。那麼我們索性粗暴點,把共享的狀態定位不可變,比如價格final修飾一下,這樣就達到安全狀態共享。

訊息傳遞

一個執行緒通常也不是所有步驟都需要共享狀態,而是部分環節才需要的,那麼我們把共享狀態的程式碼拆開,無共享狀態的那部分自然不用關心,而共享狀態的小段程式碼,則透過加入訊息元件來傳遞狀態。這個設計到併發模式的流水線程式設計模式,下文併發模式會重點介紹。

執行緒安全容器

JUC裡面提供大量的併發容器,涉及到執行緒互動的時候,使用安全容器可以避免大部分的錯誤,而且大大降低了程式碼的複雜度。

透過synchronized給方法加上內建鎖來實現執行緒安全的類如Vector,HashTable,StringBuffer

AtomicXXX如AtomicInteger

ConcurrentXXX如ConcurrentHashMap

BlockingQueue/BlockingDeque

CopyOnWriteArrayList/CopyOnWriteArraySet

ThreadPoolExecutor

synchronized同步

該關鍵字確保程式碼塊同一時間只被一個執行緒執行,在這個前提下再設計符合執行緒安全的邏輯

其作用域為

物件:物件加鎖,進入同步程式碼塊之前獲取物件鎖

例項方法:物件加鎖,執行例項方法前獲取物件例項鎖

類方法:類加鎖,執行類方法前獲取類鎖

volatile約束

volatile確保每次操作都能強制同步CPU快取和主存直接的變數。而且在編譯期間能阻止指令重排。讀寫併發情況下volatile也不能確保執行緒安全,上文解析記憶體模型的時候有提到過。

這節我們論述了編寫執行緒安全程式的指導思想,其中我們提到了JDK提供的JUC工具包,下一節將重點介紹併發程式設計常用的趁手工具。

8.執行緒工具

前文我們介紹了記憶體理論和執行緒的一些特徵,大家都知道併發程式設計容易出錯,而且出了錯還不好除錯排查,幸好JDK裡面集成了大量實用的API工具,我們能熟悉這些工具,寫起併發程式來也事半功倍。

工具篇其實就是對鎖的不斷變種,適應更多的開發場景,提高效能,提供更方便的工具,從最粗暴的同步修飾符,到靈活的可重入鎖,到寬鬆的條件,接著到允許多個執行緒訪問的訊號量,最後到讀寫分離鎖。

8.1同步控制

由於大多數的併發場景都是需要訪問到共享資源的,為了保證執行緒安全,我們不得已採用鎖的技術來做同步控制,這節我們介紹的是適用不同場景各種鎖技術。

ReentrantLock

可重入互斥鎖具有與使用synchronized的隱式監視器鎖具有相同的行為和語義,但具有更好擴充套件功能。

ReentrantLock由最後成功鎖定的執行緒擁有,而且還未解鎖。當鎖未被其他執行緒佔有時,執行緒呼叫lock()將返回並且成功獲取鎖。如果當前執行緒已擁有鎖,則該方法將立即返回。這可以使用方法isHeldByCurrentThread()和getHoldCount()來檢查。

建構函式接受可選的fairness引數。當設定為true時,在競爭條件下,鎖定有利於賦予等待時間最長執行緒的訪問許可權。否則,鎖將不保證特定的訪問順序。在多執行緒訪問的情況,使用公平鎖比預設設定,有著更低的吞吐量,但是獲得鎖的時間比較小而且可以避免等待鎖導致的飢餓。但是,鎖的公平性並不能保證執行緒排程的公平性。因此,使用公平鎖的許多執行緒中的一個可以連續多次獲得它,而其他活動執行緒沒有進展並且當前沒有持有鎖。不定時的tryLock()方法不遵循公平性設定。即使其他執行緒正在等待,如果鎖可用,它也會成功。

任意指定鎖的起始位置

中斷響應

鎖申請等待限時tryLock()

公平鎖

Condition

Condition從擁有監控方法(wait,notify,notifyAll)的Object物件中抽離出來成為獨特的物件,高效的讓每個物件擁有更多的等待執行緒。和鎖對比起來,如果說用Lock代替synchronized,那麼Condition就是用來代替Object本身的監控方法。

Condition例項跟Object本身的監控相似,同樣提供wait()方法讓呼叫的執行緒暫時掛起讓出資源,知道其他執行緒通知該物件轉態變化,才可能繼續執行。Condition例項來源於Lock例項,透過Lock呼叫newCondition()即可。Condition較Object原生監控方法,可以保證通知順序。

Semaphore

鎖和同步塊同時只能允許單個執行緒訪問共享資源,這個明顯有些單調,部分場景其實可以允許多個執行緒訪問,這個時候訊號量例項就派上用場了。訊號量邏輯上維持了一組許可證, 執行緒呼叫acquire()阻塞直到許可證可用後才能執行。 執行release()意味著釋放許可證,實際上訊號量並沒有真正的許可證,只是採用了計數功能來實現這個功能。

ReadWriteLock

顧名思義讀寫鎖將讀寫分離,細化了鎖的粒度,照顧到效能的最佳化。

CountDownLatch

這個鎖有點“關門放狗”的意思,尤其在我們壓測的時候模擬實時並行請求,該例項將執行緒積累到指定數量後,呼叫countDown()方法讓所有執行緒同時執行。

CyclicBarrier

CyclicBarrier是加強版的CountDownLatch,上面講的是一次性“關門放狗”,而迴圈柵欄則是集齊了指定數量的執行緒,在資源都允許的情況下同時執行,然後下一批同樣的操作,週而復始。

LockSupport

LockSupport是用來建立鎖和其他同步類的基本執行緒阻塞原語。 LockSupport中的park() 和 unpark() 的作用分別是阻塞執行緒和解除阻塞執行緒,而且park()和unpark()不會遇到“Thread。suspend 和 Thread。resume所可能引發的死鎖”問題。因為park() 和 unpark()有許可的存在;呼叫 park() 的執行緒和另一個試圖將其 unpark() 的執行緒之間的競爭將保持活性。

8.2執行緒池

執行緒池總覽

執行緒多起來的話就需要管理,不然就會亂成一鍋。我們知道執行緒在物理上對應的就是棧裡面的一段記憶體,存放著區域性變數的空間和待執行指令集。如果每次執行都要從頭初始化這段記憶體,然後再交給CPU執行,效率就有點低了。假如我們知道該段棧記憶體會被經常用到,那我們就不要回收,建立完就讓它在棧裡面待著,要用的時候取出來,用完換回去,是不是就省了初始化執行緒空間的時間,這樣是我們搞出線程池的初衷。

其實執行緒池很簡單,就是搞了個池子放了一堆執行緒。既然我們搞執行緒池是為了提高效率,那就要考慮執行緒池放多少個執行緒比較合適,太多了或者太少了有什麼問題,怎麼拒絕多餘的請求,除了異常怎麼處理。首先我們來看跟執行緒池有關的一張類圖。

作為java開發工作人員必備的高併發程式設計知識體系(二)

作為java開發工作人員必備的高併發程式設計知識體系(二)

執行緒池歸結起來就是這幾個類的使用技巧了,重點關注ThreadPoolExecutor和Executors即可。

建立執行緒池

萬變不離其宗,建立執行緒池的各種馬甲方法最後都是呼叫到這方法裡面,包含核心執行緒數,最大執行緒數,執行緒工廠,拒絕策略等引數。其中執行緒工廠則可以實現自定義建立執行緒的邏輯。

public interface ThreadFactory { Thread newThread(Runnable r);}

建立的核心構造方法ThreadPoolExecutor。java 1301

/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters。 * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating。 * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed。 This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method。 * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:
* {@code corePoolSize < 0}
* {@code keepAliveTime < 0}
* {@code maximumPoolSize <= 0}
* {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

拒絕策略包含:

/** 實際上並未真正丟棄任務,但是執行緒池效能會下降 * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded。 */ public static class CallerRunsPolicy implements RejectedExecutionHandler /** 粗暴停止拋異常 * A handler for rejected tasks that throws a * {@code RejectedExecutionException}。 */ public static class AbortPolicy implements RejectedExecutionHandler /** 悄無聲息的丟棄拒絕的任務 * A handler for rejected tasks that silently discards the * rejected task。 */ public static class DiscardPolicy implements RejectedExecutionHandler /** 丟棄最老的請求 * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded。 */ public static class DiscardOldestPolicy implements RejectedExecutionHandler

包括Executors。java中的建立執行緒池的方法,具體實現也是透過ThreadPoolExecutor來建立的。

public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer。MAX_VALUE, 60L, TimeUnit。SECONDS, new SynchronousQueue());}public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue());}public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit。MILLISECONDS, new LinkedBlockingQueue()));}

呼叫執行緒池

ThreadPoolExecutor。java 1342

/** 同步執行執行緒,出現異常列印堆疊資訊 * Executes the given task sometime in the future。 The task * may execute in a new thread or in an existing pooled thread。 * * If the task cannot be submitted for execution, either because this * executor has been shutdown or because its capacity has been reached, * the task is handled by the current {@code RejectedExecutionHandler}。 * * @param command the task to execute * @throws RejectedExecutionException at discretion of * {@code RejectedExecutionHandler}, if the task * cannot be accepted for execution * @throws NullPointerException if {@code command} is null */public void execute(Runnable command)/*** 非同步提交執行緒任務,出現異常無法同步追蹤堆疊,本質上也是呼叫execute()方法*/public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); RunnableFuture ftask = newTaskFor(task, result); execute(ftask); return ftask;}

執行緒池最佳化

執行緒池已經是我們使用執行緒的一個最佳化成果了,而執行緒池本身的最佳化其實就是根據實際業務選擇好不同型別的執行緒池,預估併發執行緒數量,控制好執行緒池預留執行緒數(最大執行緒數一般設為2N+1最好,N是CPU核數),這些涉及CPU數量,核數還有具體業務。

另外我們還注意到ForkJoinPool繼承了AbstractExecutorService,這是在JDK7才加上去的,目的就是提高任務派生出來更多工的執行效率,由上圖的繼承關係我們可以知道跟普通執行緒池最大的差異是執行的任務型別不同。

public void execute(ForkJoinTask<?> task) { if (task == null) throw new NullPointerException(); externalPush(task);}public void execute(Runnable task) { if (task == null) throw new NullPointerException(); ForkJoinTask<?> job; if (task instanceof ForkJoinTask<?>) // avoid re-wrap job = (ForkJoinTask<?>) task; else job = new ForkJoinTask。RunnableExecuteAction(task); externalPush(job); }

8.3併發容器

其實我們日常開發大多數併發場景直接用JDK 提供的執行緒安全資料結構足矣,下面列舉了常用的列表,集合等容器,具體就不展開講,相信大家都用得很熟悉了。

ConcurrentHashMap

CopyOnWriteArrayList

ConcurrentLinkedQueue

BlockingQueue

ConcurrentSkipListMap

Vector

HashTable

9.執行緒調優

9.1效能指標

回想一下,當我們在談效能最佳化的時候,我們可能指的是資料庫的讀寫次數,也可能指網站的響應時間。通常我們會用QPS,TPS,RT,併發數,吞吐量,更進一步的還會對比CPU負載來衡量一個系統的效能。

當然我們知道一個系統的吞吐量和響應時間跟外部網路,分散式架構等都存在強關聯,效能最佳化也跟各級快取設計,資料冗餘等架構有很大關係,假設其他方面我們都已經完成了,聚焦到本文我們暫時關心的是單節點的效能最佳化。畢竟一屋不掃何以掃天下,整體系統的最佳化也有賴於各個節點的調優。從感官上來談,當請求量很少的時候,我們可以很輕鬆的透過各種快取最佳化來提高響應時間。但是隨著使用者激增,請求次數的增加,我們的服務也對應著需要併發模型來支撐。但是一個節點的併發量有個上限,當達到這個上限後,響應時間就會變長,所以我們需要探索併發到什麼程度才是最優的,才能保證最高的併發數,同時響應時間又能保持在理想情況。由於我們暫時不關注節點以外的網路情況,那麼下文我們特指的RT是指服務接收到請求後,完成計算,返回計算結果經歷的時間。

單執行緒

單執行緒情況下,服務接收到請求後開始初始化,資源準備,計算,返回結果,時間主要花在CPU計算和CPU外的IO等待時間,多個請求來也只能排隊一個一個來,那麼RT計算如下

RT = T(cpu) + T(io)

QPS = 1000ms / RT

多執行緒

單執行緒情況很好計算,多執行緒情況就複雜了,我們目標是計算出最佳併發量,也就是執行緒數N

單核情況:N = [T(cpu) + T(io)] / T(cpu)

M核情況:N = [T(cpu) + T(io)] / T(cpu) * M

由於多核情況CPU未必能全部使用,存在一個資源利用百分比P

那麼併發的最佳執行緒數 N = [T(cpu) + T(io)] / T(cpu) M P

吞吐量

我們知道單執行緒的QPS很容易算出來,那麼多執行緒的QPS

QPS = 1000ms / RT N = 1000ms / T(cpu) + T(io) [T(cpu) + T(io)] / T(cpu) M P= 1000ms / T(cpu) M P

在機器核數固定情況下,也即是併發模式下最大的吞吐量跟服務的CPU處理時間和CPU利用率有關。CPU利用率不高,就是通常我們聽到最多的抱怨,壓測時候qps都打滿了,但是cpu的load就是上不去。併發模型中多半個共享資源有關,而共享資源又跟鎖息息相關,那麼大部分時候我們想對節點服務做效能調優時就是對鎖的最佳化,這個下一節會提到。

前面我們是假設機器核數固定的情況下做最佳化的,那假如我們把快取,IO,鎖都優化了,剩下的還有啥空間去突破呢?回想一下我們談基礎理論的時候提到的Amdahl定律,公式之前已經給出,該定律想表達的結論是隨著核數或者處理器個數的增加,可以增加最佳化加速比,但是會達到上限,而且增加趨勢愈發不明顯。

9.2鎖最佳化

說真的,我們並不喜歡鎖的,只不過由於臨界資源的存在不得已為之。如果業務上設計能避免出現臨界資源,那就沒有鎖最佳化什麼事了。但是,鎖最佳化的一些原則還是要說一說的。

時間

既然我們並不喜歡鎖,那麼就按需索取,只在核心的同步塊加鎖,用完立馬釋放,減少鎖定臨界區的時間,這樣就可以把資源競爭的風險降到最低。

粒度

進一步看,有時候我們核心同步塊可以進一步分離,比如只讀的情況下並不需要加鎖,這時候就可以用讀寫鎖各自的讀寫功能。

還有一種情況,有時候我們反而會小心翼翼的到處加鎖來防止意外出現,可能出現三個同步塊加了三個鎖,這也造成CPU的過多停頓,根據業務其實可以把相關邏輯合併起來,也就是鎖粗化。

鎖的分離和粗化具體還得看業務如何操作。

尺度

除了鎖暫用時間和粒度外,還有就是鎖的尺度,還是根據業務來,能用共享鎖定的情況就不要用獨享鎖。

死鎖

這個不用說都知道,死鎖防不勝防,我們前面也介紹很多現成的工具,比如可重入鎖,還有執行緒本地變數等方式,都可以一定程度避免死鎖。

9.3JVM鎖機制

我們在程式碼層面把鎖的應用都按照安全法則做到最好了,那接下來要做的就是下鑽到JVM級別的鎖最佳化。具體實現原理我們暫不展開,後續有機會再搞個專題寫寫JVM鎖實現。

自旋鎖(Spin Lock)

自旋鎖的原理非常簡單。如果持有鎖的執行緒可以在短時間內釋放鎖資源,那麼等待競爭鎖的那些執行緒不需要在核心狀態和使用者狀態之間進行切換。 它只需要等待,並且鎖可以在釋放鎖之後立即獲得鎖。這可以避免消耗使用者執行緒和核心切換。

但是,自旋鎖讓CPU空等著什麼也不幹也是一種浪費。 如果自旋鎖的物件一直無法獲得臨界資源,則執行緒也無法在沒有執行實際計算的情況下一致進行CPU空轉,因此需要設定自旋鎖的最大等待時間。如果持有鎖的執行緒在旋轉等待的最大時間沒有釋放鎖,則自旋鎖執行緒將停止旋轉進入阻塞狀態。

JDK1。6開啟自旋鎖 -XX:+UseSpinning,1。7之後控制器收回到JVM自主控制

偏向鎖(Biased Lock)

偏向鎖偏向於第一個訪問鎖的執行緒,如果在執行過程中,同步鎖只有一個執行緒訪問,不存在多執行緒爭用的情況,則執行緒是不需要觸發同步的,這種情況下,就會給執行緒加一個偏向鎖。如果在執行過程中,遇到了其他執行緒搶佔鎖,則持有偏向鎖的執行緒會被掛起,JVM會消除它身上的偏向鎖,將鎖恢復到標準的輕量級鎖。

JDK1。6開啟自旋鎖 -XX:+UseBiasedLocking,1。7之後控制器收回到JVM自主控制

輕量級鎖(Lightweight Lock)

輕量級鎖是由偏向鎖升級來的,偏向鎖執行在一個執行緒進入同步塊的情況下,當第二個執行緒加入鎖競爭的時候,偏向鎖就會升級為輕量級鎖。

重量級鎖(Heavyweight Lock)

如果鎖檢測到與另一個執行緒的爭用,則鎖定會膨脹至重量級鎖。也就是我們常規用的同步修飾產生的同步作用。

9.4無鎖

最後其實我想說的是,雖然鎖很符合我們人類的邏輯思維,設計起來也相對簡單,但是擺脫不了臨界區的限制。那麼我們不妨換個思路,進入無鎖的時間,也就是我們可能會增加業務複雜度的情況下,來消除鎖的存在。

CAS策略

著名的CAS(Compare And Swap),是多執行緒中用於實現同步的原子指令。 它將記憶體位置的內容與給定值進行比較,並且只有它們相同時,才將該記憶體位置的內容修改為新的給定值。 這是作為單個原子操作完成的。 原子性保證了新值是根據最新資訊計算出來的; 如果在此期間該值已被另一個執行緒更新,則寫入將失敗。 操作的結果必須表明它是否進行了替換; 這可以透過簡單的Boolean來響應,或透過返回從記憶體位置讀取的值(而不是寫入它的值)來完成。

也就是一個原子操作包含了要操作的資料和給定認為正確的值進行對比,一致的話就繼續,不一致則會重試。這樣就在沒有鎖的情況下完成併發操作。

我們知道原子類 AtomicInteger內部實現的原理就是採用了CAS策略來完成的。

AtomicInteger。java 132

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value。 * * @param expect the expected value * @param update the new value * @return {@code true} if successful。 False return indicates that * the actual value was not equal to the expected value。 */public final boolean compareAndSet(int expect, int update) { return unsafe。compareAndSwapInt(this, valueOffset, expect, update);}

類似的還有AtomicReference。java 115

/** * Atomically sets the value to the given updated value * if the current value {@code ==} the expected value。 * @param expect the expected value * @param update the new value * @return {@code true} if successful。 False return indicates that * the actual value was not equal to the expected value。 */public final boolean compareAndSet(V expect, V update) { return unsafe。compareAndSwapObject(this, valueOffset, expect, update);}

有興趣的同學可以再瞭解一下Unsafe的實現,進一步可以瞭解Distuptor無鎖框架。

10.併發模型

前面我們大費周章的從併發的基礎概念到多執行緒的使用方法和最佳化技巧。但都是戰術層面的,本節我們試著從戰略的高度來擴充套件一下併發程式設計的世界。可能大多數情況下我們談併發都會想到多執行緒,但是本節我們要打破這種思維,在完全不用搞多執行緒那一套的情況下實現併發。

首先我們用”多執行緒模式“來回顧前文所講的所有關於Thread衍生出來的定義,開發和最佳化的技術。

多執行緒模式

作為java開發工作人員必備的高併發程式設計知識體系(二)

單位執行緒完成完整的任務,也即是一條龍服務執行緒。

優勢:

對映現實單一任務,便於理解和編碼

劣勢:

作為java開發工作人員必備的高併發程式設計知識體系(二)

有狀態多執行緒共享資源,導致資源競爭,死鎖問題,執行緒等待阻塞,失去併發意義

有狀態多執行緒非阻塞演算法,有利減少競爭,提升效能,但難以實現

多執行緒執行順序無法預知

流水線模型

介紹完傳統多執行緒工作模式後,我們來學習另外一種併發模式,傳統的多執行緒工作模式,理解起來很直觀,接下來我們要介紹的另外一種併發模式看起來就不那麼直觀了。

作為java開發工作人員必備的高併發程式設計知識體系(二)

流水線模型,特點是無狀態執行緒,無狀態也意味著無需競爭共享資源,無需等待,也就是非阻塞模型。流水線模型顧名思義就是流水線上有多個環節,每個環節完成自己的工作後就交給下一個環節,無需等待上游,週而復始的完成自己崗位上的一畝三分地就行。各個環節之間交付無需等待,完成即可交付。

作為java開發工作人員必備的高併發程式設計知識體系(二)

而工廠的流水線也不止一條,所以有多條流水線同時工作。

作為java開發工作人員必備的高併發程式設計知識體系(二)

不同崗位的生產效率是不一樣的,所以不同流水線之間也可以發生協同。

作為java開發工作人員必備的高併發程式設計知識體系(二)

我們說流水線模型也稱為響應式模型或者事件驅動模型,其實就是流水線上上游崗位完成生產就通知下游崗位,所以完成了一個事件的通知,每完成一次就通知一下,就是響應式的意思。

流水線模型總體的思想就是縱向切分任務,把任務裡面耗時過久的環節單獨隔離出來,避免完成一個任務需要耗費等待的時間。在實現上又分為Actors和Channels模型

Actors

該模型跟我們講述的流水線模型基本一致,可以理解為響應式模型

作為java開發工作人員必備的高併發程式設計知識體系(二)

Channels

作為java開發工作人員必備的高併發程式設計知識體系(二)

由於各個環節直接不直接互動,所以上下游之間並不知道對方是誰,好比不同環節直接用的是幾條公共的傳送帶來接收物品,各自只需要把完成後的半成品扔到傳送帶,即使後面流水線優化了,去掉中間的環節,對於個體崗位來說也是無感知的,它只是週而復始的從傳送帶拿物品來加工。

流水線的優缺點:

優勢:

無共享狀態:無需考慮資源搶佔,死鎖等問題

獨享記憶體:worker可以持有記憶體,合併多次操作到記憶體後再持久化,提升效率

貼合底層:單執行緒模式貼合硬體執行流程,便於程式碼維護

任務順序可預知

劣勢:

不夠直觀:一個任務被拆分為流水線上多個環節,程式碼層面難以直觀理解業務邏輯

由於流水線模式跟人類的順序執行思維不一樣,比較費解,那麼有沒有辦法讓我們編碼的時候像寫傳統的多執行緒程式碼一樣,而執行起來又是流水線模式呢?答案是肯定的,比如基於Java的Akka/Reator/Vert。x/Play/Qbit框架,或者golang就是為流水線模式而生的併發語言,還有nodeJS等等。

流水線模型的開發實踐可以參考流水線模型實踐。

其實流水線模型背後用的也還是多執行緒來實現,只不過對於傳統多執行緒模式下我們需要小心翼翼來處理跟蹤資源共享問題,而流水線模式把以前一個執行緒做的事情拆成多個,每一個環節再用一條執行緒來完成,避免共享,執行緒直接透過管道傳輸訊息。

這一塊展開也是一個專題,主要設計NIO,Netty和Akka的程式設計實踐,先佔坑後面補上。

函式式模型

函式式並行模型類似流水線模型,單一的函式是無狀態的,所以避免了資源競爭的複雜度,同時每個函式類似流水線裡面的單一環境,彼此直接透過函式呼叫傳遞引數副本,函式之外的資料不會被修改。函式式模式跟流水線模式相輔相成逐漸成為更為主流的併發架構。具體的思想和程式設計實踐也是個大專題,篇幅限制本文就先不展開,擬在下個專題中詳細介紹《函數語言程式設計演化》。

11.總結

由於CPU和I/O天然存在的矛盾,傳統順序的同步工作模式導致任務阻塞,CPU空等著沒有執行,浪費資源。多執行緒為突破了同步工作模式的情況下浪費CPU資源,即使單核情況下也能將時間片拆分成單位給更多的執行緒來輪詢享用。多執行緒在不同享狀態的情況下非常高效,不管協同式還是搶佔式都能在單位時間內執行更多的任務,從而更好的榨取CPU資源。

但是多數情況下執行緒之間是需要通訊的,這一核心場景導致了一系列的問題,也就是執行緒安全。記憶體被共享的單位由於被不同執行緒輪番讀取寫入操作,這種操作帶來的後果往往是寫程式碼的人類沒想到的,也就是併發帶來的髒資料等問題。解決了資源使用效率問題,又帶來了新的安全問題,如何解決?悲觀方式就是對於存在共享記憶體的場景,無論如何只同意同一時刻一個執行緒操作,也就是同步操作方法或者程式碼段或者顯示加鎖。或者volatile來使共享的主存跟每條執行緒的工作記憶體同步(每次讀都從主存重新整理,每次寫完都刷到主存)

要保證執行緒安全:

1、不要使用多執行緒,

2、多執行緒各幹各的不要共享記憶體,

3、共享的記憶體空間是不可變的(常量,final),

4、實在要變每次變完要同步到主存volatile(依賴當前值的邏輯除外),

5、原子變數,

6、根據具體業務,避免髒資料(這塊就是多執行緒最容易犯錯的地方)

執行緒安全後,要考慮的就是效率問題,如果不解決效率問題,那還幹嘛要多執行緒。。。

如果所有執行緒都很自覺,快速執行完就跑路,那就是我們的理想情況了。但是,部分執行緒又臭又長(I/O阻塞),不能讓一直賴在CPU不走,就把他上下文(執行緒號,變數,執行到哪等數值的快照)儲存到記憶體,然後讓它滾蛋下一個執行緒來。但是切換太快的話也不合適,畢竟每次儲存執行緒的作案現場也要花不少時間的,單位時間執行執行緒數要控制在一個適當的個數。建立執行緒也是一項很吃力的工作,一個執行緒就是在棧記憶體裡面開闢一段記憶體空間,根據位元組碼分配臨時變數空間,不同作業系統通常不一樣。不能頻繁的建立銷燬執行緒。那就搞個執行緒池出來,用的時候拿出來,用完扔回去,簡單省事。但是執行緒池的建立也有門道,不能無限建立不然就失去意義了。作業系統有一定上限,執行緒池太多執行緒記憶體爆了,系統奔潰,所以需要一個機制。容納1024個執行緒,多了排隊再多了扔掉。回到執行緒切換,由於建立執行緒耗費資源,切換也花費,有時候切換執行緒的時間甚至比讓執行緒待在cpu無所事事更長,那就給加個自旋鎖,就是讓它自己再cpu打滾啥事不幹,一會兒輪到它裡面就能幹活。

既然多執行緒同步又得加鎖耗資源,不同步又有共享安全問題。那能不能把這些鎖,共享,同步,要注意的問題封裝起來。搞出一個非同步的工作機制,不用管底層的同步問題,只管業務問題。傳統是工匠幹活一根筋幹完,事件驅動是流水線,把一件事拆分成多個環節,每個環節有唯一標識,各個環節批次生產,在流水線對接。這樣在CPU單獨幹,不共享,不阻塞,幹完自己的通知管工,高效封裝了內部執行緒的執行規則,把業務關係暴露給管理者。

本文主要將的數基於JAVA的傳統多執行緒併發模型,下面例牌給出知識體系圖。

作為java開發工作人員必備的高併發程式設計知識體系(二)

影片資料領取☟☟☟

作為java開發工作人員必備的高併發程式設計知識體系(二)

原創: 程式設計原理林振華