如何用多執行緒方式,提高rabbitmq訊息處理效率?

article/details/82012991

問題描述:專案中接收到rabbitmq訊息後,先進行一系列的處理,等所有處理完成後,將訊息推送到前臺,但是在處理訊息的過程中,每個方法中都有與資料庫互動的程式碼,直接導致訊息推送不及時。

單執行緒程式碼模型:

import org。springframework。amqp。core。Message;import org。springframework。amqp。core。MessageListener; public class MqHandler implements MessageListener { //訊息處理 public void onMessage(Message msg) { //處理A dealA(msg); //處理B dealB(msg); //處理C dealC(msg); //處理D dealD(msg); //處理E dealE(msg); //處理F dealF(msg); } public void dealA(Message msg){} public void dealB(Message msg){} public void dealC(Message msg){} public void dealD(Message msg){} public void dealE(Message msg){} public void dealF(Message msg){}}

單執行緒處理圖示:

如何用多執行緒方式,提高rabbitmq訊息處理效率?

解決方案:採用多執行緒的方式,每個執行緒處理一個或多個邏輯,提高CPU的使用率,最佳化訊息響應時間。

多執行緒處理圖示:

如何用多執行緒方式,提高rabbitmq訊息處理效率?

程式碼實現

一:配置spring執行緒池

<!—— 核心執行緒數 ——> <!—— 最大執行緒數 ——> <!—— 佇列最大長度 >=mainExecutor。maxSize ——> <!—— 執行緒池維護執行緒所允許的空閒時間 ——> <!—— 執行緒池對拒絕任務(無執行緒可用)的處理策略 ——>

二:執行緒類

ABProcess:將方法A和方法B中的邏輯交給該執行緒處理

import java。util。ArrayList;import java。util。List; import org。springframework。amqp。core。Message;import org。springframework。beans。factory。annotation。Autowired;import org。springframework。scheduling。concurrent。ThreadPoolTaskExecutor; public class ABProcess implements Runnable { @Autowired private ThreadPoolTaskExecutor taskExecutor; private List ABList = new ArrayList(); // 初始化方法:啟動執行緒 public void init() { taskExecutor。execute(this); } /** * 對外提供新增資料的方法 * ABList是共享資源,主執行緒MqHandler對此進行新增,子執行緒ABProcess對此進行刪除,存線上程安全問題,所以需要加同步 * notify():此方法必須在synchronized修飾的程式碼塊或者方法中使用 * @param msg */ public synchronized void addList(Message msg) { ABList。add(msg); notify(); // 喚醒在此物件監視器(鎖)上等待的單個執行緒, } @Override public void run() { while (true) { try { thread(); //呼叫實現方法 } catch (Exception e) { e。printStackTrace(); } } } /** * 因為涉及共享資源的操作,需要同步 * wait():此方法必須在synchronized修飾的程式碼塊或者方法中使用 * @throws Exception */ public synchronized void thread() throws Exception { if (ABList。size() > 0) { // 判斷集合中是否有訊息 dealA(ABList。get(0)); //方法A dealB(ABList。get(0));//方法B ABList。remove(0); // 處理完後,刪除這條資料 System。out。println(“dealABSuccess”); } else { wait(); // 若集合中沒有訊息,讓執行緒等待, } } public void dealA(Message msg) { } public void dealB(Message msg) { } }

spring配置配置Bean,並初始化init方法

CDProcess:將方法C和方法D的邏輯交給該執行緒處理,具體實現與ABProcess一致

三:修改主執行緒MqHandler邏輯

import org。springframework。amqp。core。Message;import org。springframework。amqp。core。MessageListener;import org。springframework。beans。factory。annotation。Autowired; public class MqHandler implements MessageListener { @Autowired private ABProcess acProcess; @Autowired private CDProcess cdProcess; //訊息處理 public void onMessage(Message msg) { acProcess。addList(msg);//主執行緒將訊息新增到集合,交給子執行緒ABProcess處理 cdProcess。addList(msg);//主執行緒將訊息新增到集合,交給子執行緒CDProcess處理 //E和F邏輯程式碼簡單,直接交給主執行緒 dealE(msg); dealF(msg); } public void dealE(Message msg){} public void dealF(Message msg){}}