1 步骤1:创建测试表
首先,在数据库中创建如下的两张表,tp51_test表用于保存要处理的数据,tp51_jobs表用于保存信息队列信息,tp51_jobs表各个字段名不能更改,也不能删除,tp51_test表则可以修改字段名或删除字段名。
// 测试表
CREATE TABLE `tp51_test` (
    `id` int(11) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID序号',
    `order_no` varchar(50) NOT NULL COMMENT '订单号',
    `msg` varchar(255) NOT NULL COMMENT '消息内容',
    `status` tinyint(1) NOT NULL DEFAULT '1' COMMENT '状态 1未执行,2执行',
    `create_time` datetime DEFAULT NULL COMMENT '创建时间',
    `update_time` datetime DEFAULT NULL COMMENT '更新时间',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='测试表';
// 消息队列表
CREATE TABLE `tp51_jobs` (
    `id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'ID序号',
    `queue` varchar(255) NOT NULL COMMENT '队列名称',
    `payload` longtext NOT NULL COMMENT '存储任务数据,可以是任何格式,如JSON或序列化的字符串',
    `attempts` tinyint(3) unsigned NOT NULL COMMENT '尝试次数,如果任务失败,则此值会增加',
    `reserved` tinyint(3) unsigned NOT NULL COMMENT '是否已被消费(即正在处理),0 表示未被消费,非0表示已被消费',
    `reserved_at` int(10) unsigned DEFAULT NULL COMMENT '记录任务被取出的时间戳',
    `available_at` int(10) unsigned NOT NULL COMMENT '记录任务可被执行的时间戳',
    `created_at` int(10) unsigned NOT NULL COMMENT '记录任务创建的时间戳',
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;2 步骤2:安装消息队列
在tp5.1项目的根目录下,执行如下的命令,用于安装think-queue信息队列插件,注意版本
tp50框架: composer require topthink/think-queue:2.*
tp51框架: composer require topthink/think-queue:2.*
tp6*框架:composer require topthink/think-queue:3.*3 步骤3:配置信息队列
安装队列插件之后,在tp5.1的【根目录/config】目录下,会自动生成queue.php队列文件,接着,需要对该文件,进行如下的参数配置:
return [
    // 'connector'  => 'Sync'
    'connector'     => 'Database',   // 数据库驱动
    'expire'        => 60,           // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'       => 'default',    // 默认的队列名称
    'table'         => 'jobs',       // 存储消息的表名,不带前缀
    'dsn'           => [],
    // 'connector'     => 'Redis',     // Redis 驱动
    // 'expire'        => 60,          // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 
    // 'default'       => 'default',   // 默认的队列名称
    // 'host'          => '127.0.0.1', // redis 主机ip
    // 'port'          => 6379,        // redis 端口
    // 'password'      => '',          // redis 密码
    // 'select'        => 0,           // 使用哪一个 db,默认为 db0
    // 'timeout'       => 0,           // redis连接的超时时间
    // 'persistent'    => false,       // 是否是长连接
];
4 步骤4:创建且推送消息
接着,在控制器方法中,进行消息的创建与推送
<?php
namespace app\index\controller;
use think\Controller;
use think\Queue;
use think\Db;
class Index extends Controller
{
    // 消息的创建与推送
    public function queueTest()
    {
        // 消息的创建
        $data = [
            'order_no' => rand(100000,999999)
        ];
        $this->add($data['order_no']);
        // 消息的推送
        $cname = 'app\job\Job1';
        $data  = json_encode($data);
        $qname = 'firstQueue';
        // 将该任务推送到消息队列,等待对应的消费者去执行
        // 参数1:表示当前任务将由哪个类来负责处理,当轮到该任务时,系统将生成一个该类的实例,并调用其 fire 方法
        // 参数2:当前任务所需的业务数据 . 不能为resource类型,其他类型最终将转化为json形式的字符串
        // 参数3:当前任务归属的队列名称,如果为新队列,会自动创建
        $res  = Queue::push($cname, $data, $qname);
        
        // 结果处理:database 驱动时,返回值为 1|false; redis驱动时,返回值为 随机字符串|false
        if ($res !== false) {  
            echo '已成功将任务推送到信息队列中';
        } else {
            echo '推送失败';
        }
    }
    // 把业务数据添加到数据库
    public function add($orderNo)
    {
        Db::name('test')->insert([
            'order_no'      => $orderNo,
            'msg'           => $orderNo,
            'status'        => 1,
            'create_time'   => date('Y-m-d H:i:s'),
            'update_time'   => date('Y-m-d H:i:s')
        ]);
    }
}5 步骤5:创建队列处理类
接着,定义消息队列处理类
<?php
namespace app\job;
use think\Controller;
use think\Db;
use think\queue\Job;
class Job1 extends Controller
{
    // fire方法是消息队列默认调用的方法
    // 参数1:当前的任务对象
    // 参数2:发布任务时自定义的数据
    public function fire(Job $job, $data)
    {
        // 步骤1:先把数据解析为数组
        $data = json_decode($data, true);
        // 步骤2:判断当前任务是否已经被执行,因为有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDoJobStatus($data);
        if (!$isJobStillNeedToBeDone) {
            // 如果该任务已经被执行, 则删除任务
            $job->delete();
        }
        // 步骤3:如果该任务没有被执行,则执行任务,即进行实际的业务处理
        $isJobDone = $this->doJob($data);
        if ($isJobDone) {
            // 如果任务执行成功, 记得删除任务
            $job->delete();
        } else {
            // 通过这个方法可以检查这个任务已经重试了几次了,例如任务轮询4次后删除
            if ($job->attempts() > 3) {
                // 第1种处理方式:重新发布这个任务,例如如下表示该任务延迟10秒后再执行
                // $job->release(10);
                // 第2种处理方式:删除任务
                $job->delete();
            }
        }
    }
    // 有些消息在到达消费者时,可能已经不再需要执行了
    // 参数:发布任务时自定义的数据
    // 返回:任务执行的结果,已成功执行返回true, 否则返回false
    public function checkDoJobStatus($data)
    {
        $status = Db::name('test')->where('order_no', $data['order_no']) -> value('status');
        if ($status == 2) {
            return true;
        } else {
            return false;
        }
    }
    
    // 根据消息中的数据进行实际的业务处理
    // 参数:发布任务时自定义的数据
    // 返回:任务执行的结果
    public function doJob($data)
    {
        $res = Db::name('test')->where('order_no', $data['order_no'])->update(['status' => 2]);
        if (!empty($res)) {
            return true;
        } else {
            return false;
        }
    }
}6 步骤6:发布任务
在浏览器中访问:http://域名/index/index/queueTest,用于创建消息,并将信息推送到队列中。

此时,查看数据库的那两个表,看看数据:


7 步骤7:处理任务
处理任务(即消息的消费与删除),需要在项目的根目录下,执行如下的命令:
# firstQueue为自定义的队列名称
php think queue:work  --queue firstQueue8 步骤8:查看数据表


9 步骤9:结束
到此,消息队列演示完成,另外消息队列的常用命令如下:
(1) 默认队列
php think queue:listen                 // 监听 开发环境用(常用)
php think queue:work                   // 只执行一次
nohup php think queue:work --daemon &  // 守护进程,多次执行(常用)(2) 指定队列
php think queue:listen --queue 队列名称                  // 监听,开发环境用,一次性执行指定队列的所有任务(常用)
php think queue:work  --queue 队列名称                   // 只执行一次,执行指定队列的第一个任务
nohup php think queue:work  --queue 队列名称 --daemon  & // 守护进程,多次执行, 一次性执行指定队列的所有任务(常用)
步骤10:常驻进程即supervisor安装(可选)
安装Supervisor作用:可以使think-queue队列,在后台以守护进程的方式一直运行
步骤1:安装Supervisor
yum install -y supervisor
步骤2:查看Supervisor安装位置,supervisor安装完成后,会在/usr/bin下生成三个执行程序:supervisortd、supervisorctl、echo_supervisord_conf,分别是supervisor的守护进程服务(用于接收进程管理命令)、客户端(用于和守护进程通信,发送管理进程的指令)、生成初始配置文件程序
whereis supervisord
whereis echo_supervisord_conf
whereis supervisorctl
步骤3:修改配置文件,【vi /etc/supervisord.conf】,修改内容如下:
[include]
files = supervisord.d/*.ini
步骤4:自定义待守护进程配置文件,在 /etc/supervisord.d 下创建以.ini 后缀的文件,编辑如下配置:
[program:pdfQueue] ;程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
command=php /www/wwwroot/templet.tik-im.com/think queue:work --daemon --queue pdfQueue
autostart=true ;在 supervisord 启动的时候也自动启动
autorestart=true ; 程序异常退出后自动重启
user=root ;用哪个用户启动
redirect_stderr=true ;把 stderr 重定向到 stdout,默认 false
stdout_logfile_maxbytes=20MB ;stdout 日志文件大小,默认 50MB
stdout_logfile_backups=20 ;stdout 日志文件备份数
stderr_logfile=/www/wwwroot/templet.tik-im.com/worker_err.log ; 错误日志文件
stdout_logfile=/www/wwwroot/templet.tik-im.com/worker.log  ;输出日志文件
具体配置如下:
[program:pdfQueue]
command=php /www/wwwroot/web_management/think queue:work --daemon --queue pushQueue
autostart=true
autorestart=true
user=root
redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=20
stderr_logfile=/www/wwwroot/web_management/runtime/worker_err.log
stdout_logfile=/www/wwwroot/web_management/runtime/worker.log
步骤5:Surpervisor的启动
# supervisord二进制启动
supervisord -c /etc/supervisord.conf
# 检查进程
ps aux | grep supervisord
# 更新Supervisor
supervisorctl update
# 关闭Supervisor
supervisorctl shutdown 智享笔记
								    智享笔记								 
                             
                             
                             
                            