Kafka成長3:Producer 元資料拉取原始碼原理(上)

Kafka成長3:Producer 元資料拉取原始碼原理(上)

上一節我們分析了Producer的核心元件,我們得到了一張關鍵的元件圖。你還記得麼?

Kafka成長3:Producer 元資料拉取原始碼原理(上)

簡單概括下上面的圖就是:

建立了Metadata元件,內部透過Cluster維護元資料

初始化了傳送訊息的記憶體緩衝器RecordAccumulator

建立了NetworkClient,內部最重要的是建立了NIO的Selector元件

啟動了一個Sender執行緒,Sender引用了上面的所有元件,開始執行run方法。

圖的最下方可以看到,上一節截止到了run方法的執行,這一節我們首先會看看run方法核心脈絡做了什麼。接著分析下Producer第一個核心流程:元資料拉取的原始碼原理。

讓我們開始吧!

Sender的run方法在做什麼?

這一節我們就繼續分析下,sender的run方法開始執行會做什麼。

public void run() { log。debug(“Starting Kafka producer I/O thread。”); // main loop, runs until close is called while (running) { try { run(time。milliseconds()); } catch (Exception e) { log。error(“Uncaught error in kafka producer I/O thread: ”, e); } } log。debug(“Beginning shutdown of Kafka producer I/O thread, sending remaining records。”); // okay we stopped accepting requests but there may still be // requests in the accumulator or waiting for acknowledgment, // wait until these are completed。 while (!forceClose && (this。accumulator。hasUnsent() || this。client。inFlightRequestCount() > 0)) { try { run(time。milliseconds()); } catch (Exception e) { log。error(“Uncaught error in kafka producer I/O thread: ”, e); } } if (forceClose) { // We need to fail all the incomplete batches and wake up the threads waiting on // the futures。 this。accumulator。abortIncompleteBatches(); } try { this。client。close(); } catch (Exception e) { log。error(“Failed to close network client”, e); } log。debug(“Shutdown of Kafka producer I/O thread has completed。”); }

這個run方法的核心脈絡很簡單。主要就是2個while迴圈+執行緒的close,而2個while迴圈,他們都呼叫了run(long time)的這個方法。

透過註釋你可以看到,第二個while是處理特殊情況的,當第一個while退出後,還有未傳送的請求,需要第二個while迴圈處理完成,才會關閉執行緒。

整體脈絡如下圖所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

接著其實就該看下run方法主要在幹什麼了?

/** * Run a single iteration of sending * * @param now * The current POSIX time in milliseconds */ void run(long now) { Cluster cluster = metadata。fetch(); // get the list of partitions with data ready to send RecordAccumulator。ReadyCheckResult result = this。accumulator。ready(cluster, now); // if there are any partitions whose leaders are not known yet, force metadata update if (result。unknownLeadersExist) this。metadata。requestUpdate(); // remove any nodes we aren‘t ready to send to Iterator iter = result。readyNodes。iterator(); long notReadyTimeout = Long。MAX_VALUE; while (iter。hasNext()) { Node node = iter。next(); if (!this。client。ready(node, now)) { iter。remove(); notReadyTimeout = Math。min(notReadyTimeout, this。client。connectionDelay(node, now)); } } // create produce requests Map> batches = this。accumulator。drain(cluster, result。readyNodes, this。maxRequestSize, now); if (guaranteeMessageOrder) { // Mute all the partitions drained for (List batchList : batches。values()) { for (RecordBatch batch : batchList) this。accumulator。mutePartition(batch。topicPartition); } } List expiredBatches = this。accumulator。abortExpiredBatches(this。requestTimeout, now); // update sensors for (RecordBatch expiredBatch : expiredBatches) this。sensors。recordErrors(expiredBatch。topicPartition。topic(), expiredBatch。recordCount); sensors。updateProduceRequestMetrics(batches); List requests = createProduceRequests(batches, now); // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately // loop and try sending more data。 Otherwise, the timeout is determined by nodes that have partitions with data // that isn’t yet sendable (e。g。 lingering, backing off)。 Note that this specifically does not include nodes // with sendable data that aren‘t ready to send since they would cause busy looping。 long pollTimeout = Math。min(result。nextReadyCheckDelayMs, notReadyTimeout); if (result。readyNodes。size() > 0) { log。trace(“Nodes with data ready to send: {}”, result。readyNodes); log。trace(“Created {} produce requests: {}”, requests。size(), requests); pollTimeout = 0; } for (ClientRequest request : requests) client。send(request, now); // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; this。client。poll(pollTimeout, now); }

上面的程式碼,你如果第一次看,你肯定會覺得,這個脈絡非常不清晰,不知道重點在哪裡。不過還好有些註釋,你能大體猜到他在幹嘛。

accumulator的ready,networkclient的ready、networkclient的send、networkclient的poll

這些好像是在準備記憶體區域、準備網路連線的node節點、傳送資料、拉取響應結果的意思。

可是如果你猜不到,該怎麼辦呢?

這時候就可以

祭出debug這個殺器了

。由於是producer,我們可以在Hellowolrd的這個客戶端打斷點,一步一步看下。

當你對run方法一步一步打了斷點之後你會發現:

accumulator的ready,networkclient的ready、networkclient的send 這些的邏輯幾乎都沒有執行,全部都是初始化空物件,或者方法內部直接return。

直接一路執行到了client。poll方法。如下圖所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

那麼,你可以得出一個結論,while第一次迴圈這個run方法的核心邏輯,其實只有一句話:

client。poll(pollTimeout, now)

整體脈絡如下所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

看來接下來,這個NetworkClient的poll方法,就是關鍵中的關鍵了:

/** * Do actual reads and writes to sockets。 * 對套接字進行實際讀取和寫入 * * @param timeout The maximum amount of time to wait (in ms) for responses if there are none immediately, * must be non-negative。 The actual timeout will be the minimum of timeout, request timeout and * metadata timeout * @param now The current time in milliseconds * @return The list of responses received */ @Override public List poll(long timeout, long now) { long metadataTimeout = metadataUpdater。maybeUpdate(now); try { this。selector。poll(Utils。min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { log。error(“Unexpected error during I/O”, e); } // process completed actions long updatedNow = this。time。milliseconds(); List responses = new ArrayList<>(); handleCompletedSends(responses, updatedNow); handleCompletedReceives(responses, updatedNow); handleDisconnections(responses, updatedNow); handleConnections(); handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse response : responses) { if (response。request()。hasCallback()) { try { response。request()。callback()。onComplete(response); } catch (Exception e) { log。error(“Uncaught error in request completion:”, e); } } } return responses; }

這個方法的脈絡就清晰多了,透過方法名和註釋,我們幾乎可以猜出他的一些作用主要有:

1)註釋說:對套接字進行實際讀取和寫入

2)metadataUpdater.maybeUpdate(),你還記得NetworkClient的元件DefaultMetadataUpdater麼,方法名意思是可能進行元資料更新。這個好像很關鍵的樣子

3)接著執行了Selector的poll方法,這個是NetworkClient的另一個元件Selector,還記得麼?它底層封裝了原生的NIO Selector。這個方法應該也比較關鍵。

4)後續對response執行了一系列的方法,從名字上看, handleCompletedSends 處理完成傳送的請求、handleCompletedReceives處理完成接受的請求、handleDisconnections處理斷開連線的請求、handleConnections處理連線成功的請求、處理超時的請求handleTimedOutRequests。根據不同情況有不同的處理。

5)最後還有一個response的相關的回撥處理,如果註冊了回撥函式,會執行下。這個應該不是很關鍵的邏輯

