自頂向下學習 RocketMQ(十):訊息重投和訊息重試

訊息重投

生產者在傳送訊息時,同步訊息失敗會重投,非同步訊息有重試,oneway 沒有任何保證。訊息重投保證訊息儘可能傳送成功、不丟失,但可能會造成訊息重複,訊息重複在 RocketMQ 中是無法避免的問題。訊息重複在一般情況下不會發生,當出現訊息量大、網路抖動,訊息重複就會是大機率事件。另外,生產者主動重發、consumer 負載變化也會導致重複訊息。如下方法可以設定訊息重試策略:

retryTimesWhenSendFailed: 同步傳送失敗重投次數,預設為 2,因此生產者會最多嘗試傳送 retryTimesWhenSendFailed + 1 次。不會選擇上次失敗的>broker,嘗試向其他 broker 傳送,最大程度保證訊息不丟。超過重投次數,丟擲異常,由客戶端保證訊息不丟。當出現 RemotingException、>MQClientException 和部分 MQBrokerException 時會重投。

retryTimesWhenSendAsyncFailed: 非同步傳送失敗重試次數,非同步重試不會選擇其他 broker,僅在同一個 broker 上做重試,不保證訊息不丟。

retryAnotherBrokerWhenNotStoreOK: 訊息刷盤(主或備)超時或 slave 不可用(返回狀態非 SEND_OK),是否嘗試傳送到其他 broker,預設 false。十分重要訊息可以開啟。

此外,只有 普通訊息 具有傳送重試機制,順序訊息是沒有的。

retryTimesWhenSendFailed

同步傳送失敗策略

DefaultMQProducer producer = new DefaultMQProducer(“pg”); producer。setNamesrvAddr(“rocketmqOS:9876”); // 設定同步傳送失敗時重試傳送的次數,預設為 2 次 producer。setRetryTimesWhenSendFailed(3); // 設定傳送超時時限為 5s,預設 3s producer。setSendMsgTimeout(5000);

在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

透過原始碼可以看到,它的預設值是 2:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

retryTimesWhenSendAsyncFailed

非同步傳送失敗策略

DefaultMQProducer producer = new DefaultMQProducer(“pg”); producer。setNamesrvAddr(“rocketmqOS:9876”); // 指定非同步傳送失敗後不進行重試傳送 producer。setRetryTimesWhenSendAsyncFailed(0);

在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

透過原始碼可以看到,它的預設值也是 2:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

retryAnotherBrokerWhenNotStoreOK

訊息刷盤失敗策略

訊息刷盤超時( Master 、 Slave ),預設是不會將訊息嘗試傳送到其他 Broker。對於重要訊息可以透過在 Broker 的配置檔案設定 retryAnotherBrokerWhenNotStoreOK 屬性為 true 來開啟。

在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

訊息重試

Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer 消費訊息失敗通常可以認為有以下幾種情況:

由於訊息本身的原因,例如反序列化失敗,訊息資料本身無法處理(例如話費充值,當前訊息的手機號被登出,無法充值)等。這種錯誤通常需要跳過這條訊息,再消費其它訊息,而這條失敗的訊息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過 10 秒後再重試。

由於依賴的下游應用服務不可用,例如 db 連線不可用,外系統網路不可達等。遇到這種錯誤,即使跳過當前失敗的訊息,消費其他訊息同樣也會報錯。這種情況建議應用 sleep 30s,再消費下一條訊息,這樣可以減輕 Broker 重試訊息的壓力。

RocketMQ 會為每個消費組都設定一個 Topic 名稱為“%RETRY%+consumerGroup”的重試佇列(這裡需要注意的是,這個 Topic 的重試佇列是針對消費組,而不是針對每個 Topic 設定的),用於暫時儲存因為各種異常而導致 Consumer 端無法消費的訊息。考慮到異常恢復起來需要一些時間,會為重試佇列設定多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ 對於重試訊息的處理是先儲存至 Topic 名稱為“SCHEDULE_TOPIC_XXXX”的延遲佇列中,後臺定時任務按照對應的時間進行 Delay 後重新儲存至“%RETRY%+consumerGroup”的重試佇列中。

消費者消費某條訊息失敗後,會根據訊息重試機制將該訊息重新投遞,若達到重試次數後訊息還沒有成功被消費,則訊息將被投入死信佇列。一條訊息無論重試多少次,這些重試訊息的 Message ID 不會改變。

suspendCurrentQueueTimeMillis

同步消費(順序訊息)訊息模式下消費失敗後再次消費的時間間隔。 預設值:1000 ms

在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

順序訊息的重試是無休止的,不間斷的,直至消費成功,所以,對於順序訊息的消費, 務必要保證應用能夠及時監控並處理消費失敗的情況,避免消費被永久性阻塞。

順序訊息沒有傳送失敗重試機制,但具有消費失敗重試機制

MaxReconsumeTimes

無序訊息(包括普通訊息、延時訊息、定時訊息和事務訊息)的最大重試次數可透過自定義引數 MaxReconsumeTimes 取值進行配置。預設值為 16 次,該引數取值無最大限制,建議使用預設值。

間隔時間根據重試次數階梯變化,取值範圍:1 秒~2 小時。不支援自定義配置。

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

若最大重試次數小於等於 16 次,則間隔時間按照無序訊息重試間隔時間階梯變化。若最大重試次數大於 16 次,則超過 16 次的間隔時間均為 2 小時。

delayLevelWhenNextConsume

非同步消費訊息模式下消費失敗重試策略:

-1, 不重複,直接放入死信佇列

0,broker 控制重試策略

0,client 控制重試策略

預設值:0。

在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:

自頂向下學習 RocketMQ(十):訊息重投和訊息重試

死信佇列

當一條訊息初次消費失敗,訊息佇列會自動進行消費重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該訊息,此時,訊息佇列不會立刻將訊息丟棄,而是將其傳送到該消費者對應的特殊佇列中。

正常情況下無法被消費的訊息稱為 死信訊息(Dead-Letter Message),儲存死信訊息的特殊佇列稱為 死信佇列(Dead-Letter Queue)。

對於 無序訊息叢集消費 下的重試消費,預設允許每條訊息最多重試 16 次,如果訊息重試 16 次後仍然失敗,訊息將被投遞至 死信佇列

特徵

不會再被消費者正常消費

有效期與正常訊息相同,均為 3 天,3 天后會被自動刪除

特性

一個死信佇列對應一個 Group ID, 而不是對應單個消費者例項。名稱為 %DLQ%consumerGroup@consumerGroup

如果一個 Group ID 未產生死信訊息,則不會為其建立相應的死信佇列

一個死信佇列包含了對應 Group ID 產生的所有死信訊息,不論該訊息屬於哪個 Topic

參考

https://github。com/apache/rocketmq/blob/master/docs/cn/features。md

https://help。aliyun。com/document_detail/43490。html#table-4i1-8kq-6vt

https://github。com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new。adoc

https://www。codeleading。com/article/57335926159/

https://gitbook。cn/books/5d340810c43fe20aeadc88db/index。html