Redis 中如何實現的訊息佇列?實現的方式有幾種

Redis 中實現訊息佇列的方式有幾種

1、使用 List 型別實現

2、使用 ZSet 型別實現

3、使用釋出訂閱者模式實現訊息佇列;

4、使用 Stream 實現訊息佇列。

幾種訊息佇列具體使用和優缺點

1、List 型別實現

的方式最為簡單和直接,它主要是透過 lpush、rpop 存入和讀取實現訊息佇列的,如下圖所示:

Redis 中如何實現的訊息佇列?實現的方式有幾種

lpush 可以把最新的訊息儲存到訊息佇列(List 集合)的首部,而 rpop 可以讀取訊息佇列的尾部,這樣就實現了先進先出,如下圖所示:

Redis 中如何實現的訊息佇列?實現的方式有幾種

優點:

使用 List 實現訊息佇列的優點是訊息可以被持久化,List 可以藉助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用於把資料儲存至磁碟,這樣當 Redis 重啟之後,訊息不會丟失。

缺點:

但使用 List 同樣存在一定的問題,比如訊息不支援重複消費、沒有按照主題訂閱的功能、不支援消費訊息確認等。

2、ZSet 實現訊息佇列

:它是利用 zadd 和 zrangebyscore 來實現存入和讀取訊息的。

優點:

同樣具備持久化的功能

缺點:

List 存在的問題它也同樣存在,不但如此,使用 ZSet 還不能儲存相同元素的值。因為它是有序集合,有序集合的儲存元素值是不能重複的,但分值可以重複,也就是說當訊息值重複時,只能儲存一條資訊在 ZSet 中。

3、釋出訂閱

:使用釋出和訂閱的型別,我們可以實現主題訂閱的功能,也就是 Pattern Subscribe 的功能。因此我們可以使用一個消費者“queue_*”來訂閱所有以“queue_”開頭的訊息佇列,如下圖所示:

Redis 中如何實現的訊息佇列?實現的方式有幾種

優點:

可以按照主題訂閱方式

缺點:

a、無法持久化儲存訊息,如果 Redis 伺服器宕機或重啟,那麼所有的訊息將會丟失;

b、釋出訂閱模式是“發後既忘”的工作模式,如果有訂閱者離線重連之後就不能消費之前的歷史訊息;

c、不支援消費者確認機制,穩定性不能得到保證,例如當消費者獲取到訊息之後,還沒來得及執行就宕機了。因為沒有消費者確認機制,Redis 就會誤以為消費者已經執行了,因此就不會重複傳送未被正常消費的訊息了,這樣整體的 Redis 穩定性就被沒有辦法得到保障了。

4、Stream 型別實現

:使用 Stream 的 xadd 和 xrange 來實現訊息的存入和讀取了,並且 Stream 提供了 xack 手動確認訊息消費的命令,用它我們就可以實現消費者確認的功能了,使用命令如下:

127。0。0。1:6379> xack mq group1 1580959593553-0(integer) 1

消費確認增加了訊息的可靠性,一般在業務處理完成之後,需要執行 ack 確認訊息已經被消費完成,整個流程的執行如下圖所示:

Redis 中如何實現的訊息佇列?實現的方式有幾種

其中“Group”為群組,消費者也就是接收者需要訂閱到群組才能正常獲取到訊息。

在 Java 程式碼中使用 List 實現訊息佇列會有什麼問題?應該如何解決?

先看程式碼部分實現:

