2022年8月

composer安装

composer require topthink/think-worker=1.0.*

4.png

workman服务需要PHP开启某些函数

//使用workerman需要解除以下函数的禁用

stream_socket_server
stream_socket_client
pcntl_signal_dispatch
pcntl_signal
pcntl_alarm
pcntl_fork
posix_getuid
posix_getpwuid
posix_kill
posix_setsid
posix_getpid
posix_getpwnam
posix_getgrnam
posix_getgid
posix_setgid
posix_initgroups
posix_setuid
posix_isatty

在项目根目录创建

server.php

代码如下

<?php
define('APP_PATH', __DIR__ . '/application/');
define('BIND_MODULE','push/Worker');
require __DIR__ . '/thinkphp/start.php';

*创建模块应答

application|push|controller|Worker.php

代码如下

<?php    
namespace app\push\controller;    
use think\worker\Server;
use think\Log;
use think\Controller;
class Worker extends Server
{
    protected $socket = 'websocket://0.0.0.0:2346';    
    /**
     * 收到信息
     * @param $connection
     * @param $data
     */
    public function onMessage($connection, $data)
    {
        $params =$data;
        $data = json_decode($data,true);
        if ($data['sign']){
            $connection->send($data['sign']);
        }else{
            $return_data ="我于".date("Y-m-d H:i:s")."收到您的信息:".$params;
            $connection->send($return_data);
        }
    }    
    /**
     * 当连接建立时触发的回调函数
     * @param $connection
     */
    public function onConnect($connection)
    {    
    }    
    /**
     * 当连接断开时触发的回调函数
     * @param $connection
     */
    public function onClose($connection)
    {            
    }    
    /**
     * 当客户端的连接上发生错误时触发
     * @param $connection
     * @param $code
     * @param $msg
     */
    public function onError($connection, $code, $msg)
    {
        echo "error $code $msg\n";
    }    
    /**
     * 每个进程启动
     * @param $worker
     */
    public function onWorkerStart($worker)
    {    
        Log::write('进程启动了:'.date('Y-m-d H:i:s'),'info');
    }

    public function index(){
        parent::start();
    }
}

启动php服务进程
78.png

php  server.php start #启动 |stop 停止|restart重启

Nginx配置反向代理

在server内添加

location /wss {
          proxy_pass http://127.0.0.1:2346/;
          proxy_http_version 1.1;
          proxy_set_header Upgrade $http_upgrade;
          proxy_set_header Connection "upgrade";
          proxy_set_header Host $host;
          proxy_set_header X-Real-IP $remote_addr;
          proxy_set_header REMOTE-HOST $remote_addr;
          proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
          client_max_body_size 5000M;
  }

客户端访问代码
我本地创建了一个8.html文件访问

<!doctype html>
<html lang="en">
 <head>
  <meta charset="UTF-8">
  <title>测试接收数据</title>
  <script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
 </head>
 <body>
 <button type="button" class="signScore">点击一下</button>
  <script type="application/javascript">
  function StartWebSocket(){
    websocket = new WebSocket("ws://app.dev.ejiegd.com/wss");
    websocket.onopen = function (data) {
      // 发送信息
      var content =Math.random().toString(36).slice(2,10);
      websocket.send(content);
    };
    websocket.onmessage = function (data) {
      console.log("接收内容");
      alert(data.data);
      console.log(data.data); // 接收信息
    };
    websocket.onclose = function (data) {
      console.log("关闭", data); //关闭
    };
 }
    $(".signScore").click(function () {
        StartWebSocket();
    })
</script>
 </body>
</html>

最终效果

6.png

workerman可以一直运行,以daemon(守护进程)方式启动workerman即可后台一直运行。

启动停止workerman:

启动

以debug(调试)方式启动

php server.php start

以daemon(守护进程)方式启动

php server.php start -d

停止

php server.php stop

重启

php server.php restart

平滑重启

php server.php reload

查看状态

php server.php status

debug和daemon方式区别:

1、以debug方式启动,代码中echo、var_dump、print等打印函数会直接输出在终端。

2、以daemon方式启动,代码中echo、var_dump、print等打印会默认重定向到/dev/null文件,可以通过设置Worker::$stdoutFile = '/your/path/file';来设置这个文件路径。

3、以debug方式启动,终端关闭后workerman会随之关闭并退出。

4、以daemon方式启动,终端关闭后workerman继续后台正常运行。

RabbitMq访问使用Guest访问时候会出现以下错误

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.

解决办法:

通过RabbitMq后台新增一个用户来连接