也就是簡單的說就是

NetworkClient執行poll方法,主要透過selector處理請求的讀取和寫入,對響應結果做不同的處理而已。

如下圖所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

到這裡其實我們基本摸清出了run方法主要在做的一件事情了,由於是第一次迴圈,之前的accumulator的ready,networkclient的ready、networkclient的send 什麼都沒做,第一次while迴圈run方法核心執行的是networkclient。poll方法。而poll方法的主要邏輯就是上面圖中所示的了。

maybeUpdate可能在在拉取元資料?

剛才我們分析到,poll方法首先執行的是DefaultMetadataUpdater的maybeUpdate方法,它是可能更新的意思。我們來一起看下他的邏輯吧。

public long maybeUpdate(long now) { // should we update our metadata? long timeToNextMetadataUpdate = metadata。timeToNextUpdate(now); long timeToNextReconnectAttempt = Math。max(this。lastNoNodeAvailableMs + metadata。refreshBackoff() - now, 0); long waitForMetadataFetch = this。metadataFetchInProgress ? Integer。MAX_VALUE : 0; // if there is no node available to connect, back off refreshing metadata long metadataTimeout = Math。max(Math。max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); if (metadataTimeout == 0) { // Beware that the behavior of this method and the computation of timeouts for poll() are // highly dependent on the behavior of leastLoadedNode。 Node node = leastLoadedNode(now); maybeUpdate(now, node); } return metadataTimeout; } /** * The next time to update the cluster info is the maximum of the time the current info will expire and the time the * current info can be updated (i。e。 backoff time has elapsed); If an update has been request then the expiry time * is now */ public synchronized long timeToNextUpdate(long nowMs) { long timeToExpire = needUpdate ? 0 : Math。max(this。lastSuccessfulRefreshMs + this。metadataExpireMs - nowMs, 0); long timeToAllowUpdate = this。lastRefreshMs + this。refreshBackoffMs - nowMs; return Math。max(timeToExpire, timeToAllowUpdate); }

原來這裡有一個時間的判斷,當判斷滿足才會執行maybeUpdate。

這個時間計算好像比較複雜,但是大體可以看出來,metadataTimeout是根據三個時間綜合判斷出來的,如果是0才會執行真正的maybeUpdate()。

像這種時候,我們可以直接在metadataTimeout這裡打一個斷點,看下它的值是如何計算的,比如下圖:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

你會發現,當第一次執行while迴圈,執行到poll方法,執行到這個maybeUpdate的時候,決定metadataTimeout的3個值,有兩個是0,其中一個是非0,是一個299720的值。最終導致metadataTimeout也是非0,是299720。

也就是說,

第一次while迴圈不會執行maybeUpdate的任何邏輯。

Kafka成長3:Producer 元資料拉取原始碼原理(上)

那麼接著向下執行 Selector的poll()方法。

/** * Do whatever I/O can be done on each connection without blocking。 This includes completing connections, completing * disconnections, initiating new sends, or making progress on in-progress sends or receives。 * 在不阻塞的情況下,在每個連線上做任何可以做的 I/O。這包括完成連線完成、斷開連線,啟動新的傳送,或在進行中的傳送或接收請求 */ @Override public void poll(long timeout) throws IOException { if (timeout < 0) throw new IllegalArgumentException(“timeout should be >= 0”); clear(); if (hasStagedReceives() || !immediatelyConnectedKeys。isEmpty()) timeout = 0; /* check ready keys */ long startSelect = time。nanoseconds(); //這個方法是NIO底層Selector。select(),會阻塞監聽 int readyKeys = select(timeout); long endSelect = time。nanoseconds(); currentTimeNanos = endSelect; this。sensors。selectTime。record(endSelect - startSelect, time。milliseconds()); //如果監聽到有操作的SelectionKeys,也就是readyKeys>0< 會執行一些操作 if (readyKeys > 0 || !immediatelyConnectedKeys。isEmpty()) { pollSelectionKeys(this。nioSelector。selectedKeys(), false); pollSelectionKeys(immediatelyConnectedKeys, true); } addToCompletedReceives(); long endIo = time。nanoseconds(); this。sensors。ioTime。record(endIo - endSelect, time。milliseconds()); maybeCloseOldestConnection(); } private int select(long ms) throws IOException { if (ms < 0L) throw new IllegalArgumentException(“timeout should be >= 0”); if (ms == 0L) return this。nioSelector。selectNow(); else return this。nioSelector。select(ms); }

上面的脈絡主要是2步:

1)select(timeout): NIO底層selector.select(),會阻塞監聽

2)pollSelectionKeys(): 監聽到有操作的SelectionKeys,做了一些操作

