ThinkPHP6+Supervisor實現程序常駐訊息佇列

圖/文:迷神

之前有需要使用tp開發一個訊息佇列功能,用來非同步處理訂單,傳送一些訊息等。因為是使用的是Thinkphp6,訊息佇列我用的thinkphp官方的think-queue訊息佇列,結合 supervisor 程序管理使佇列程序常駐。記錄一下,順便分享給大家。

安裝 thinkphp-queue

composer install thinkphp-queue

儲存訊息環境

thinkphp-queue的訊息儲存環境,可以有多種形式, Redis,Database,Topthink ,Sync這四種驅動。為了速度,我們一般都使用redis,當然你也可以資料庫,建立提供的表結構就行了。

我們這裡使用了Redis,具體安裝,大家可以使用整合系統,省事一些,具體不再多做贅述啦。

在config/queue。php中,將’default’ => ‘sync’改為’default’ => ‘redis’,使用Redis驅動。

ThinkPHP6+Supervisor實現程序常駐訊息佇列

配置如下

新增生產者

修改app/controller/Index。php裡使用Queue::later或者Queue::push釋出任務。

namespace app\controller;use app\BaseController;use think\facade\Queue;class Index extends BaseController{ public function index() { //當前任務將由哪個類來負責處理 $jobHandlerClassName = ‘app\job\Job1’; //業務資料 物件需要手動轉序列化 $jobData = [‘ts’ => time()]; //佇列名稱 $jobQueueName = “OrderJob”; //入佇列,later延時傳送,單位秒。push立即傳送 $isPushed = Queue::later(2, $jobHandlerClassName, $jobData,$jobQueueName); //$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName ); // database 驅動時,返回值為 1|false ; redis 驅動時,返回值為 隨機字串|false if( $isPushed !== false ){ echo ‘執行成功’; }else{ echo ‘執行失敗’; } //php think queue:listen ——queue OrderJob 執行佇列 }}

建立消費者

編寫 消費者類,用於處理 OrderJob 佇列中的任務,並編寫其 fire() 方法

class Job1{ public function fire(Job $job, $data) { //業務處理程式碼,具體不貼出來了 $isJobDone = $this->jobDone($data); //執行成功刪除 if($isJobDone){ $job->delete(); print(“任務已經被執行成功並且刪除”); }else{ $job->release(3); //$delay為延遲時間 表示該任務延遲3秒後再執行 print(“任務3s後再次被執行”); } //透過這個方法可以檢查任務重試了幾次 if ($job->attempts() > 3) { print(“Job has been retried more than 3 times!”); $job->delete(); } } public function failed($data) { // 。。。任務達到最大重試次數後,失敗了 } //job private function jobDone($data){ Log::write(‘這是資料 ’ 。 json_encode($data)); return true; }

出佇列(消費任務)

在專案根目錄執行命令

php think queue:work ——queue OrderJob

supervisor的安裝和配置

yum安裝

# yum install epel-release# yum install supervisor//設定成開機自動啟動# systemctl enable supervisord

/var/supervisor/conf 建立一個 conf配置檔案

[program:queue_worker] ;專案名稱directory = /www/wwwroot/tp6 ; 程式的啟動目錄,專案根目錄的上一級command = php think queue:work ——queue OrderJob ——daemon ; 啟動命令 queueName就是佇列名process_name=%(program_name)s_%(process_num)02dnumprocs = 3 ; 開啟的程序數量autostart = true ; 在 supervisord 啟動的時候也自動啟動startsecs = 5 ; 啟動 5 秒後沒有異常退出,就當作已經正常啟動了autorestart = true ; 程式異常退出後自動重啟startretries = 3 ; 啟動失敗自動重試次數,預設是 3user = root ; 用哪個使用者啟動redirect_stderr = true ; 把 stderr 重定向到 stdout,預設 falsestdout_logfile_maxbytes = 50MB ; stdout 日誌檔案大小,預設 50MBstdout_logfile_backups = 20 ; stdout 日誌檔案備份數; stdout 日誌檔案,需要手動建立目錄(supervisord 會自動建立日誌檔案)stdout_logfile = /var/supervisor/log/queue_worker。logloglevel=info

重啟下

# systemctl restart supervisord

好了,大概就這麼多了。

有問題,歡迎留言。覺得不錯,記得關注迷神筆記哦。