当前位置: 首页 > article >正文

thinkphp6+swoole使用rabbitMq队列

  1. 安装think-swoole
  2. 安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用
  3. 安装rabbitMq延迟队列插件
    1. 安装 rabbitmq_delayed_message_exchange 插件,按照以下步骤操作:  
      下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
      
      以下路由不一定是一样的!!!
      将插件复制到 RabbitMQ 插件目录: 将下载的插件文件复制到 RabbitMQ 插件目录。  
      sudo cp rabbitmq_delayed_message_exchange-3.8.9.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/
      将 <version> 替换为您的 RabbitMQ 服务器版本。  
      启用插件: 使用 RabbitMQ 命令行工具启用插件。  
      sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
      重启 RabbitMQ: 重启 RabbitMQ 服务器以应用更改。  
      sudo systemctl restart rabbitmq-server

  4. config目录创建 rabbitmq.php 文件,内容如下
return [
    'host' => '服务器地址',
    'port' => '端口',
    'user' => '账户',
    'password' => '密码',
    'vhost' => '/',
    'exchange' => 'delayed_exchange',
    'exchange_type' => 'direct', // 交换机类型(如 direct、fanout、topic)
    'exchange_arguments' => ['x-delayed-type' => 'direct'], // 延迟交换机参数
];

创建 RabbitMQService 类


class RabbitMQService
{
    protected $connection;
    protected $channel;

    public function __construct()
    {
        $config = config('rabbitmq');
        $this->connection = new AMQPStreamConnection(
            $config['host'],
            $config['port'],
            $config['user'],
            $config['password'],
            $config['vhost']
        );
        $this->channel = $this->connection->channel();
        $this->channel->exchange_declare(
            $config['exchange'],
            'x-delayed-message', // 指定延迟交换机类型
            false,
            true,
            false,
            false,
            false,
            new AMQPTable(['x-delayed-type' => $config['exchange_type']]) // 设置延迟交换机的底层类型
        );
    }

    public function publish($message, $queue, $delay = 0)
    {
        // 声明队列
        $this->channel->queue_declare($queue, false, true, false, false);

        // 绑定队列到交换机
        $this->channel->queue_bind($queue, config('rabbitmq.exchange'));

        // 设置延迟头部信息
        $headers = new AMQPTable([
            'x-delay' => $delay // 延迟时间,单位为毫秒
        ]);

        // 创建消息
        $msg = new AMQPMessage($message, [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 持久化消息
        ]);
        $msg->set('application_headers', $headers); // 正确设置头信息

        // 发布消息到交换机
        $this->channel->basic_publish($msg, config('rabbitmq.exchange'));
    }


    public function consume($queue, $callback)
    {
        $this->channel->queue_declare($queue, false, true, false, false);
        $this->channel->basic_consume($queue, '', false, true, false, false, $callback);

        while ($this->channel->is_consuming()) {
            $this->channel->wait();
        }
    }

    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
    }
}

创建 RabbitMqUseService 类文件


class RabbitMqUseService
{
    // 消费队列
    public static function consumption()
    {
        $rabbitMQ = new RabbitMQService();
        $rabbitMQ->consume('queue', function ($msg){
            Log::error('消费队列'.$msg->body);
            $con = json_decode($msg->body,true);
            $class = $con['class'];
            Log::error("class->>".$class);
            if(class_exists($class)){
                $obj = new $class;
                $obj->handle($con['body']);
            }
        });
    }

    /**
     * @param $obj
     * @param $data
     * @param $delay
     * @return void
     */
    public static function push($obj,$data,$delay = 0)
    {
        $rabbitMQ = new RabbitMQService();
        $class = get_class($obj);
        // 构造消息体
        $message = json_encode([
            'class' => $class,  // 类名
            'body' => $data     // 具体数据
        ]);
        $rabbitMQ->publish($message, 'queue', $delay * 1000);
        var_dump('已加入');
    }

    public function test()
    {
        self::push(new TestJob(),['name'=>'test'],10);
    }
}

配置消费任务

新建文件类 RabbitConsumptionHandle

class RabbitConsumptionHandle
{
    public function handle()
    {
        RabbitMqUseService::consumption();
    }
}

在app/event.php listen 中引入

  'listen'    => [
        'swoole.init' => [
            RabbitConsumptionHandle::class
        ]
   ]

新增队列

      RabbitMqUseService::push(new \app\job\TestJob(),[
            'a'=>1,
            'b'=>2
        ]);


http://www.kler.cn/a/518855.html

相关文章:

  • 单片机基础模块学习——数码管(二)
  • 3097. 或值至少为 K 的最短子数组 II
  • 996引擎 - NPC-动态创建NPC
  • 2025美赛数学建模C题:奥运金牌榜,完整论文代码模型目前已经更新
  • 【6】YOLOv8 训练自己的分割数据集
  • 飞牛 fnOS 安装8852be网卡驱动并成功连接
  • GPMC介绍
  • Android 定位 获取当前位置 (Kotlin)
  • 企业信息化4:集团化企业的财务管理系统
  • 2025牛客寒假算法基础集训营1
  • C++异步future
  • hexo + Butterfly搭建博客
  • 【Django DRF Apps】从零搭建一个导出 Excel 和 PDF的app应用
  • 电力场效应晶体管(电力 MOSFET),全控型器件
  • 【ComfyUI专栏】ComfyUI 部署Kolors
  • 【实践】Python使用Pandas处理气象数据
  • 【数据分享】1929-2024年全球站点的逐日平均能见度(Shp\Excel\免费获取)
  • github配置SSH公钥后需要输入密码
  • VUE3 如何快速使用pinia
  • nacos(基于docker最详细安装)
  • 工业相机 SDK 二次开发-Sherlock插件
  • Spring Boot是什么及其优点
  • SpringBoot引入第三方jar包
  • redis实现lamp架构缓存
  • 将Deepseek接入本地Vscode
  • 对于Docker的初步了解