// 连接信息
'AMQP' => [
    'host'     => '192.168.66.64',
    'port'     => '5672',
    'username' => 'vister',
    'password' => 'vister',
    'vhost'    => '/'
],

0.png

参考原作者文章:《thinkphp5.1应用rabbitMq》
https://blog.csdn.net/qq_42137846/article/details/125842245

为了验证实践过程,我做了相应的改动。

RabbitMQ的安装请参考《centos7.5 安装RabbitMQ Erlang》

△配置Rabbit文件

路径如下:

application|extra|rabbitmq.php

代码如下

<?php
// +----------------------------------------------------------------------
// | rabbitMQ 示例配置
// 
//   Exchange在定义的时候是有类型的,以决定到底是哪些Queue符合条件,可以接收消息:
//                      fanout:所有bind到此exchange的queue都可以接收消息
//                      direct:通过routingKey和exchange决定的那个唯一的queue可以接收消息
//                      topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
//                      headers:通过headers 来决定把消息发给哪些queue(这个很少用)
// +----------------------------------------------------------------------
return [
    // 连接信息
    'AMQP' => [
        'host'     => '192.168.66.64',
        'port'     => '5672',
        'username' => 'admin',
        'password' => 'admin',
        'vhost'    => '/'
    ],
    //测试队列:direct 路由模式
    'test_queue' => [
        'exchange_name' => 'test_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'test_queue',       //队列名称
        'route_key'     => 'test',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'test'              //消费标签
    ],
 
    //订单队列:direct 路由模式
    'order_queue' => [
        'exchange_name' => 'order_exchange',    //交换机名称
        'exchange_type' => 'direct',            //交换机运行模式(从上面四种模式中选)
        'queue_name'    => 'order_queue',       //队列名称
        'route_key'     => 'order',             //路由键,用于绑定队列与交换机
        'consumer_tag'  => 'order'              //消费标签
    ],
    //...等等其它队列
];

△生产者
路径如下:
application|common|controller|MqProducer.php

代码如下:

<?php
namespace app\common\controller;
use think\Config;
//引入rabbitMq所需类
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MqProducer
{
    /**
     * 生产者
     * @param $data 控制器传来的数据
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \Exception
     */
    public function publish($data, $name)
    {
        //获取rabbitMq配置(刚刚在config文件下配置的rabbitmq信息)
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');
 
        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );
 
        //建立通道
        $channel = $connection->channel();
 
        //初始化交换机  
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);
 
        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);
 
        //绑定队列到交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //生成消息(json格式传输)
        $msg = new AMQPMessage(json_encode($data, JSON_UNESCAPED_UNICODE | JSON_UNESCAPED_SLASHES));
 
        //推送消息到交换机
        $channel->basic_publish($msg, $amqpDefail['exchange_name'], $amqpDefail['route_key']);
 
        //关闭通道
        $channel->close();
 
        //断开连接
        $connection->close();
    }
}

△消费者
文件路径:
application|common|controller|MqConsumer.php
代码如下:

<?php

namespace app\common\controller;
 
use think\Config;
use think\Log;
//引入rabbitMq所需类
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
 
class MqConsumer
{
    /**
     * 消费者(路由模式)
     * @param $name 配置文件下rabbitmq文件前缀名
     * @throws \ErrorException
     */
    public function consumer($name)
    {
        //获取rabbitMq配置
        $amqp = Config::get('rabbitmq.AMQP');
        $amqpDefail = Config::get('rabbitmq.' . $name . '_queue');
 
        //建立连接
        $connection = new AMQPStreamConnection(
            $amqp['host'],
            $amqp['port'],
            $amqp['username'],
            $amqp['password']
        );
 
        //建立通道
        $channel = $connection->channel();
 
        //流量控制(也被称为公平调度,这里设置一个消费者一次只处理200条消息)
        $channel->basic_qos(null, 200, null);
 
        //初始化交换机
        $channel->exchange_declare($amqpDefail['exchange_name'], $amqpDefail['exchange_type'], false, true, false);
 
        //初始化队列
        $channel->queue_declare($amqpDefail['queue_name'], false, true, false, false);
 
        //绑定队列与交换机
        $channel->queue_bind($amqpDefail['queue_name'], $amqpDefail['exchange_name'], $amqpDefail['route_key']);
        
        //消费消息
        $channel->basic_consume($amqpDefail['queue_name'], $amqpDefail['consumer_tag'], false, false, false, false, [$this, 'callback']);
        
        //注册退出函数行为(其实用不到,因为要保证消费者一直运行,所以不能断开连接))
        // register_shutdown_function([$this,'shutdown'], $channel, $connection);
 
        //消息未处理完毕时,循环监听并一直处理上方callback方法的逻辑
        $channel->consume();
 
    }
    