也就是說,

最終,Sender執行緒的run方法,第一次while迴圈執行poll方法,最後什麼都沒幹,會被selector.select()阻塞住。

如下圖所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

new KafkaProducer之後

分析完了run方法的執行 ,我們分析的KafkaProducerHelloWorld第一步new KafkaProducer()基本就完成了。

大家經歷了一節半的時間,終於分析清楚了KafkaProducer建立的原理。不不知道你對Kafka的Producer是不是有了更深的理解了。

分析了new KafkaProducer()之後呢?

我們繼續接著KafkaProducerHelloWorld往下分析,你還記得KafkaProducerHelloWorld的程式碼麼?

public class KafkaProducerHelloWorld { public static void main(String[] args) throws Exception { //配置Kafka的一些引數 Properties props = new Properties(); props。put(“bootstrap。servers”, “mengfanmao。org:9092”); props。put(“key。serializer”, “org。apache。kafka。common。serialization。StringSerializer”); props。put(“value。serializer”, “org。apache。kafka。common。serialization。StringSerializer”); // 建立一個Producer例項 KafkaProducer producer = new KafkaProducer<>(props); // 封裝一條訊息 ProducerRecord record = new ProducerRecord<>( “test-topic”, “test-key”, “test-value”); // 同步方式傳送訊息,會阻塞在這裡,直到傳送完成 // producer。send(record)。get(); // 非同步方式傳送訊息,不阻塞,設定一個監聽回撥函式即可 producer。send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) { System。out。println(“訊息傳送成功”); } else { exception。printStackTrace(); } } }); Thread。sleep(5 * 1000); // 退出producer producer。close(); }

KafkaProducerHelloWorld主要就3步:

1)new KafkaProducer 這個我們已經分析完了,主要分析了配置檔案的解析、各個元件是什麼、有什麼,還有就是剛才分析的run執行緒第一次迴圈到底執行了什麼。

2) new ProducerRecord 建立待發送的訊息