import redis。clients。jedis。Jedis;publicclass ListMQTest { public static void main(String[] args){ // 啟動一個執行緒作為消費者 new Thread(() -> consumer())。start(); // 生產者 producer(); } /** * 生產者 */ public static void producer() { Jedis jedis = new Jedis(“127。0。0。1”, 6379); // 推送訊息 jedis。lpush(“mq”, “Hello, List。”); } /** * 消費者 */ public static void consumer() { Jedis jedis = new Jedis(“127。0。0。1”, 6379); // 消費訊息 while (true) { // 獲取訊息 String msg = jedis。rpop(“mq”); if (msg != null) { // 接收到了訊息 System。out。println(“接收到訊息:” + msg); } } }}

可以看出以上消費者的實現是透過 while 無限迴圈來獲取訊息,但如果訊息的空閒時間比較長,一直沒有新任務,而 while 迴圈不會因此停止,

它會一直執行迴圈的動作,這樣就會白白浪費了系統的資源。

解決辦法:藉助 Redis 中的

阻塞讀

來替代 rpop 的方法就可以解決此問題

import redis。clients。jedis。Jedis;public class ListMQExample { public static void main(String[] args) throws InterruptedException { // 消費者 new Thread(() -> bConsumer())。start(); // 生產者 producer(); } /** * 生產者 */ public static void producer() throws InterruptedException { Jedis jedis = new Jedis(“127。0。0。1”, 6379); // 推送訊息 jedis。lpush(“mq”, “Hello, Java。”); Thread。sleep(1000); jedis。lpush(“mq”, “message 2。”); Thread。sleep(2000); jedis。lpush(“mq”, “message 3。”); } /** * 消費者(阻塞版) */ public static void bConsumer() { Jedis jedis = new Jedis(“127。0。0。1”, 6379); while (true) { // 阻塞讀 for (String item : jedis。brpop(0,“mq”)) { // 讀取到相關資料,進行業務處理 System。out。println(item); } } }}

使用

brpop

替代

rpop

來讀取最後一條訊息,就可以解決 while 迴圈在沒有資料的情況下,一直迴圈消耗系統資源的情況了。brpop 中的 b 是 blocking 的意思,表示阻塞讀,也就是當佇列沒有資料時,它會進入休眠狀態,當有資料進入佇列之後,它才會“甦醒”過來執行讀取任務,這樣就可以解決 while 迴圈一直執行消耗系統資源的問題了。

在程式中如何使用 Stream 來實現訊息佇列

在開始實現訊息佇列之前,我們必須先建立分組才行,因為消費者需要關聯分組資訊才能正常執行,具體實現程式碼如下:

import com。google。gson。Gson;import redis。clients。jedis。Jedis;import redis。clients。jedis。StreamEntry;import redis。clients。jedis。StreamEntryID;import utils。JedisUtils;import java。util。AbstractMap;import java。util。HashMap;import java。util。List;import java。util。Map;public class StreamGroupExample { private static final String _STREAM_KEY = “mq”; // 流 key private static final String _GROUP_NAME = “g1”; // 分組名稱 private static final String _CONSUMER_NAME = “c1”; // 消費者 1 的名稱 private static final String _CONSUMER2_NAME = “c2”; // 消費者 2 的名稱 public static void main(String[] args) { // 生產者 producer(); // 建立消費組 createGroup(_STREAM_KEY, _GROUP_NAME); // 消費者 1 new Thread(() -> consumer())。start(); // 消費者 2 new Thread(() -> consumer2())。start(); } /** * 建立消費分組 * @param stream 流 key * @param groupName 分組名稱 */ public static void createGroup(String stream, String groupName) { Jedis jedis = JedisUtils。getJedis(); jedis。xgroupCreate(stream, groupName, new StreamEntryID(), true); } /** * 生產者 */ public static void producer() { Jedis jedis = JedisUtils。getJedis(); // 新增訊息 1 Map map = new HashMap<>(); map。put(“data”, “redis”); StreamEntryID id = jedis。xadd(_STREAM_KEY, null, map); System。out。println(“訊息新增成功 ID:” + id); // 新增訊息 2 Map map2 = new HashMap<>(); map2。put(“data”, “java”); StreamEntryID id2 = jedis。xadd(_STREAM_KEY, null, map2); System。out。println(“訊息新增成功 ID:” + id2); } /** * 消費者 1 */ public static void consumer() { Jedis jedis = JedisUtils。getJedis(); // 消費訊息 while (true) { // 讀取訊息 Map。Entry entry = new AbstractMap。SimpleImmutableEntry<>(_STREAM_KEY, new StreamEntryID()。UNRECEIVED_ENTRY); // 阻塞讀取一條訊息(最大阻塞時間120s) List>> list = jedis。xreadGroup(_GROUP_NAME, _CONSUMER_NAME, 1, 120 * 1000, true, entry); if (list != null && list。size() == 1) { // 讀取到訊息 Map content = list。get(0)。getValue()。get(0)。getFields(); // 訊息內容 System。out。println(“Consumer 1 讀取到訊息 ID:” + list。get(0)。getValue()。get(0)。getID() + “ 內容:” + new Gson()。toJson(content)); } } } /** * 消費者 2 */ public static void consumer2() { Jedis jedis = JedisUtils。getJedis(); // 消費訊息 while (true) { // 讀取訊息 Map。Entry entry = new AbstractMap。SimpleImmutableEntry<>(_STREAM_KEY, new StreamEntryID()。UNRECEIVED_ENTRY); // 阻塞讀取一條訊息(最大阻塞時間120s) List>> list = jedis。xreadGroup(_GROUP_NAME, _CONSUMER2_NAME, 1, 120 * 1000, true, entry); if (list != null && list。size() == 1) { // 讀取到訊息 Map content = list。get(0)。getValue()。get(0)。getFields(); // 訊息內容 System。out。println(“Consumer 2 讀取到訊息 ID:” + list。get(0)。getValue()。get(0)。getID() + “ 內容:” + new Gson()。toJson(content)); } } }}

以上程式碼執行結果如下:

訊息新增成功 ID:1580971482344-0訊息新增成功 ID:1580971482415-0Consumer 1 讀取到訊息 ID:1580971482344-0 內容:{“data”:“redis”}Consumer 2 讀取到訊息 ID:1580971482415-0 內容:{“data”:“java”}

其中,jedis。xreadGroup() 方法的第五個引數 noAck 表示是否自動確認訊息,如果設定 true 收到訊息會自動確認 (ack) 訊息,否則需要手動確認。

可以看出,同一個分組內的多個 consumer 會讀取到不同訊息,不同的 consumer 不會讀取到分組內的同一條訊息。