标签 queue 下的文章

参考原作者文章:《thinkphp5.1应用rabbitMq》
https://blog.csdn.net/qq_42137846/article/details/125842245

为了验证实践过程,我做了相应的改动。

RabbitMQ的安装请参考《centos7.5 安装RabbitMQ Erlang》

△配置Rabbit文件

路径如下:

application|extra|rabbitmq.php

代码如下

<?php
// +----------------------------------------------------------------------
// | rabbitMQ 示例配置
// 
//   Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
//                      fanout:所有bind到此exchange的queue都可以接收消息
//                      direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
//                      topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
//                      headers:通过headers 来决定把消息发给哪些queue(这个很少用)
// +----------------------------------------------------------------------
return [
    // 连接信息
    'AMQP' => [
        'host'     => '192.168.66.64',
        'port'     => '5672',
        'username' => 'admin',
        'password' => 'admin',
        'vhost'    => '/'
    ],
    //测试队列:direct 路由模式
    'test_queue' => [
        'exchange_name' => 'test_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'test_queue',       //队列名称
        'route_key'     => 'test',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'test'              //消费标签
    ],
 
    //订单队列:direct 路由模式
    'order_queue' => [
        'exchange_name' => 'order_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'order_queue',       //队列名称
        'route_key'     => 'order',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'order'              //消费标签
    ],
    //...等等其它队列
];

△生产者
路径如下:
application|common|controller|MqProducer.php

代码如下:

<?php
namespace app\common\controller;
use think\Config;
//引入rabbitMq所需类
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MqProducer
{
    /**
     * 生产者
     * @param $data 控制器传来的数据
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \Exception
     */
    public function publish($data, $name)
    {
        //获取rabbitMq配置(刚刚在config文件下配置的rabbitmq信息)
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');
 
        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );
 
        //建立通道
        $channel = $connection->channel();
 
        //初始化交换机  
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);
 
        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);
 
        //绑定队列到交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //生成消息(json格式传输)
        $msg = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
 
        //推送消息到交换机
        $channel->basic_publish($msg, $amqpDefail['exchange_name'], $amqpDefail['route_key']);
 
        //关闭通道
        $channel->close();
 
        //断开连接
        $connection->close();
    }
}

△消费者
文件路径:
application|common|controller|MqConsumer.php
代码如下:

<?php

namespace app\common\controller;
 
use think\Config;
use think\Log;
//引入rabbitMq所需类
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
 
class MqConsumer
{
    /**
     * 消费者(路由模式)
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \ErrorException
     */
    public function consumer($name)
    {
        //获取rabbitMq配置
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');
 
        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );
 
        //建立通道
        $channel = $connection->channel();
 
        //流量控制(也被称为公平调度,这里设置一个消费者一次只处理200条消息)
        $channel->basic_qos(null, 200, null);
 
        //初始化交换机
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);
 
        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);
 
        //绑定队列与交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //消费消息
        $channel->basic_consume($amqpDefail['queue_name'], $amqpDefail['consumer_tag'], false, false, false, false, [$this, 'callback']);
        
        //注册退出函数行为(其实用不到,因为要保证消费者一直运行,所以不能断开连接))
        // register_shutdown_function([$this,'shutdown'], $channel, $connection);
 
        //消息未处理完毕时,循环监听并一直处理上方callback方法的逻辑
        $channel->consume();
 
    }
    
    /**
     * 回调后消息处理(业务逻辑放置此处)
     * @param  $msg
     */
    public function callback($msg)
    {
        //$msg->body  通过消费者传来的控制器接收的数据
        $data = json_decode($msg->body,true);//拿到数据

        Log::write('拿到的消费者数据为:'.json_encode($data,320),'info');
        //这里我拿来做了一个插库的动作
        /*app('weloginper')->create([
            'ToUserName' => 'test',
            'FromUserName' => 'test',
            'CreateTime' => 'test',
            'MsgType' => 'test',
            'Event' => 'test',
            'EventKey' => 'test',
            'phone' => 'test',
        ]);*/
        //如果有个值是quit,则让该消费者停止消费
        /*if($data['aa'] == 'quit'){
            $msg->getChannel()->basic_cancel($msg->getConsumerTag());
        }*/
        $msg->ack();  //消息应答:这波200条消息处理完毕后进行消息确认,告诉mq可以开始发下一波200条消息了
    }
 
    
    
    /**
     * @param $channel 信道
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     */
    function shutdown($channel, $connection)
    {
        $channel->close(); //关闭通道
        $connection->close(); //断开连接
    }
 
 
}

生产者业务动作
目的:模拟生成消息
文件路径:application|common|controller|Index.php
代码如下:

<?php
 
namespace app\common\controller;
 
use think\Console;
use app\common\controller\MqProducer;
 
class Index extends MqProducer
{
    public function index()
    {
         $page = input('get.page') ? input('get.page') : 1;//得到初始页码
         $data = [
                'page' => $page,  //当前页
         ];
         if($page <= 100){  //当前不是第10页则一直调用刚刚定义的生产者publish去生产消息
             self::publish($data, 'order');
             $url = url('/common/index/index')  . '?page=' . ++$page;
             die("<script>location.href='{$url}';</script>");
         }
         
         return json([
            'code' => 200,
            'msg' => 'order队列推送成功'
         ]);
    }
 
 
 
}

配置下config

config文件默认禁止访问common模块了

