上一節我們分析了Producer的核心元件,我們得到了一張關鍵的元件圖。你還記得麼?
簡單概括下上面的圖就是:
建立了Metadata元件,內部透過Cluster維護元資料
初始化了傳送訊息的記憶體緩衝器RecordAccumulator
建立了NetworkClient,內部最重要的是建立了NIO的Selector元件
啟動了一個Sender執行緒,Sender引用了上面的所有元件,開始執行run方法。
圖的最下方可以看到,上一節截止到了run方法的執行,這一節我們首先會看看run方法核心脈絡做了什麼。接著分析下Producer第一個核心流程:元資料拉取的原始碼原理。
讓我們開始吧!
Sender的run方法在做什麼?
這一節我們就繼續分析下,sender的run方法開始執行會做什麼。
public void run() { log。debug(“Starting Kafka producer I/O thread。”); // main loop, runs until close is called while (running) { try { run(time。milliseconds()); } catch (Exception e) { log。error(“Uncaught error in kafka producer I/O thread: ”, e); } } log。debug(“Beginning shutdown of Kafka producer I/O thread, sending remaining records。”); // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed。 while (!forceClose && (this。accumulator。hasUnsent() || this。client。inFlightRequestCount() > 0)) { try { run(time。milliseconds()); } catch (Exception e) { log。error(“Uncaught error in kafka producer I/O thread: ”, e); } } if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures。 this。accumulator。abortIncompleteBatches(); } try { this。client。close(); } catch (Exception e) { log。error(“Failed to close network client”, e); } log。debug(“Shutdown of Kafka producer I/O thread has completed。”); }
這個run方法的核心脈絡很簡單。主要就是2個while迴圈+執行緒的close,而2個while迴圈,他們都呼叫了run(long time)的這個方法。
透過註釋你可以看到,第二個while是處理特殊情況的,當第一個while退出後,還有未傳送的請求,需要第二個while迴圈處理完成,才會關閉執行緒。
整體脈絡如下圖所示:
接著其實就該看下run方法主要在幹什麼了?
/** * Run a single iteration of sending * * @param now * The current POSIX time in milliseconds */ void run(long now) { Cluster cluster = metadata。fetch(); // get the list of partitions with data ready to send RecordAccumulator。ReadyCheckResult result = this。accumulator。ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result。unknownLeadersExist) this。metadata。requestUpdate(); // remove any nodes we aren‘t ready to send to Iterator
上面的程式碼,你如果第一次看,你肯定會覺得,這個脈絡非常不清晰,不知道重點在哪裡。不過還好有些註釋,你能大體猜到他在幹嘛。
accumulator的ready,networkclient的ready、networkclient的send、networkclient的poll
這些好像是在準備記憶體區域、準備網路連線的node節點、傳送資料、拉取響應結果的意思。
可是如果你猜不到,該怎麼辦呢?
這時候就可以
祭出debug這個殺器了
。由於是producer,我們可以在Hellowolrd的這個客戶端打斷點,一步一步看下。
當你對run方法一步一步打了斷點之後你會發現:
accumulator的ready,networkclient的ready、networkclient的send 這些的邏輯幾乎都沒有執行,全部都是初始化空物件,或者方法內部直接return。
直接一路執行到了client。poll方法。如下圖所示:
那麼,你可以得出一個結論,while第一次迴圈這個run方法的核心邏輯,其實只有一句話:
client。poll(pollTimeout, now)
整體脈絡如下所示:
看來接下來,這個NetworkClient的poll方法,就是關鍵中的關鍵了:
/** * Do actual reads and writes to sockets。 * 對套接字進行實際讀取和寫入 * * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately, * must be non-negative。 The actual timeout will be the minimum of timeout, request timeout and * metadata timeout * @param now The current time in milliseconds * @return The list of responses received */ @Override public List
這個方法的脈絡就清晰多了,透過方法名和註釋,我們幾乎可以猜出他的一些作用主要有:
1)註釋說:對套接字進行實際讀取和寫入
2)metadataUpdater.maybeUpdate(),你還記得NetworkClient的元件DefaultMetadataUpdater麼,方法名意思是可能進行元資料更新。這個好像很關鍵的樣子
3)接著執行了Selector的poll方法,這個是NetworkClient的另一個元件Selector,還記得麼?它底層封裝了原生的NIO Selector。這個方法應該也比較關鍵。
4)後續對response執行了一系列的方法,從名字上看, handleCompletedSends 處理完成傳送的請求、handleCompletedReceives處理完成接受的請求、handleDisconnections處理斷開連線的請求、handleConnections處理連線成功的請求、處理超時的請求handleTimedOutRequests。根據不同情況有不同的處理。
5)最後還有一個response的相關的回撥處理,如果註冊了回撥函式,會執行下。這個應該不是很關鍵的邏輯
也就是簡單的說就是
NetworkClient執行poll方法,主要透過selector處理請求的讀取和寫入,對響應結果做不同的處理而已。
如下圖所示:
到這裡其實我們基本摸清出了run方法主要在做的一件事情了,由於是第一次迴圈,之前的accumulator的ready,networkclient的ready、networkclient的send 什麼都沒做,第一次while迴圈run方法核心執行的是networkclient。poll方法。而poll方法的主要邏輯就是上面圖中所示的了。
maybeUpdate可能在在拉取元資料?
剛才我們分析到,poll方法首先執行的是DefaultMetadataUpdater的maybeUpdate方法,它是可能更新的意思。我們來一起看下他的邏輯吧。
public long maybeUpdate(long now) { // should we update our metadata? long timeToNextMetadataUpdate = metadata。timeToNextUpdate(now); long timeToNextReconnectAttempt = Math。max(this。lastNoNodeAvailableMs + metadata。refreshBackoff() - now, 0); long waitForMetadataFetch = this。metadataFetchInProgress ? Integer。MAX_VALUE : 0; // if there is no node available to connect, back off refreshing metadata long metadataTimeout = Math。max(Math。max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); if (metadataTimeout == 0) { // Beware that the behavior of this method and the computation of timeouts for poll() are // highly dependent on the behavior of leastLoadedNode。 Node node = leastLoadedNode(now); maybeUpdate(now, node); } return metadataTimeout; } /** * The next time to update the cluster info is the maximum of the time the current info will expire and the time the * current info can be updated (i。e。 backoff time has elapsed); If an update has been request then the expiry time * is now */ public synchronized long timeToNextUpdate(long nowMs) { long timeToExpire = needUpdate ? 0 : Math。max(this。lastSuccessfulRefreshMs + this。metadataExpireMs - nowMs, 0); long timeToAllowUpdate = this。lastRefreshMs + this。refreshBackoffMs - nowMs; return Math。max(timeToExpire, timeToAllowUpdate); }
原來這裡有一個時間的判斷,當判斷滿足才會執行maybeUpdate。
這個時間計算好像比較複雜,但是大體可以看出來,metadataTimeout是根據三個時間綜合判斷出來的,如果是0才會執行真正的maybeUpdate()。
像這種時候,我們可以直接在metadataTimeout這裡打一個斷點,看下它的值是如何計算的,比如下圖:
你會發現,當第一次執行while迴圈,執行到poll方法,執行到這個maybeUpdate的時候,決定metadataTimeout的3個值,有兩個是0,其中一個是非0,是一個299720的值。最終導致metadataTimeout也是非0,是299720。
也就是說,
第一次while迴圈不會執行maybeUpdate的任何邏輯。
那麼接著向下執行 Selector的poll()方法。
/** * Do whatever I/O can be done on each connection without blocking。 This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives。 * 在不阻塞的情況下,在每個連線上做任何可以做的 I/O。這包括完成連線完成、斷開連線,啟動新的傳送,或在進行中的傳送或接收請求 */ @Override public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException(“timeout should be >= 0”); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys。isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time。nanoseconds(); //這個方法是NIO底層Selector。select(),會阻塞監聽 int readyKeys = select(timeout); long endSelect = time。nanoseconds(); currentTimeNanos = endSelect; this。sensors。selectTime。record(endSelect - startSelect, time。milliseconds()); //如果監聽到有操作的SelectionKeys,也就是readyKeys>0< 會執行一些操作 if (readyKeys > 0 || !immediatelyConnectedKeys。isEmpty()) { pollSelectionKeys(this。nioSelector。selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true); } addToCompletedReceives(); long endIo = time。nanoseconds(); this。sensors。ioTime。record(endIo - endSelect, time。milliseconds()); maybeCloseOldestConnection(); } private int select(long ms) throws IOException { if (ms < 0L) throw new IllegalArgumentException(“timeout should be >= 0”); if (ms == 0L) return this。nioSelector。selectNow(); else return this。nioSelector。select(ms); }
上面的脈絡主要是2步:
1)select(timeout): NIO底層selector.select(),會阻塞監聽
2)pollSelectionKeys(): 監聽到有操作的SelectionKeys,做了一些操作
也就是說,
最終,Sender執行緒的run方法,第一次while迴圈執行poll方法,最後什麼都沒幹,會被selector.select()阻塞住。
如下圖所示:
new KafkaProducer之後
分析完了run方法的執行 ,我們分析的KafkaProducerHelloWorld第一步new KafkaProducer()基本就完成了。
大家經歷了一節半的時間,終於分析清楚了KafkaProducer建立的原理。不不知道你對Kafka的Producer是不是有了更深的理解了。
分析了new KafkaProducer()之後呢?
我們繼續接著KafkaProducerHelloWorld往下分析,你還記得KafkaProducerHelloWorld的程式碼麼?
public class KafkaProducerHelloWorld { public static void main(String[] args) throws Exception { //配置Kafka的一些引數 Properties props = new Properties(); props。put(“bootstrap。servers”, “mengfanmao。org:9092”); props。put(“key。serializer”, “org。apache。kafka。common。serialization。StringSerializer”); props。put(“value。serializer”, “org。apache。kafka。common。serialization。StringSerializer”); // 建立一個Producer例項 KafkaProducer
KafkaProducerHelloWorld主要就3步:
1)new KafkaProducer 這個我們已經分析完了,主要分析了配置檔案的解析、各個元件是什麼、有什麼,還有就是剛才分析的run執行緒第一次迴圈到底執行了什麼。
2) new ProducerRecord 建立待發送的訊息
3) producer。send() 傳送訊息
首先建立待發送的訊息:
ProducerRecord
我們之前提過,Record表示了一條訊息的抽象封裝。這個ProducerRecord其實就表示了一條訊息。
從建構函式的註釋可以看出來
,ProducerRecord可以指定往哪個topic,哪一個分割槽partition,並且訊息可以設定一個時間戳。分割槽和時間戳預設可以不指定
其實看這塊原始碼,我們主要得到的資訊就是這些了,這些都比較簡單。就不畫圖了。
傳送訊息時的元資料拉取觸發
當Producer和Record都建立好了之後,可以用同步或者非同步的方式傳送訊息。
// 同步方式傳送訊息,會阻塞在這裡,直到傳送完成// producer。send(record)。get();// 非同步方式傳送訊息,不阻塞,設定一個監聽回撥函式即可producer。send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) { System。out。println(“訊息傳送成功”); } else { exception。printStackTrace(); } }}); //同步傳送 @Override public Future
同步和非同步的整個傳送邏輯如下圖所示:
從上圖你會發現,但是無論同步傳送還是非同步底層都會呼叫同一個方法doSend()。區別就是有沒有callBack回撥函式而已,他們還都在呼叫前註冊一些攔截器,這裡我們抓大放小下,我們重點還是關注doSend方法。
doSend方法如下:
/** * Implementation of asynchronously send a record to a topic。 Equivalent to send(record, null)
。 * See {@link #send(ProducerRecord, Callback)} for details。 */private Future
這個方法的脈絡雖然比較長,但是脈絡還是比較清晰,主要先執行了:
1)waitOnMetadata 應該是等待元資料拉取
2)keySerializer.serialize和valueSerializer.serialize,很明顯就是將Record序列化成byte位元組陣列
3)透過partition進行路由分割槽,按照一定路由策略選擇Topic下的某個分割槽
4)accumulator.append將訊息放入緩衝器中
5)喚醒Sender執行緒的selector.select()的阻塞,開始處理記憶體緩衝器中的資料。
用圖來表示如下所示:
這兩節我們重點分析元資料拉取的這個場景的原始碼原理。
所以這裡我們著重先看下步驟1 ,之後的4步我們之後會分析到的。
waitOnMetadata 如何等待元資料拉取的?
既然send的第一步是執行waitOnMetadata方法,首先看下它的程式碼:
/** * Wait for cluster metadata including partitions for the given topic to be available。 * @param topic The topic we want metadata for * @param maxWaitMs The maximum time in ms for waiting on the metadata * @return The amount of time we waited in ms */ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { // add topic to metadata topic list if it is not there already。 if (!this。metadata。containsTopic(topic)) this。metadata。add(topic); if (metadata。fetch()。partitionsForTopic(topic) != null) return 0; long begin = time。milliseconds(); long remainingWaitMs = maxWaitMs; while (metadata。fetch()。partitionsForTopic(topic) == null) { log。trace(“Requesting metadata update for topic {}。”, topic); int version = metadata。requestUpdate(); sender。wakeup(); metadata。awaitUpdate(version, remainingWaitMs); long elapsed = time。milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException(“Failed to update metadata after ” + maxWaitMs + “ ms。”); if (metadata。fetch()。unauthorizedTopics()。contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; } return time。milliseconds() - begin; } /** * Get the current cluster info without blocking */ public synchronized Cluster fetch() { return this。cluster; } public synchronized int requestUpdate() { this。needUpdate = true; return this。version; } /** * Wait for metadata update until the current version is larger than the last version we know of */ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException(“Max time to wait for metadata updates should not be < 0 milli seconds”); } long begin = System。currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this。version <= lastVersion) { if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System。currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException(“Failed to update metadata after ” + maxWaitMs + “ ms。”); remainingWaitMs = maxWaitMs - elapsed; } }
這個方法核心就是判斷了是否有Cluster元資料資訊,如果沒有,進行了如下操作:
1)metadata.requestUpdate(); 更新了一個needUpdate標記,這個值會影響之前maybeUpdate的metadataTimeout的計算,可以讓metadataTimeout為0
2)sender.wakeup();喚醒之前nioSelector.select()的阻塞,繼續執行
3)metadata.awaitUpdate(version, remainingWaitMs); 主要進行了版本比較,如果不是最新版本,呼叫了Metadata.wait()方法(wait方法是每個Object都會有的方法,一般和notify或者notifyAll組合使用)
整個過程我直接用圖給大家表示一下,如下所示:
整個圖就是今天我們分析的關鍵結果了,
這裡透過兩種阻塞和喚醒機制,一個是NIO中Selector的select()和wakeUp(),一個是MetaData物件的wait()和notifyAll()機制。
所以這裡要結合之前Sender執行緒的阻塞邏輯一起來理解。
是不是很有意思一種使用,這裡沒有用任何執行緒的join、sleep、wait、park、unpark、notify這些方法。
小結
最後我們簡單小結下,這裡一節我們主要分析瞭如下Producer的原始碼原理:
初始化KafkaProducer時並沒有去拉取元資料,但是建立了Selector元件,啟動了Sender執行緒,select阻塞等待請求響應。由於還沒有傳送任何請求,所以初始化時並沒有去真正拉取元資料。
真正拉取元資料是在第一次send方法呼叫時,會喚醒喚醒Selector之前阻塞的select(),進入第二次while迴圈,從而傳送拉取元資料請求,並且透過Obejct。wait的機制等待60s,等到從Broker拉取元資料成功後,才會繼續執行真正的生產訊息的請求,否則會報拉取元資料超時異常。
這一節我們只是看到了進行了wait如何等待元資料拉取。
而喚醒Selector的select之後應該會進入第二次while迴圈
那第二次while迴圈如何傳送請求拉取元資料請求,並且在成功後notifyAll()進行喚醒操作的呢?
讓我們下一節繼續分析,大家敬請期待! 我們下一節見!
本文由部落格一文多發平臺 OpenWrite 釋出!