3) producer。send() 傳送訊息

Kafka成長3:Producer 元資料拉取原始碼原理(上)

首先建立待發送的訊息:

ProducerRecord record = new ProducerRecord<>(“test-topic”, “test-key”, “test-value”);public ProducerRecord(String topic, K key, V value) { this(topic, null, null, key, value);} /** * Creates a record with a specified timestamp to be sent to a specified topic and partition * 建立具有指定時間戳的記錄以傳送到指定主題和分割槽 * @param topic The topic the record will be appended to * @param partition The partition to which the record should be sent * @param timestamp The timestamp of the record * @param key The key that will be included in the record * @param value The record contents */ public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) { if (topic == null) throw new IllegalArgumentException(“Topic cannot be null”); if (timestamp != null && timestamp < 0) throw new IllegalArgumentException(“Invalid timestamp ” + timestamp); this。topic = topic; this。partition = partition; this。key = key; this。value = value; this。timestamp = timestamp; }

我們之前提過,Record表示了一條訊息的抽象封裝。這個ProducerRecord其實就表示了一條訊息。

從建構函式的註釋可以看出來

,ProducerRecord可以指定往哪個topic,哪一個分割槽partition,並且訊息可以設定一個時間戳。分割槽和時間戳預設可以不指定

其實看這塊原始碼,我們主要得到的資訊就是這些了,這些都比較簡單。就不畫圖了。

傳送訊息時的元資料拉取觸發

當Producer和Record都建立好了之後,可以用同步或者非同步的方式傳送訊息。

// 同步方式傳送訊息,會阻塞在這裡,直到傳送完成// producer。send(record)。get();// 非同步方式傳送訊息,不阻塞,設定一個監聽回撥函式即可producer。send(record, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if(exception == null) { System。out。println(“訊息傳送成功”); } else { exception。printStackTrace(); } }}); //同步傳送 @Override public Future send(ProducerRecord record) { return send(record, null); } //非同步傳送 public Future send(ProducerRecord record, Callback callback) { // intercept the record, which can be potentially modified; this method does not throw exceptions ProducerRecord interceptedRecord = this。interceptors == null ? record : this。interceptors。onSend(record); return doSend(interceptedRecord, callback); }

同步和非同步的整個傳送邏輯如下圖所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

從上圖你會發現,但是無論同步傳送還是非同步底層都會呼叫同一個方法doSend()。區別就是有沒有callBack回撥函式而已,他們還都在呼叫前註冊一些攔截器,這裡我們抓大放小下,我們重點還是關注doSend方法。

doSend方法如下:

