- 在 RabbitMQ 平台创建 Virtual Hosts 和一个队列
- /boyaVirtualHosts
- 订单队列
- 支付队列
-
导入依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5 </version>
</dependency>
-
编写连接类:
public class RabbitMQConnection {
/**
* 获取连接
*/
public static Connection getConnection() throws IOException, TimeoutException {
// 1.创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2.设置连接地址
connectionFactory.setHost("127.0.0.1");
// 3.设置端口号
connectionFactory.setPort(5672);
// 4.设置账号和密码
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
// 5.设置 VirtualHost
connectionFactory.setVirtualHost("/boyaVirtualHostsR");
return connectionFactory.newConnection();
}
}
-
编写生产者代码:
public class Producer {
private static final String QUEUE_NAME = "BoyatopMamber";
/**
* 获取连接
*/
public static void main(String[] args) throws IOException, TimeoutException {
while (true){
// 1.创建连接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
// 3.设置消息
String msg = "Hello World";
System.out.println("msg:" + msg);
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.close();
connection.close();
}
}
}
-
编写消费者代码:
public class Comsumer {
private static final String QUEUE_NAME = "BoyatopMamber";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建链接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取消息:" + msg);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
-
RabbitMQ 如何保证消息不丢失:
- 生产者角色:
- 确保生产者角色投递到 MQ 服务器端成功
- Ack 消息确认机制
- 同步或异步的形式:
- Confirms
- 事务消息
- 消费者角色:
- 在 RabbitMQ 情况下:
- 必须要将消息消费成功之后,才会将消息从 MQ 服务器端移除
- 在 kafka 中的情况下:
- 不管是消费成功还是消费失败,该消息都不会立即从 MQ 服务器移除
- MQ 服务器端:
- 在默认的情况下,都会对队列中的消息持久化,持久化硬盘
- 使用消息确认机制 + 持久化技术实现:
- A 消费者确认收到消息机制
channel.basicConsume(QUEUE_NAME,false,defaultConsumer);
- 第二个参数值为 false,代表关闭 RabbitMQ 的自动应答机制,改为手动应答
- 在处理完消息时,返回应答状态,true 表示为自动应答模式
channel.basicAck(envelope.getDeliveryTag(),false);
- B 生产者确认投递消息成功,使用 Confirm 机制或者事务消息
![](https://i-blog.csdnimg.cn/direct/d0c821c78a9b44c5b2591b913e1658d3.png)
- Confirm 机制,同步或异步的形式
-
RabbitMQ 默认创建是持久化的形式:![](https://i-blog.csdnimg.cn/direct/30b2a616bdf3461e85c02bd31799bb7a.png)
- 将代码中的 durable 设为 true
- 参数详解:
- Durability:是否持久化
- durable:持久化
- Transient:不持久化
- Auto delete:是否自动删除
- 当最后一个消费者断开连接之后队列是否自动被删除
- 可以通过 RabbitMQ Management 查看某个队列的消费者数量
- 当 consumers = 0 时,队列就会自动删除
- 使用 RabbitMQ 事务:
//设置事务
channel.txSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
channel.txCommit();
- 生产者:
public class producer {
private static final String QUEUE_NAME = "BoyatopMamber";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1.创建新的连接
Connection connection = RabbitMQConnection.getConnection();
// 2.设置 channel
Channel channel = connection.createChannel();
// 3.发送消息
String msg = "Hello my Bro";
channel.confirmSelect();
channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
boolean result = channel.waitForConfirms();
if(result){
System.out.println("消息投递成功");
}else {
System.out.println("消息投递失败");
}
// 4.关闭资源
channel.close();
connection.close();
}
}
- 消费者:
public class Consumer {
private static final String QUEUE_ANME = "BoyatopMamber";
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建连接
Connection connection = RabbitMQConnection.getConnection();
//2.设置通道
Channel channel = connection.createChannel();
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("消费者获取消息:" + msg);
//消费者完成 消费该消息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 3.监听队列
channel.basicConsume(QUEUE_ANME,false,defaultConsumer);
}
}