thinkphp6+swoole使用rabbitMq队列
- 安装think-swoole
- 安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用
- 安装rabbitMq延迟队列插件
-
安装 rabbitmq_delayed_message_exchange 插件,按照以下步骤操作: 下载插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases 以下路由不一定是一样的!!! 将插件复制到 RabbitMQ 插件目录: 将下载的插件文件复制到 RabbitMQ 插件目录。 sudo cp rabbitmq_delayed_message_exchange-3.8.9.ez /usr/lib/rabbitmq/lib/rabbitmq_server-<version>/plugins/ 将 <version> 替换为您的 RabbitMQ 服务器版本。 启用插件: 使用 RabbitMQ 命令行工具启用插件。 sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange 重启 RabbitMQ: 重启 RabbitMQ 服务器以应用更改。 sudo systemctl restart rabbitmq-server
-
- config目录创建 rabbitmq.php 文件,内容如下
return [
'host' => '服务器地址',
'port' => '端口',
'user' => '账户',
'password' => '密码',
'vhost' => '/',
'exchange' => 'delayed_exchange',
'exchange_type' => 'direct', // 交换机类型(如 direct、fanout、topic)
'exchange_arguments' => ['x-delayed-type' => 'direct'], // 延迟交换机参数
];
创建 RabbitMQService 类
class RabbitMQService
{
protected $connection;
protected $channel;
public function __construct()
{
$config = config('rabbitmq');
$this->connection = new AMQPStreamConnection(
$config['host'],
$config['port'],
$config['user'],
$config['password'],
$config['vhost']
);
$this->channel = $this->connection->channel();
$this->channel->exchange_declare(
$config['exchange'],
'x-delayed-message', // 指定延迟交换机类型
false,
true,
false,
false,
false,
new AMQPTable(['x-delayed-type' => $config['exchange_type']]) // 设置延迟交换机的底层类型
);
}
public function publish($message, $queue, $delay = 0)
{
// 声明队列
$this->channel->queue_declare($queue, false, true, false, false);
// 绑定队列到交换机
$this->channel->queue_bind($queue, config('rabbitmq.exchange'));
// 设置延迟头部信息
$headers = new AMQPTable([
'x-delay' => $delay // 延迟时间,单位为毫秒
]);
// 创建消息
$msg = new AMQPMessage($message, [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, // 持久化消息
]);
$msg->set('application_headers', $headers); // 正确设置头信息
// 发布消息到交换机
$this->channel->basic_publish($msg, config('rabbitmq.exchange'));
}
public function consume($queue, $callback)
{
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->basic_consume($queue, '', false, true, false, false, $callback);
while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}
public function __destruct()
{
$this->channel->close();
$this->connection->close();
}
}
创建 RabbitMqUseService 类文件
class RabbitMqUseService
{
// 消费队列
public static function consumption()
{
$rabbitMQ = new RabbitMQService();
$rabbitMQ->consume('queue', function ($msg){
Log::error('消费队列'.$msg->body);
$con = json_decode($msg->body,true);
$class = $con['class'];
Log::error("class->>".$class);
if(class_exists($class)){
$obj = new $class;
$obj->handle($con['body']);
}
});
}
/**
* @param $obj
* @param $data
* @param $delay
* @return void
*/
public static function push($obj,$data,$delay = 0)
{
$rabbitMQ = new RabbitMQService();
$class = get_class($obj);
// 构造消息体
$message = json_encode([
'class' => $class, // 类名
'body' => $data // 具体数据
]);
$rabbitMQ->publish($message, 'queue', $delay * 1000);
var_dump('已加入');
}
public function test()
{
self::push(new TestJob(),['name'=>'test'],10);
}
}
配置消费任务
新建文件类 RabbitConsumptionHandle
class RabbitConsumptionHandle
{
public function handle()
{
RabbitMqUseService::consumption();
}
}
在app/event.php listen 中引入
'listen' => [
'swoole.init' => [
RabbitConsumptionHandle::class
]
]
新增队列
RabbitMqUseService::push(new \app\job\TestJob(),[
'a'=>1,
'b'=>2
]);