/** * Implementation of asynchronously send a record to a topic。 Equivalent to send(record, null)。 * See {@link #send(ProducerRecord, Callback)} for details。 */private Future doSend(ProducerRecord record, Callback callback) { TopicPartition tp = null; try { // first make sure the metadata for the topic is available long waitedOnMetadataMs = waitOnMetadata(record。topic(), this。maxBlockTimeMs); long remainingWaitMs = Math。max(0, this。maxBlockTimeMs - waitedOnMetadataMs); byte[] serializedKey; try { serializedKey = keySerializer。serialize(record。topic(), record。key()); } catch (ClassCastException cce) { throw new SerializationException(“Can’t convert key of class ” + record。key()。getClass()。getName() + “ to class ” + producerConfig。getClass(ProducerConfig。KEY_SERIALIZER_CLASS_CONFIG)。getName() + “ specified in key。serializer”); } byte[] serializedValue; try { serializedValue = valueSerializer。serialize(record。topic(), record。value()); } catch (ClassCastException cce) { throw new SerializationException(“Can‘t convert value of class ” + record。value()。getClass()。getName() + “ to class ” + producerConfig。getClass(ProducerConfig。VALUE_SERIALIZER_CLASS_CONFIG)。getName() + “ specified in value。serializer”); } int partition = partition(record, serializedKey, serializedValue, metadata。fetch()); int serializedSize = Records。LOG_OVERHEAD + Record。recordSize(serializedKey, serializedValue); ensureValidRecordSize(serializedSize); tp = new TopicPartition(record。topic(), partition); long timestamp = record。timestamp() == null ? time。milliseconds() : record。timestamp(); log。trace(“Sending record {} with callback {} to topic {} partition {}”, record, callback, record。topic(), partition); // producer callback will make sure to call both ’callback‘ and interceptor callback Callback interceptCallback = this。interceptors == null ? callback : new InterceptorCallback<>(callback, this。interceptors, tp); RecordAccumulator。RecordAppendResult result = accumulator。append(tp, timestamp, serializedKey, serializedValue, interceptCallback, remainingWaitMs); if (result。batchIsFull || result。newBatchCreated) { log。trace(“Waking up the sender since topic {} partition {} is either full or getting a new batch”, record。topic(), partition); this。sender。wakeup(); } return result。future; // handling exceptions and record the errors; // for API exceptions return them in the future, // for other exceptions throw directly } catch (ApiException e) { log。debug(“Exception occurred during message send:”, e); if (callback != null) callback。onCompletion(null, e); this。errors。record(); if (this。interceptors != null) this。interceptors。onSendError(record, tp, e); return new FutureFailure(e); } catch (InterruptedException e) { this。errors。record(); if (this。interceptors != null) this。interceptors。onSendError(record, tp, e); throw new InterruptException(e); } catch (BufferExhaustedException e) { this。errors。record(); this。metrics。sensor(“buffer-exhausted-records”)。record(); if (this。interceptors != null) this。interceptors。onSendError(record, tp, e); throw e; } catch (KafkaException e) { this。errors。record(); if (this。interceptors != null) this。interceptors。onSendError(record, tp, e); throw e; } catch (Exception e) { // we notify interceptor about all exceptions, since onSend is called before anything else in this method if (this。interceptors != null) this。interceptors。onSendError(record, tp, e); throw e; }}

這個方法的脈絡雖然比較長,但是脈絡還是比較清晰,主要先執行了:

1)waitOnMetadata 應該是等待元資料拉取

2)keySerializer.serialize和valueSerializer.serialize,很明顯就是將Record序列化成byte位元組陣列

3)透過partition進行路由分割槽,按照一定路由策略選擇Topic下的某個分割槽

4)accumulator.append將訊息放入緩衝器中

5)喚醒Sender執行緒的selector.select()的阻塞,開始處理記憶體緩衝器中的資料。

用圖來表示如下所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

這兩節我們重點分析元資料拉取的這個場景的原始碼原理。

所以這裡我們著重先看下步驟1 ,之後的4步我們之後會分析到的。

waitOnMetadata 如何等待元資料拉取的?

既然send的第一步是執行waitOnMetadata方法,首先看下它的程式碼:

