訊息重投
“
生產者在傳送訊息時,同步訊息失敗會重投,非同步訊息有重試,oneway 沒有任何保證。訊息重投保證訊息儘可能傳送成功、不丟失,但可能會造成訊息重複,訊息重複在 RocketMQ 中是無法避免的問題。訊息重複在一般情況下不會發生,當出現訊息量大、網路抖動,訊息重複就會是大機率事件。另外,生產者主動重發、consumer 負載變化也會導致重複訊息。如下方法可以設定訊息重試策略:
retryTimesWhenSendFailed: 同步傳送失敗重投次數,預設為 2,因此生產者會最多嘗試傳送 retryTimesWhenSendFailed + 1 次。不會選擇上次失敗的>broker,嘗試向其他 broker 傳送,最大程度保證訊息不丟。超過重投次數,丟擲異常,由客戶端保證訊息不丟。當出現 RemotingException、>MQClientException 和部分 MQBrokerException 時會重投。
retryTimesWhenSendAsyncFailed: 非同步傳送失敗重試次數,非同步重試不會選擇其他 broker,僅在同一個 broker 上做重試,不保證訊息不丟。
retryAnotherBrokerWhenNotStoreOK: 訊息刷盤(主或備)超時或 slave 不可用(返回狀態非 SEND_OK),是否嘗試傳送到其他 broker,預設 false。十分重要訊息可以開啟。
”
此外,只有 普通訊息 具有傳送重試機制,順序訊息是沒有的。
retryTimesWhenSendFailed
同步傳送失敗策略
DefaultMQProducer producer = new DefaultMQProducer(“pg”); producer。setNamesrvAddr(“rocketmqOS:9876”); // 設定同步傳送失敗時重試傳送的次數,預設為 2 次 producer。setRetryTimesWhenSendFailed(3); // 設定傳送超時時限為 5s,預設 3s producer。setSendMsgTimeout(5000);
在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:
透過原始碼可以看到,它的預設值是 2:
retryTimesWhenSendAsyncFailed
非同步傳送失敗策略
DefaultMQProducer producer = new DefaultMQProducer(“pg”); producer。setNamesrvAddr(“rocketmqOS:9876”); // 指定非同步傳送失敗後不進行重試傳送 producer。setRetryTimesWhenSendAsyncFailed(0);
在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:
透過原始碼可以看到,它的預設值也是 2:
retryAnotherBrokerWhenNotStoreOK
訊息刷盤失敗策略
訊息刷盤超時( Master 、 Slave ),預設是不會將訊息嘗試傳送到其他 Broker。對於重要訊息可以透過在 Broker 的配置檔案設定 retryAnotherBrokerWhenNotStoreOK 屬性為 true 來開啟。
在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:
訊息重試
“
Consumer 消費訊息失敗後,要提供一種重試機制,令訊息再消費一次。Consumer 消費訊息失敗通常可以認為有以下幾種情況:
由於訊息本身的原因,例如反序列化失敗,訊息資料本身無法處理(例如話費充值,當前訊息的手機號被登出,無法充值)等。這種錯誤通常需要跳過這條訊息,再消費其它訊息,而這條失敗的訊息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過 10 秒後再重試。
由於依賴的下游應用服務不可用,例如 db 連線不可用,外系統網路不可達等。遇到這種錯誤,即使跳過當前失敗的訊息,消費其他訊息同樣也會報錯。這種情況建議應用 sleep 30s,再消費下一條訊息,這樣可以減輕 Broker 重試訊息的壓力。
RocketMQ 會為每個消費組都設定一個 Topic 名稱為“%RETRY%+consumerGroup”的重試佇列(這裡需要注意的是,這個 Topic 的重試佇列是針對消費組,而不是針對每個 Topic 設定的),用於暫時儲存因為各種異常而導致 Consumer 端無法消費的訊息。考慮到異常恢復起來需要一些時間,會為重試佇列設定多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。RocketMQ 對於重試訊息的處理是先儲存至 Topic 名稱為“SCHEDULE_TOPIC_XXXX”的延遲佇列中,後臺定時任務按照對應的時間進行 Delay 後重新儲存至“%RETRY%+consumerGroup”的重試佇列中。
”
消費者消費某條訊息失敗後,會根據訊息重試機制將該訊息重新投遞,若達到重試次數後訊息還沒有成功被消費,則訊息將被投入死信佇列。一條訊息無論重試多少次,這些重試訊息的 Message ID 不會改變。
suspendCurrentQueueTimeMillis
同步消費(順序訊息)訊息模式下消費失敗後再次消費的時間間隔。 預設值:1000 ms
在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:
順序訊息的重試是無休止的,不間斷的,直至消費成功,所以,對於順序訊息的消費, 務必要保證應用能夠及時監控並處理消費失敗的情況,避免消費被永久性阻塞。
順序訊息沒有傳送失敗重試機制,但具有消費失敗重試機制
MaxReconsumeTimes
無序訊息(包括普通訊息、延時訊息、定時訊息和事務訊息)的最大重試次數可透過自定義引數 MaxReconsumeTimes 取值進行配置。預設值為 16 次,該引數取值無最大限制,建議使用預設值。
間隔時間根據重試次數階梯變化,取值範圍:1 秒~2 小時。不支援自定義配置。
若最大重試次數小於等於 16 次,則間隔時間按照無序訊息重試間隔時間階梯變化。若最大重試次數大於 16 次,則超過 16 次的間隔時間均為 2 小時。
delayLevelWhenNextConsume
非同步消費訊息模式下消費失敗重試策略:
-1, 不重複,直接放入死信佇列
0,broker 控制重試策略
0,client 控制重試策略
預設值:0。
在我們 Spring Cloud Stream + Spring Cloud Alibaba RocketMQ 的 例子的配置裡,我們可以這樣配置:
死信佇列
當一條訊息初次消費失敗,訊息佇列會自動進行消費重試;達到最大重試次數後,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該訊息,此時,訊息佇列不會立刻將訊息丟棄,而是將其傳送到該消費者對應的特殊佇列中。
正常情況下無法被消費的訊息稱為 死信訊息(Dead-Letter Message),儲存死信訊息的特殊佇列稱為 死信佇列(Dead-Letter Queue)。
對於 無序訊息叢集消費 下的重試消費,預設允許每條訊息最多重試 16 次,如果訊息重試 16 次後仍然失敗,訊息將被投遞至 死信佇列
特徵
不會再被消費者正常消費
有效期與正常訊息相同,均為 3 天,3 天后會被自動刪除
特性
一個死信佇列對應一個 Group ID, 而不是對應單個消費者例項。名稱為 %DLQ%consumerGroup@consumerGroup
如果一個 Group ID 未產生死信訊息,則不會為其建立相應的死信佇列
一個死信佇列包含了對應 Group ID 產生的所有死信訊息,不論該訊息屬於哪個 Topic
參考
https://github。com/apache/rocketmq/blob/master/docs/cn/features。md
https://help。aliyun。com/document_detail/43490。html#table-4i1-8kq-6vt
https://github。com/alibaba/spring-cloud-alibaba/blob/rocketmq/spring-cloud-alibaba-docs/src/main/asciidoc-zh/rocketmq-new。adoc
https://www。codeleading。com/article/57335926159/
https://gitbook。cn/books/5d340810c43fe20aeadc88db/index。html