// 禁止访问模块 原来的
//'deny_module_list'       => ['common', 'admin'],
//禁止访问模块 修改后的
'deny_module_list'       => ['admin'],

生成消息
访问 xx.com/common/index/index
生成消息.png

查看RabbitMQ

4.png

消费者消费

命令行:think php order
10.png

再查看RabbitMq 发现消息被消费了。
2501.png

tp5的queue服务安装,请参考composer 安装的方法!这里不再累述。本文主要是针对开发【开启服务】和【关闭服务】

本文仅支持在linux上使用。

根据linux的脚本实例,如:

kill -9 11340;
kill -9 8825;

linux多命令的执行使用英文;隔开。这个就为关闭多个queue服务提供了条件。

设计思路:

一、创建进程,写入进程号【开启服务】

队列: OrderUnPay ---对未支付的订单进行延时作废。

脚本命令如下:

"cd /www/wwwroot/saas.com/; php think queue:work --queue OrderUnPay --daemon > /dev/null 2> /dev/null & echo true"

php exec 执行以上linux脚本后。

使用命令

ps -aux | grep queue

810.png

可以看到里面的15865进程ID

那么如何才能将这个进程ID 15862输出,并写入数据库,方便后面的【关闭服务】

ps -aux|grep OrderUnPay |awk '{print $2}' | head -1

可以使用以上这个命令,通过关键词搜索,并倒序拿出当前的结果

以下php代码,进程ID就可以通过$out[0]

$pid_sq="ps -aux|grep OrderUnPay |awk '{print $2}' | head -1";
$process=exec($pid_sq,$out);
$process_id=isset($out[0])?$out[0]:0;

二.使用多个kill -9 制作【关闭服务】

kill -9 15862;

kill -9 ....;


以下是我tp下的【单个开启服务】和【关闭全部服务】功能

    #关闭服务
public function closed(){
     $out=array();
     $message='关闭服务失败!';
     $output='';
    if(strpos(PHP_OS,"Linux")!==false){
      #Linux套字
      $map['process_id']=array('gt',0);
      $process_ids=$this->model->where($map)->column('process_id');
      if(!empty($process_ids)){
          $exec_power ='';
        foreach($process_ids as $k=>$v){
           $exec_power .="kill -9 ".$v." ;"; 
        }
        $exec_power .=" echo true";
        #exit($exec_power);
        $message = exec($exec_power,$output, $ret);
      }else{
        $message ='没有进程数据';
      }
      
      #$exec_power ="kill -s 9 `ps -aux | grep queue| grep -v grep | awk '{print $2}'` & echo true";
    }else if(strpos(PHP_OS,"WIN")!==false){
      #Win套字
      $exec_power ='cd /d 2>&1'. ROOT_PATH .' & php think queue:restart';
      $message = exec($exec_power,$output, $ret);
    }else{
      $this->error('未识别的操作系统');
    }
    #命令执行
    #$message = shell_exec($exec_power);

    if(count($output)>0){
      foreach($output as $k=>$v){
        $info=iconv('GB2312', 'UTF-8',$v);
        $out[$k]=$info;
        $message.=$info;
      }
    }
    #返回结果
    if(!$ret){    
      $this->model->where(array('status'=>1))->update(['work_status'=>0,'process_id'=>0]);
      $this->success('关闭服务成功','',$exec_power);
    }else{
       $this->error($message,'');
    }
}
    #启动服务
public function powers(){
     $id =$this->request->param('id');
     $info=$this->model->where(array('id'=>$id))->find();
     $daemon =1;
     #获取标识
     $job_code =$info['job_code'];
     if(!empty($job_code)){
       #判断win还是linux
       if(strpos(PHP_OS,"Linux")!==false){
             #Linux套字
             if($daemon){
             $exec_power ='cd '. ROOT_PATH .'; php think queue:work --queue ' .$job_code.' --daemon > /dev/null 2> /dev/null & echo true';
             }else{
             $exec_power ='cd '. ROOT_PATH .'; php think queue:work --queue ' .$job_code.' & echo true';
             }
        }else if(strpos(PHP_OS,"WIN")!==false){
             #Win套字
             if($daemon){
               $exec_power ='cd /d '. ROOT_PATH .' & start /B php think queue:work --queue ' .$job_code.' --daemon'.' & echo true';
             }else{
               $exec_power ='cd /d "'. ROOT_PATH .'" & php think queue:work --queue ' .$job_code.' & echo true';
             }
        }else{
          $this->error('未识别的操作系统');
        }

        #命令执行
        $message ='';
        $process_id='';
        if($daemon>0){
            if(strpos(PHP_OS,"WIN")!==false){
            #打开一个进程
            $win_pid =popen($exec_power, 'r');
            $ret =fgets($win_pid);
            pclose($win_pid); #对应的是int(1) 
            }else{
            #执行命令
             $ret = shell_exec($exec_power);
             #查看进程
              $pid_sq ="ps -aux|grep $job_code |awk '{print $2}' | head -1";
              $process=exec($pid_sq,$out);
              $process_id=isset($out[0])?$out[0]:0;
            }
        }else{
         $ret = shell_exec($exec_power);
        }
        #获取进程ID
        $process_id=$process_id>0?$process_id:'';
        if($ret){
           $this->model->where(array('id'=>$id))->update(['work_status'=>1,'process_id'=>$process_id]);
           $this->success('启动成功','',$exec_power);    
        }else{
           $this->error('启动失败','',$exec_power);
        }
     }else{
        $this->error('未找到对应的任务标识');
     }
}