/** * Wait for cluster metadata including partitions for the given topic to be available。 * @param topic The topic we want metadata for * @param maxWaitMs The maximum time in ms for waiting on the metadata * @return The amount of time we waited in ms */ private long waitOnMetadata(String topic, long maxWaitMs) throws InterruptedException { // add topic to metadata topic list if it is not there already。 if (!this。metadata。containsTopic(topic)) this。metadata。add(topic); if (metadata。fetch()。partitionsForTopic(topic) != null) return 0; long begin = time。milliseconds(); long remainingWaitMs = maxWaitMs; while (metadata。fetch()。partitionsForTopic(topic) == null) { log。trace(“Requesting metadata update for topic {}。”, topic); int version = metadata。requestUpdate(); sender。wakeup(); metadata。awaitUpdate(version, remainingWaitMs); long elapsed = time。milliseconds() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException(“Failed to update metadata after ” + maxWaitMs + “ ms。”); if (metadata。fetch()。unauthorizedTopics()。contains(topic)) throw new TopicAuthorizationException(topic); remainingWaitMs = maxWaitMs - elapsed; } return time。milliseconds() - begin; } /** * Get the current cluster info without blocking */ public synchronized Cluster fetch() { return this。cluster; } public synchronized int requestUpdate() { this。needUpdate = true; return this。version; } /** * Wait for metadata update until the current version is larger than the last version we know of */ public synchronized void awaitUpdate(final int lastVersion, final long maxWaitMs) throws InterruptedException { if (maxWaitMs < 0) { throw new IllegalArgumentException(“Max time to wait for metadata updates should not be < 0 milli seconds”); } long begin = System。currentTimeMillis(); long remainingWaitMs = maxWaitMs; while (this。version <= lastVersion) { if (remainingWaitMs != 0) wait(remainingWaitMs); long elapsed = System。currentTimeMillis() - begin; if (elapsed >= maxWaitMs) throw new TimeoutException(“Failed to update metadata after ” + maxWaitMs + “ ms。”); remainingWaitMs = maxWaitMs - elapsed; } }

這個方法核心就是判斷了是否有Cluster元資料資訊,如果沒有,進行了如下操作:

1)metadata.requestUpdate(); 更新了一個needUpdate標記,這個值會影響之前maybeUpdate的metadataTimeout的計算,可以讓metadataTimeout為0

2)sender.wakeup();喚醒之前nioSelector.select()的阻塞,繼續執行

3)metadata.awaitUpdate(version, remainingWaitMs); 主要進行了版本比較,如果不是最新版本,呼叫了Metadata.wait()方法(wait方法是每個Object都會有的方法,一般和notify或者notifyAll組合使用)

整個過程我直接用圖給大家表示一下,如下所示:

Kafka成長3:Producer 元資料拉取原始碼原理(上)

整個圖就是今天我們分析的關鍵結果了,

這裡透過兩種阻塞和喚醒機制,一個是NIO中Selector的select()和wakeUp(),一個是MetaData物件的wait()和notifyAll()機制。

所以這裡要結合之前Sender執行緒的阻塞邏輯一起來理解。

是不是很有意思一種使用,這裡沒有用任何執行緒的join、sleep、wait、park、unpark、notify這些方法。

小結

最後我們簡單小結下,這裡一節我們主要分析瞭如下Producer的原始碼原理:

初始化KafkaProducer時並沒有去拉取元資料,但是建立了Selector元件,啟動了Sender執行緒,select阻塞等待請求響應。由於還沒有傳送任何請求,所以初始化時並沒有去真正拉取元資料。

真正拉取元資料是在第一次send方法呼叫時,會喚醒喚醒Selector之前阻塞的select(),進入第二次while迴圈,從而傳送拉取元資料請求,並且透過Obejct。wait的機制等待60s,等到從Broker拉取元資料成功後,才會繼續執行真正的生產訊息的請求,否則會報拉取元資料超時異常。

這一節我們只是看到了進行了wait如何等待元資料拉取。

而喚醒Selector的select之後應該會進入第二次while迴圈

那第二次while迴圈如何傳送請求拉取元資料請求,並且在成功後notifyAll()進行喚醒操作的呢?

讓我們下一節繼續分析,大家敬請期待! 我們下一節見!

本文由部落格一文多發平臺 OpenWrite 釋出!