    /**
     * 回调后消息处理(业务逻辑放置此处)
     * @param  $msg
     */
    public function callback($msg)
    {
        //$msg->body  通过消费者传来的控制器接收的数据
        $data = json_decode($msg->body,true);//拿到数据

        Log::write('拿到的消费者数据为:'.json_encode($data,320),'info');
        //这里我拿来做了一个插库的动作
        /*app('weloginper')->create([
            'ToUserName' => 'test',
            'FromUserName' => 'test',
            'CreateTime' => 'test',
            'MsgType' => 'test',
            'Event' => 'test',
            'EventKey' => 'test',
            'phone' => 'test',
        ]);*/
        //如果有个值是quit,则让该消费者停止消费
        /*if($data['aa'] == 'quit'){
            $msg->getChannel()->basic_cancel($msg->getConsumerTag());
        }*/
        $msg->ack();  //消息应答:这波200条消息处理完毕后进行消息确认,告诉mq可以开始发下一波200条消息了
    }
 
    
    
    /**
     * @param $channel 信道
     * @param \PhpAmqpLib\Connection\AbstractConnection $connection
     */
    function shutdown($channel, $connection)
    {
        $channel->close(); //关闭通道
        $connection->close(); //断开连接
    }
 
 
}

生产者业务动作
目的:模拟生成消息
文件路径:application|common|controller|Index.php
代码如下:

<?php
 
namespace app\common\controller;
 
use think\Console;
use app\common\controller\MqProducer;
 
class Index extends MqProducer
{
    public function index()
    {
         $page = input('get.page') ? input('get.page') : 1;//得到初始页码
         $data = [
                'page' => $page,  //当前页
         ];
         if($page <= 100){  //当前不是第10页则一直调用刚刚定义的生产者publish去生产消息
             self::publish($data, 'order');
             $url = url('/common/index/index')  . '?page=' . ++$page;
             die("<script>location.href='{$url}';</script>");
         }
         
         return json([
            'code' => 200,
            'msg' => 'order队列推送成功'
         ]);
    }
 
 
 
}

配置下config

config文件默认禁止访问common模块了

// 禁止访问模块 原来的
//'deny_module_list'       => ['common', 'admin'],
//禁止访问模块 修改后的
'deny_module_list'       => ['admin'],

生成消息
访问 xx.com/common/index/index
生成消息.png

查看RabbitMQ

4.png

消费者消费

命令行:think php order
10.png

再查看RabbitMq 发现消息被消费了。
2501.png

RabbitMQ服务应用是Erlang语言开发的,需要先安装Erlang
以下为安装的步骤

【Erlang】

安装weget工具

yum install wget

下载Erlang

wget --content-disposition https://packagecloud.io/rabbitmq/erlang/packages/el/7/erlang-22.3.4.12-1.el7.x86_64.rpm/download.rpm

安装Erlang

yum install erlang-22.3.4.12-1.el7.x86_64.rpm

【RabbitMQ】

下载RabbitMq

wget --content-disposition https://packagecloud.io/rabbitmq/rabbitmq-server/packages/el/7/rabbitmq-server-3.8.13-1.el7.noarch.rpm/download.rpm

安装签名

rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc

安装RabbitMq

yum install rabbitmq-server-3.8.13-1.el7.noarch.rpm

启动RabbitMq

systemctl start rabbitmq-server

开机启动RabbitMq

systemctl enable rabbitmq-server

【管理界面】

安装RabbitMq管理界面

rabbitmq-plugins enable rabbitmq_management

重启RabbitMq服务

systemctl restart rabbitmq-server

新增管理用户

rabbitmqctl add_user admin admin

给用户分配权限

rabbitmqctl set_user_tags admin administrator

【开放端口】

yum install firewalld

firewall-cmd --zone=public --add-port=15672/tcp --permanent 

systemctl start firewalld #启动
systemctl stop firewalld #停止
systemctl status firewalld #查看状态
systemctl disable firewalld #开机禁用
systemctl enable firewalld #开机启动

firewall-cmd --list-ports #查看开放端口的列表

firewall-cmd --state #查看防火墙状态

【访问管理面板】

yum install net-tools

ifconfig -a

查看当前服务器ID为:192.168.66.64

访问后台地址为:http://192.168.66.64:15672/

账号为:admin
密码为:admin