RabbitMQ代码实战2
RabbitMQ代码实战2
RPC远程过程调用模式队列(RPC)
模型
package cn.yanghuisen.rpc.server;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* RPC模式队列-服务端
*/
public class RPCServer {
// 队列名称
private static final String RPC_QUEUE_NAME = "rpc_queue";
/**
* 计算斐波那契数列
*
* @param n
* @return
*/
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
try {
// 通过工厂创建连接
final Connection connection = factory.newConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);
/*
限制RabbitMQ只发不超过1条的消息给同一个消费者。
当消息处理完毕后,有了反馈,才会进行第二次发送。
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取replyTo队列和correlationId请求标识
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();
String response = "";
try {
// 接收客户端消息
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")");
// 服务端根据业务需求处理
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
// 将处理结果发送至replyTo队列同时携带correlationId属性
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
response.getBytes("UTF-8"));
// 手动回执消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
// RabbitMq消费者工作线程通知RPC服务器其他所有线程运行
synchronized (monitor) {
monitor.notify();
}
}
};
// 监听队列
/*
autoAck = true代表自动确认消息
autoAck = false代表手动确认消息
*/
boolean autoAck = false;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
// Wait and be prepared to consume the message from RPC client.
// 线程等待并准备接收来自RPC客户端的消息
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
package cn.yanghuisen.rpc.client;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;
/**
* RPC模式队列-客户端
*/
public class RPCClient implements AutoCloseable {
private Connection connection;
private Channel channel;
// 队列名称
private String requestQueueName = "rpc_queue";
// 初始化连接
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
connection = factory.newConnection();
channel = connection.createChannel();
}
public static void main(String[] args) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 10; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
// 请求服务端
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}
// 请求服务端
public String call(String message) throws IOException, InterruptedException {
// correlationId请求标识ID
final String corrId = UUID.randomUUID().toString();
// 获取队列名称
String replyQueueName = channel.queueDeclare().getQueue();
// 设置replyTo队列和correlationId请求标识
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();
// 发送消息至队列
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
// 设置线程等待,每次只接收一个响应结果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
// 接受服务器返回结果
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
// 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回false
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});
// 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值
String result = response.take();
// 从消息队列中丢弃该值
channel.basicCancel(ctag);
return result;
}
// 关闭连接
public void close() throws IOException {
connection.close();
}
}
3.7、确认模式队列(confirm)
如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?
3.7.1、事务机制控制
- txSelect():开启事务
- txCommit():提交事务
- txRollback():回滚事务
package cn.yanghuisen.tx.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 事务-发送消息
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "tx";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
int i = 1/0;
System.out.println(" [x] Sent '" + message + "'");
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
// 回滚
channel.txRollback();
channel.close();
connection.close();
}
}
}
package cn.yanghuisen.tx.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 事务-接收消息
*/
public class Recv {
// 队列名称
private final static String QUEUE_NAME = "tx";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
缺点:降低了RabbitMQ的消息吞吐量
解决:使用confirm模式
总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。
3.7.2、确认模式(confirm)
生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。
confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。
实现Confirm确认机制有三种方式
1、普通Confirm模式
每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。
package cn.yanghuisen.confirm.sync.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 确认模式-同步-单条-发送消息
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 确认消息是否发送成功
if (channel.waitForConfirms()){
System.out.println("消息发送成功");
}else {
System.out.println("消息发送失败");
}
System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();
}
}
}
package cn.yanghuisen.confirm.sync.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 确认模式-同步-单条-接收消息
*/
public class Recv {
// 队列名称
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。
package cn.yanghuisen.confirm.sync.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
/**
* 确认模式-同步-批量-发送消息
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 确认消息是否发送成功-多条
// 如果有一条没被确认,就会抛IO异常
channel.waitForConfirmsOrDie();
// if (channel.waitForConfirms()){
// System.out.println("消息发送成功");
// }else {
// System.out.println("消息发送失败");
// }
System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();
}
}
}
package cn.yanghuisen.confirm.sync.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 确认模式-同步-批量-接收消息
*/
public class Recv {
// 队列名称
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}
3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。
package cn.yanghuisen.confirm.async.send;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;
/**
* 确认模式-异步-发送消息
*/
public class Send {
// 队列名称
private final static String QUEUE_NAME = "confirm_async";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = null;
Channel channel = null;
// 通过工厂创建连接
try{
// 维护信息发送回执deliveryTag
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
// 创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 添加channel 监听
channel.addConfirmListener(new ConfirmListener() {
/**
* 已确认
* @param l 唯一标识
* @param b 确认多条还是单条,true多条
* @throws IOException
*/
@Override
public void handleAck(long l, boolean b) throws IOException {
// 判断确认的是多条还是单条
if (b){
System.out.println("handleAck--success-->multiple" + l);
// 清除前 l 标识ID
confirmSet.headSet(l+1).clear();
}else {
System.out.println("handleAck--success-->single" + l);
confirmSet.remove(l);
}
}
/**
* 未确认
* @param l
* @param b
* @throws IOException
*/
@Override
public void handleNack(long l, boolean b) throws IOException {
if (b){
System.out.println("handleNack--failed-->multiple-->" + l);
// 清除前 deliveryTag 项标识id
confirmSet.headSet(l + 1L).clear();
}else {
System.out.println("handleNack--failed-->single" + l);
confirmSet.remove(l);
}
}
});
// 循环发送消息
while (true){
// 消息内容
String message = "Hello World!";
// 获取unconfirm的消息序号
Long seqNo = channel.getNextPublishSeqNo();
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 将消息序号添加到SortedSet
confirmSet.add(seqNo);
}
}catch (Exception e){
e.printStackTrace();
channel.close();
connection.close();
}
}
}
package cn.yanghuisen.confirm.async.recv;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
/**
* 确认模式-异步-接收消息
*/
public class Recv {
// 队列名称
private final static String QUEUE_NAME = "confirm_async";
public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}