RocketMQ Producer傳送訊息流程

這節介紹Producer傳送訊息的流程。

接上一節開頭的Demo,傳送訊息的寫法如下:

public class SyncProducer { public static void main (String[] args) throws Exception { // 例項化訊息生產者Producer DefaultMQProducer producer = new DefaultMQProducer (“GroupTest”); // 設定NameServer的地址 producer。setNamesrvAddr (“localhost:9876”); // 啟動Producer例項 producer。start (); for (int i = 0; i < 100; i++) { // 建立訊息,並指定Topic,Tag和訊息體 Message msg = new Message (“TopicTest” /* Topic */, “TagA” /* Tag */, (“Hello RocketMQ ” + i)。getBytes (RemotingHelper。DEFAULT_CHARSET) /* Message body */ ); // 傳送訊息到一個Broker SendResult sendResult = producer。send (msg); // 透過sendResult返回訊息是否成功送達 System。out。printf (“%s%n”, sendResult); } // 如果不再發送訊息,關閉Producer例項。 producer。shutdown (); }}

傳送訊息的方法為:

SendResult sendResult = producer。send (msg);

其send方法內容如下:

public SendResult send( Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this。defaultMQProducerImpl。send(msg);}

主要呼叫了DefaulMQProducerImpl,委託給了DefaultMQProducerImpl的send方法。而DefaultMQProducerImpl又呼叫了自身的sendDefaultImpl,該方法完成了傳送的主要動作。sendDefaultImpl的定義如下:

private SendResult sendDefaultImpl( Message msg, final CommunicationMode communicationMode, final SendCallback sendCallback, final long timeout ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {}

引數包括:

Message:訊息內容

CommunicationMode:通訊模式,包括同步、非同步、單步

SendCallback:非同步模式下的回撥介面,包括成功和異常通知

timeout:超時時間

如下為SendDefaultImpl的主要執行過程:

RocketMQ Producer傳送訊息流程

(1) 確保客戶端已經初始化成功

主要確保DefaultMQProducerImpl的狀態為RUNNING

(2) 查詢topic的釋出資訊

從內部維護的ConcurrentMap中獲取topic對應的釋出資訊,上一節介紹過,該資訊會通過後臺執行緒定時更新,如果當前沒有topic對應的資訊,則會立即呼叫updateTopicRouteInfoFromNameServer方法實時同步。

TopicPlushInfo用於表示Topic的路由資訊,第一節介紹RocketMQ時說過,Topic的資料分佈在多個Broker上,同時在一個broker上還會分為若干個Queue以增加並行度。

RocketMQ Producer傳送訊息流程

上圖的關係圖中,TopicPlushInfo持有一個MessageQueue列表和一個TopicRouteData。MessageQueue表示了各個Queue的對映資訊,即上面提到的各個Broker上的Queue。而TopicRouteData則用於描述Broker的位置資訊和Queue的配置資訊。

(3) 判斷是否有路由資訊

如果上一步沒有查詢到topic對應的釋出資訊,則丟擲異常結束,否則轉到(4)

(4) 計算重試次數

根據通訊模式計算重試次數

int timesTotal = communicationMode == CommunicationMode。SYNC ? 1 + this。defaultMQProducer。getRetryTimesWhenSendFailed() : 1;

即如果是同步模式,則在失敗時會再重試配置的次數(預設為2次),其他情況不進行重試。

(5) 判斷當前是否需要重試

即判斷當前執行次數是否已經超過重試次數,如果已經超過,則說明重試次數用完,沒法繼續嘗試,判斷當前是否有結果,如果有結果則返回,否則丟擲異常結束。如果重試次數沒用完,則轉到(6)

(6) 選取一個延遲較短的broker

選取一個延遲較短的broker,該功能由MQFaultStrategy提供。這裡先介紹MQFaultStrategy,其提供了可選的故障延遲機制,對於請求響應較慢的broker,可以在一段時間內將其狀態置為不可用。如下圖:

RocketMQ Producer傳送訊息流程

可以透過MQFaultStrategy的sendLatencyFaultEnable屬性控制是否開啟故障延遲機制開關,預設為false不開啟。在開啟該開關時,則每次選取topic下對應的queue時,會基於之前執行的耗時,在有存在符合條件的broker的前提下,優選選取一個延遲較短的broker,否則再考慮隨機選取。

LatencyFaultTolerance用於維護有“故障”broker的“可用”狀態。對於每一個被定義為“故障”的broker,LatencyFaultTolerance內部都會有一個對應的FaultItem來表示,其主要內容如下:

class FaultItem implements Comparable { private final String name;//brokername private volatile long currentLatency;//最近發生延遲的時間點 private volatile long startTimestamp;//下一次開始可用的時間點 public FaultItem(final String name) { this。name = name; } public boolean isAvailable() {//判斷當前時間是否已經過了設定的開始可用時間點 return (System。currentTimeMillis() - startTimestamp) >= 0; } }

即當一個broker傳送故障時,會記錄其最近發生延遲的時間點和下一次開始可用的時間點,而一個broker“可用”的意思是指:該broker不存在LatencyFaultTolerance維護的faultItemTable屬性中,或者當前時間已經大於該broker下一次開始可用的時間點。

LatencyFaultTolerance透過updateFaultItem方法更新“故障”broker的可用狀態,該方法會直接更新faultItemTable中broker對應FaultItem的最近延遲時間和最近開始可用時間點。該方法需要給定指定broker的不可用時間。而判斷一個broker是否有故障以及不可用時間的方法,則在MQFaultStrategy的computNotAvaliableDuration方法中,如下:

private long computeNotAvailableDuration(final long currentLatency) { for (int i = latencyMax。length - 1; i >= 0; i——) { if (currentLatency >= latencyMax[i]) return this。notAvailableDuration[i]; } return 0;}

其中延遲級別陣列latencyMax和不可用時長陣列notAvailableDuration的定義如下:

private long[] latencyMax = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

即如果當前請求的“延遲時間”超過latencyMax的某個級別,則認為該broker已經是“故障”狀態,會從notAvailableDuration中選擇該broke對應的不可用時間。從這兩組陣列的定義也可以看出來,當延遲時間小於50ms則認為該broker狀態正常,不用進行故障延遲處理。而“延遲時間”則是指:呼叫remotingclient傳送netty請求這一步的耗時,會在後面進行說明。

現在回過頭來看當前步驟的動作,MQFaultStrategy的selectOneMessageQueue方法用於選取指定topic下的queue,其內容如下:

RocketMQ Producer傳送訊息流程

上面selectOneMessageQueue方法已經添加了註釋,在開啟了故障延遲機制時,該方法首先從topic下可選的queue列表中輪訓選擇一個broker“可用”的,並且brokerName=lastBrokerName(lastBrokerName表示上次使用過的broker,可以為空,表示第一次選擇)上的queue。如果沒有符合要求的broker,則會選擇一個“相對好”的broker上的queue,最後才會從該topic下可選的列表中隨機選擇一個queue。如果沒有開啟故障延遲機制,則會直接從該topic下最近使用過的broker上的可選queue列表中隨機選擇一個queue。

(7)呼叫remotingclient傳送netty請求

使用MQClientAPIImpl傳送訊息(內部使用RemotingClient),傳送模式包括單步、非同步、同步。步驟為:

從快取中獲取broker的ip和埠,如果快取中沒有該broker的資訊則從nameserver同步一次到快取,再從快取中獲取資訊

包裝一個傳送的上下文物件SendMessageContext

如果不是批次訊息,則設定唯一的ID,ID值為:應用啟動到當前的時間差+當前訊息計數

嘗試壓縮訊息體,目前,批次訊息不壓縮;單條訊息,超過配置的長度會進行壓縮,使用Deflater演算法。因為上層會在該步驟失敗時進行重試,因而改不在最後會使用finally將壓縮後的訊息體重新設定為未壓縮前的內容

如果存在CheckForbiddenHook,則執行

如果存在sendHook,則執行傳送前回調動作executeSendMessageHookBefore

包裝請求頭SendMessageRequestHeader

使用MQClientAPIImpl的sendMessage方法傳送訊息,得到SendResult結果

如果存在sendHook,則執行傳送後回撥動作executeSendMessageHookAfter,當丟擲異常也會執行傳送後回撥動作executeSendMessageHookAfter

傳送完後將訊息體設定回原值(還原Message被壓縮過的body值),為重試做準備

返回SendResult

(8) 更新broker的故障狀態

(7)呼叫remotingclient執行完請求後,可以得到該步的耗時,再根據(6)指出的,如果打開了故障延遲開關,會根據該耗時來確定該broker是否有故障,然後讓其“不可用”一段時間,具體可看(6)的內容。

(9) 判斷結果是否有有效

判斷該次請求是否有效,有效包括:執行過程無異常,(7)正確返回結果,如果無效,則會執行(8)更新broker的故障狀態,然後跳到(5)進行重試

以上就是Producer傳送訊息的主要流程,其中涉及到很多快取資料很多都是上一節客戶端啟動過程中講過的後臺任務同步的。下面附上該部分當時原始碼閱讀過程做的筆記簡圖,該圖描述了Producer傳送訊息的大致過程:

RocketMQ Producer傳送訊息流程

RocketMQ Producer傳送訊息流程

更多原創內容請搜尋微信公眾號:啊駝(doubaotaizi)