当前位置: 首页 > article >正文

RabbitMQ 消息队列:生产者与消费者实现详解

在分布式系统中,消息队列(Message Queue, MQ)是一种重要的组件,用于解耦系统、异步处理任务以及实现系统间的通信。RabbitMQ 是一个流行的开源消息代理软件,它实现了高级消息队列协议(AMQP)。在本文中,我们将通过 Java 示例来演示如何使用 RabbitMQ 的生产者和消费者模型。

1. 环境准备

首先,确保你已经安装了 RabbitMQ 服务器,并且 Java 环境已经配置妥当。你还需要在项目中引入 RabbitMQ 的 Java 客户端库。通常,这可以通过 Maven 或 Gradle 等构建工具来实现。

2. 生产者(Producer)

生产者负责发送消息到队列。以下是生产者的 Java 实现代码及其注释:

package com.qcby.rabbitmq.mq1;  
  
import com.qcby.rabbitmq.connection.RabbitMQConnection; // 自定义的连接管理类  
import com.rabbitmq.client.Channel;  
import com.rabbitmq.client.Connection;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
// 生产者类  
public class Producer {  
    private static final String QUEUE_NAME = "boyatopMember"; // 队列名称  
  
    public static void main(String[] args) throws IOException, TimeoutException {  
        // 1. 创建连接  
        // 通过自定义的 RabbitMQConnection 类获取连接  
        Connection connection = RabbitMQConnection.getConnection("/boyavirtualHosts"); // 假设这是连接到特定虚拟主机的连接  
  
        // 2. 设置通道  
        // 通道是大多数 RabbitMQ API 调用的入口点  
        Channel channel = connection.createChannel();  
  
        // 3. 设置消息  
        String msg = "hello world"; // 待发送的消息  
        System.out.println("msg:" + msg); // 在控制台输出消息内容  
  
        // 4. 发送消息到队列  
        // 第一个参数是交换机名称,这里使用空字符串表示默认交换机(direct类型)  
        // 第二个参数是队列名称  
        // 第三个参数是消息的其他属性,这里传null表示默认  
        // 第四个参数是消息体,需要是字节数组形式  
        channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());  
  
        // 5. 关闭通道和连接  
        channel.close();  
        connection.close();  
    }  
}
3. 消费者(Consumer)

消费者负责从队列中接收并处理消息。以下是消费者的 Java 实现代码及其注释:

package com.qcby.rabbitmq.mq1;  
  
import com.qcby.rabbitmq.connection.RabbitMQConnection; // 自定义的连接管理类  
import com.rabbitmq.client.*;  
  
import java.io.IOException;  
import java.util.concurrent.TimeoutException;  
  
// 消费者类  
public class Consumer {  
    private static final String QUEUE_NAME = "boyatopMember"; // 队列名称  
  
    public static void main(String[] args) throws IOException, TimeoutException {  
        // 1. 创建连接  
        Connection connection = RabbitMQConnection.getConnection("/boyavirtualHosts"); // 连接到特定虚拟主机  
  
        // 2. 设置通道  
        Channel channel = connection.createChannel();  
  
        // 3. 定义消费者  
        // 使用 DefaultConsumer 类并覆盖 handleDelivery 方法来处理接收到的消息  
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {  
            @Override  
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {  
                String msg = new String(body, "UTF-8"); // 将字节数组转换为字符串  
                System.out.println("消费者获取消息:" + msg); // 在控制台输出消息内容  
            }  
        };  
  
        // 4. 监听队列  
        // 第一个参数是队列名称  
        // 第二个参数是是否自动确认消息,true表示自动确认,false表示需要手动确认  
        // 第三个参数是消费者实例  
        channel.basicConsume(QUEUE_NAME, true, defaultConsumer);  
  
        // 注意:这里的代码实际上会阻塞等待消息的到来。  
        // 在实际应用中,你可能需要保持这个消费者程序持续运行,直到你显式地停止它。  
    }  
}

4. 自定义连接(connection)
 

package com.qcby.rabbitmq.connection;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class RabbitMQConnection {
    /**
     * 获取连接
     *
     * @return
     */
    public static Connection getConnection(String virtualHost) throws IOException, TimeoutException, TimeoutException {
        // 1.创建连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2.设置连接地址
        connectionFactory.setHost("127.0.0.1");
        // 3.设置端口号:
        connectionFactory.setPort(5672);
        // 4.设置账号和密码
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        // 5.设置VirtualHost
        connectionFactory.setVirtualHost(virtualHost);
        return connectionFactory.newConnection();
    }
}
5. 运行与测试
  1. 首先,启动 RabbitMQ 服务器。
  2. 运行 Producer 类的 main 方法发送消息。
  3. 运行 Consumer 类的 main 方法接收并处理消息。

如果一切设置正确,你将在 Consumer 的控制台输出中看到 "消费者获取消息:hello world" 的信息,表明消息已成功从生产者发送到消费者。

6. 结论

通过本文,我们学习了如何在 Java 中使用 RabbitMQ 实现基本的生产者和消费者模型。这仅仅是 RabbitMQ 强大功能的冰山一角,RabbitMQ 还支持多种交换机类型、消息确认机制、消息持久化等高级特性,可以帮助你构建更加健壮和灵活的分布式系统。


http://www.kler.cn/a/331332.html

相关文章:

  • vsCode怎么使用vue指令快捷生成代码
  • 「配置应用的可见性」功能使用教程
  • macOS 配置 vscode 命令行启动
  • Java模拟多个Mqtt客户端连接Mqtt Broker
  • HBase、Hive、Redis 和 MongoDB的对比
  • 【JavaEE初阶】线程 和 thread
  • Linux文件重定向文件缓冲区
  • 【漏洞复现】大华智慧园区综合管理平台 video 任意文件上传漏洞
  • 【rCore OS 开源操作系统】Rust mod模块和static生命周期 知识点及练习题
  • LeetCode hot100---哈希表专题(C++语言)
  • 【ecology】独立选择框\公共选择框表
  • C#的面向对象
  • C#类的概念
  • HarmonyOS Next应用开发——响应式布局之媒体查询
  • 【2024.10.2练习】奶牛晒衣服
  • Vue diff 算法介绍
  • 《江苏科技大学学报(自然科学版)》
  • 解决方法:PDF文件打开之后不能打印?
  • docker 搭建minimalist-web-notepad
  • OpenGL ES 顶点缓冲区和布局(3)
  • spring揭秘25-springmvc04-servlet容器与springmvc容器总结
  • 在线css像素px到Em的转换器
  • 二值图像的面积求取的两种方法及MATLAB实现
  • Java - LeetCode面试经典150题 - 矩阵 (四)
  • 【羊毛资源】华为云开发者云主机免费申请使用指南
  • Docker 容器日志记录与管理:日志输出、轮转与配置实践