RabbitMQ 中訊息傳遞模型的核心思想是生產者從不直接向佇列傳送任何訊息。實際上,生產者通常根本不知道訊息是否會被傳遞到任何佇列。
相反,生產者只能向交換器傳送訊息。交換器的工作是一件非常簡單的事情。一方面它接收來自生產者的訊息,另一方面它將它們推送到佇列中。交換器必須確切地知道如何處理它收到的訊息。是否應該將其附加到特定佇列?它應該附加到許多佇列中嗎?或者它應該被丟棄。其規則由“交換型別”定義。
有幾種可用的交換型別:direct、topic、headers 和fanout。
無名交換器
在本教程的前面部分中,我們對交換一無所知,但仍然能夠將訊息傳送到佇列。這是可能的,因為我們使用的是預設交換,我們透過空字串 (“ ”) 來識別它。
以下程式碼:
channel。basicPublish( “” , “hello” , null , message。getBytes());
第一個引數是交換器的名稱。空字串表示預設或無名交換器。訊息被路由到具有routingKey指定的名稱的佇列(如果存在)。
臨時佇列
在 Java 客戶端中,當我們不向queueDeclare()提供引數時, 我們會建立一個具有生成名稱的非持久、獨佔、自動刪除佇列:
String queueName = channel。queueDeclare()。getQueue();
此時queueName包含一個隨機佇列名稱。例如,它可能看起來像amq。gen-JzTY20BRgKO-HjmUJj0wLg。以下是宣告的臨時佇列:
這種佇列,在接收fanout型別的exchange的資訊時,非常有用。
繫結binding
圖中X(exchange)到queue之間邊線,即為binding,即繫結。
如果沒有宣告繫結關係,則預設exchange會繫結到當宣告的佇列:
如果沒有佇列繫結到交換器,訊息將丟失,但這對我們來說沒關係;如果沒有消費者在監聽,我們可以安全地丟棄訊息。
訊息的過期時間
官網地址:https://www。rabbitmq。com/ttl。html
透過在引數中,透過在命令列設定message-ttl(豪秒)或在程式碼中設定x-message-ttl可以設定訊息的過期時間。
以下宣告放到這個佇列裡面的訊息,將會在60秒後過期。
透過命令列,了可以宣告設定一個佇列的過期時間:
或使用工具類,指定過期時間
示例:
以下示例傳送一個訊息,並設定過期時間為5s,超過5s此資訊沒有被消費將會被MQ丟棄。
//宣告訊息的過期時間為5秒
AMQP
。
BasicProperties prop
=
new
AMQP
。BasicProperties()。builder()
。expiration(
“”
+(
1000
*
5
))。build();
channel
。basicPublish(
“”
,
queueName
,
prop
,
“HelloWorld”
。getBytes(
StandardCharsets
。
UTF_8
));
死信佇列
https://www。rabbitmq。com/dlx。html
什麼時候訊息被認為是死的?
訊息在到達 RabbitMQ 後變得無法傳遞的三種確定情況:
消費者使用basic。reject或 basic。nack否定確認訊息,並將requeue引數設定為false。
訊息由於每條訊息的 TTL而過期;或者
訊息被丟棄,因為它的佇列超過了長度限制
為什麼使用死信交換
附加到死信交換的佇列收集丟棄的訊息,接下來的步驟由您決定。換句話說 - 由您決定如何處理死信佇列中的訊息。如果實施得當,資訊幾乎不會丟失。
當您知道您有可能已被取消但仍需要處理的訊息時,請附加死信交換。當您不能丟失具有過期 TTL 的訊息或佇列可能達到其容量時。
設定 RabbitMQ 死信交換
死信交換與其他交換沒有什麼不同:
只需正常指定交換並將其宣告為佇列的備份:
只需要給正常的佇列指定一個死信交換機及路由key即可以實現過期資料自動被轉存到死信佇列。
程式碼示例1-過期的訊息自動進入死信佇列:
使用生產者來宣告正常佇列到死信佇列的資料流:
String queueName
=
“HelloQueue”
;
try
(
Connection connection
=
ConnUtils
。
newConnection
()){
Channel channel
=
connection
。createChannel();
//宣告一個死信交換機用於接收過期的訊息
channel
。exchangeDeclare(
“dead_exchanger”
,
“direct”
,
true
);
//宣告一個死信佇列,並繫結死信交換機,即接收列信交換機傳遞的資料
channel
。queueDeclare(
“dead_queue”
,
true
,
false
,
false
,
null
);
//繫結死信佇列+死信交換機,並指定一個routingKey
channel
。queueBind(
“dead_queue”
,
“dead_exchanger”
,
“dead_routingKey”
);
//宣告引數,指定訊息出現死信後的去處
Map
<
String
,
Object
>
argus
=
Map
。
of
(
“x-dead-letter-exchange”
,
“dead_exchanger”
,
“x-dead-letter-routing-key”
,
“dead_routingKey”
);
//宣告正常佇列
channel
。queueDeclare(
queueName
,
true
,
false
,
false
,
argus
);
//宣告訊息的過期時間為5秒
AMQP
。
BasicProperties prop
=
new
AMQP
。BasicProperties()。builder()
。expiration(
“”
+(
1000
*
5
))。build();
channel
。basicPublish(
“”
,
queueName
,
prop
,
“HelloWorld”
。getBytes(
StandardCharsets
。
UTF_8
));
log
。info(
“送資訊完成”
);
}
截圖:
注意繫結關係:
測試,將資料傳送到佇列,等待,5秒,因為沒有被消費,所以,訊息自動進入列信佇列,進入死信佇列的訊息,也可以被繫結到這個死信佇列的消費者正常消費(這樣就形成了延遲佇列)。
程式碼示例2-消費者否的訊息自動進入死信佇列:
基於上述程式碼,我們現在取消過期時間的設定,現在透過消費者來否定確認訊息。
注意 :消費者這邊宣告死信佇列的過程,必須與生產者那邊完全相同。然後消費者透過Nack()否定確認訊息,消費即可以進入死信佇列。
完整程式碼:
String queueName
=
“HelloQueue”
;
Connection con
=
ConnUtils
。
newConnection
();
Channel channel
=
con
。createChannel();
//宣告一個死信交換機用於接收過期的訊息
channel
。exchangeDeclare(
“dead_exchanger”
,
“direct”
,
true
);
//宣告一個死信佇列,並繫結死信交換機,即接收列信交換機傳遞的資料
channel
。queueDeclare(
“dead_queue”
,
true
,
false
,
false
,
null
);
//繫結死信佇列+死信交換機,並指定一個routingKey
channel
。queueBind(
“dead_queue”
,
“dead_exchanger”
,
“dead_routingKey”
);
//宣告引數,指定訊息出現死信後的去處
Map
<
String
,
Object
>
argus
=
Map
。
of
(
“x-dead-letter-exchange”
,
“dead_exchanger”
,
“x-dead-letter-routing-key”
,
“dead_routingKey”
);
//宣告正常佇列
channel
。queueDeclare(
queueName
,
true
,
false
,
false
,
argus
);
//接收到資料以後的回撥函式
DeliverCallback callback
= (consumerTag, message) -> {
byte
[]
body
= message。getBody();
String str
=
new
String(
body
);
long
tag
= message。getEnvelope()。getDeliveryTag();
log
。info(
“接收到資訊:{},tagStr:{},tag:{}”
,
str
, consumerTag,
tag
);
//否定確認,且不回原佇列
channel
。basicNack(message。getEnvelope()。getDeliveryTag(),
false
,
false
);
};
//取消處理資訊的回撥
CancelCallback cancelCallback
= consumerTag -> { };
//消費監聽
channel
。basicConsume(
queueName
,
false
,
callback
,
cancelCallback
);
截圖:
程式碼示例3-訊息達到上限:
參考地址:
https://www。cloudamqp。com/blog/part2-rabbitmq-best-practice-for-high-performance。html。
https://www。rabbitmq。com/maxlength。html
RabbitMQ 中的佇列是單執行緒的,一個佇列最多可以處理大約 5 萬條訊息。如果您有多個佇列和消費者,並且如果您有與底層節點上的核心一樣多的佇列,您將在多核系統上實現更好的吞吐量。
RabbitMQ 管理介面收集和計算叢集中每個佇列的指標。如果您有成千上萬的活動佇列和消費者,這可能會減慢伺服器的速度。如果佇列太多,CPU 和 RAM 使用率也可能會受到負面影響。
預設最大佇列長度限制行為
當設定了最大佇列長度或大小並達到最大值時,RabbitMQ 的預設行為是從佇列的前面丟棄或 死信訊息(即佇列中最舊的訊息)。要修改此行為,請使用下面描述的溢位設定。
當佇列的容量已達到上界,再次新增訊息時,則最先放到佇列中的訊息預設會被丟棄。
透過以下設定,可以修改溢位時的策略:
Overflow behaviour can be set by supplying the
x-overflow
queue declaration argument with a string value。 Possible values are
drop-head
(default),
reject-publish
or
reject-publish-dlx.
drop-head : 刪除最舊的訊息,這是預設值。
reject-publish: 不再接收新的訊息。
reject-publish-dlx:與reject-publish不同,會將後釋出的訊息,放到死信佇列。
完整程式碼:
Channel channel
=
connection
。createChannel();
//宣告一個死信交換機用於接收過期的訊息
channel
。exchangeDeclare(
“dead_exchanger”
,
“direct”
,
true
);
//宣告一個死信佇列,並繫結死信交換機,即接收列信交換機傳遞的資料
channel
。queueDeclare(
“dead_queue”
,
true
,
false
,
false
,
null
);
//繫結死信佇列+死信交換機,並指定一個routingKey
channel
。queueBind(
“dead_queue”
,
“dead_exchanger”
,
“dead_routingKey”
);
//宣告引數,指定訊息出現死信後的去處
Map
<
String
,
Object
>
argus
=
Map
。
of
(
“x-dead-letter-exchange”
,
“dead_exchanger”
,
“x-dead-letter-routing-key”
,
“dead_routingKey”
,
“x-max-length”
,
2
,
“x-overflow”
,
“reject-publish-dlx”
);
//宣告正常佇列,設定容量為2條訊息,預設
channel
。queueDeclare(
queueName
,
true
,
false
,
false
,
argus
);
for
(
int
i
=
1
;
i
<=
3
;
i
++) {
//釋出3條
String msg
=
“Hello”
+
i
;
channel
。basicPublish(
“”
,
queueName
,
null
,
msg
。getBytes(
StandardCharsets
。
UTF_8
));
}
log
。info(
“送資訊完成”
);
截圖:
在宣告佇列時,設定x-max-length,x-overflow。
檢視UI,可以看到HelloQueue保留了兩條資訊,而最後釋出的訊息,則直接進入了死信佇列:
重試死信
您可以透過將原始交換附加到死信佇列來建立使用死信佇列的重新傳遞機制。但是,如果不加以檢查,可能會建立無限迴圈的重新傳遞訊息,這可能會阻塞死信佇列。
建立一個單獨的死信佇列來儲存訊息,然後再將它們推送到交換器或將它們路由回原始佇列。
要進一步改進系統,請在訊息正文中包含一個遞增的屬性,指示接收到訊息的次數。這需要在單獨的消費者中處理死信,但最終允許您丟棄訊息或將它們推送到儲存中。