RabbitMQ 消息队列代码实战1
RabbitMQ 消息队列代码实战1
1. 准备工作
首先,我们需要加入rabbitmq的amqp client依赖
<!-- amqp client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
其次,我们需要编写一个连接mq和通道的工具类ConnectionUtils,如下:
package com.mcp.lab.mq.rabbit.common.util;
import com.mcp.lab.mq.rabbit.common.domain.ConnInfo;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConnectionUtils {
    /**
     * Opens a connection using the default broker settings.
     *
     * <p>The host, virtual host, user name and password literals below are placeholders and
     * must be replaced with the values of the target RabbitMQ broker. The connection itself is
     * created by the {@code getConnection(ConnInfo)} overload.
     *
     * <p>NOTE(review): the {@code getConnection(ConnInfo)} overload is not part of this listing;
     * it has to exist in the real class for this method to compile.
     *
     * @return whatever {@code getConnection(ConnInfo)} returns for the default settings
     * @throws IOException declared for the delegated call
     * @throws TimeoutException declared for the delegated call
     */
    public static Connection getConnection() throws IOException, TimeoutException {
        return getConnection(
                new ConnInfo.Builder()
                        .setHost("Your RabbitMQ Broker Host")
                        .setPort(5672)
                        .setVirtualHost("Your Virtual Host(自定义)")
                        .setUsername("your rabbit admin user")
                        .setPassword("your rabbit admin password")
                        .build());
    }
}
2. 简单模式实例
- 生产者(Producer)代码
ConsoleSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class ConsoleSender {
    /** Input that ends the send loop (compared case-insensitively). */
    private static final String QUIT = "Q";
    /** Name of the queue the console messages are published to. */
    public static final String QUEUE_NAME = "simple_queue";

    /**
     * Reads lines from standard input and publishes each one to {@link #QUEUE_NAME} through the
     * default exchange, until the user enters "q"/"Q" or standard input is exhausted.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // try-with-resources closes the channel first and then the connection, even when
        // publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Queue declaration arguments:
            //   queue      - queue name
            //   durable    - whether the queue survives a broker restart
            //   exclusive  - whether the queue is restricted to the declaring connection
            //   autoDelete - whether the server deletes the queue once it is no longer in use
            //   arguments  - additional queue properties
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            Scanner input = new Scanner(System.in);
            while (true) {
                System.out.print("请输入发送的消息: ");
                // Stop cleanly on end of input instead of failing with NoSuchElementException.
                if (!input.hasNextLine()) {
                    break;
                }
                String msg = input.nextLine();
                if (QUIT.equalsIgnoreCase(msg)) {
                    break;
                }
                // Arguments: exchange, routing key (the queue name for the default exchange),
                // message properties, message body. The body is encoded as UTF-8 explicitly
                // because the consumer decodes it as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("Producer发送的消息: " + msg);
            }
        }
    }
}
- 消费者(Consumer)代码
SimpleReceiver
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class SimpleReceiver {
    /** Queue this consumer reads from. */
    private static final String QUEUE_NAME = "simple_queue";

    /**
     * Declares {@link #QUEUE_NAME} and registers an auto-acknowledging consumer that prints
     * every delivery together with its routing key.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();
        ch.queueDeclare(QUEUE_NAME, false, false, false, null);

        // Invoked for each delivery; the body is decoded as UTF-8 text.
        DeliverCallback onDelivery = (tag, delivery) -> {
            String text = new String(delivery.getBody(), StandardCharsets.UTF_8);
            String routingKey = delivery.getEnvelope().getRoutingKey();
            System.out.println(format(routingKey, text));
        };

        // Second argument true = automatic acknowledgement; the cancel callback does nothing.
        ch.basicConsume(QUEUE_NAME, true, onDelivery, tag -> {
        });
    }

    /** Builds the console line printed for one received message. */
    private static String format(String routingKey, String text) {
        return " [Consumer] Received from queue - '" + routingKey + "':'" + text + "'";
    }
}
运行结果:
1. 首先运行生产者(ConsoleSender)
我们在控制台的输入如下:
请输入发送的消息: 789
Producer发送的消息: 789
请输入发送的消息: 111
Producer发送的消息: 111
请输入发送的消息: q
2. 其次运行消费者(SimpleReceiver)
显示如下:
[Consumer] Received from queue - 'simple_queue':'789'
[Consumer] Received from queue - 'simple_queue':'111'
3. 工作队列模式实例
- 生产者(Producer)代码
WorkQueueSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class WorkQueueSender {
    /** Queue shared by the producer and the worker consumers. */
    private static final String QUEUE_NAME = "work_queue";

    /**
     * Publishes 100 numbered messages to {@link #QUEUE_NAME} through the default exchange,
     * pausing {@code i * 10} milliseconds after the i-th message.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws InterruptedException if the thread is interrupted while pausing between messages
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        // try-with-resources releases the channel and connection even if publishing fails or
        // the sleep is interrupted.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            for (int i = 0; i < 100; i++) {
                String message = "work mode message" + i;
                // Encode explicitly as UTF-8; the workers decode the body as UTF-8.
                channel.basicPublish("", QUEUE_NAME, null,
                        message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("[Producer] Sent '" + message + "'");
                Thread.sleep(i * 10);
            }
        }
    }
}
- 消费者代码(模拟2个消费者)
WorkQueueReceiver1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkQueueReceiver1 {
    /** Queue shared with the producer and the other worker. */
    private static final String QUEUE_NAME = "work_queue";

    /**
     * Worker 1: consumes {@link #QUEUE_NAME} with a prefetch limit of one message and prints
     * every delivery.
     *
     * <p>Messages are acknowledged manually. With automatic acknowledgement the broker ignores
     * the prefetch limit, so {@code basicQos(1)} would have no effect.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // At most one unacknowledged message is outstanding for this consumer at any time.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Work Consumer 1] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
WorkQueueReceiver2
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class WorkQueueReceiver2 {
    /** Queue shared with the producer and the other worker. */
    private static final String QUEUE_NAME = "work_queue";

    /**
     * Worker 2: consumes {@link #QUEUE_NAME} with a prefetch limit of one message and prints
     * every delivery.
     *
     * <p>Messages are acknowledged manually. With automatic acknowledgement the broker ignores
     * the prefetch limit, so {@code basicQos(1)} would have no effect.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // At most one unacknowledged message is outstanding for this consumer at any time.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Work Consumer 2] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
运行结果:
1. Producer运行:
[Producer] Sent 'work mode message0'
...
[Producer] Sent 'work mode message99'
---------------------------------------------------
2. Consumer1运行:
[Work Consumer 1] Received 'work_queue':'work mode message0'
[Work Consumer 1] Received 'work_queue':'work mode message2'
...
[Work Consumer 1] Received 'work_queue':'work mode message98'
---------------------------------------------------
3. Consumer2运行:
[Work Consumer 2] Received 'work_queue':'work mode message1'
[Work Consumer 2] Received 'work_queue':'work mode message3'
...
[Work Consumer 2] Received 'work_queue':'work mode message99'
注: 从上面结果可以看出,2个消费者竞争消费同一个队列,消息被交替(轮询)分发给它们,每条消息只会被其中一个消费者消费,不会重复。
4. 发布订阅模式实例
- 生产者(Producer)代码
ConsolePublishSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class ConsolePublishSender {
    /** Input that ends the publish loop (compared case-insensitively). */
    private static final String QUIT = "Q";
    /** Fanout exchange every subscriber queue is bound to. */
    private static final String EXCHANGE_NAME = "publish_logs";

    /**
     * Reads lines from standard input and publishes each one to the fanout exchange
     * {@link #EXCHANGE_NAME}, until the user enters "q"/"Q" or standard input is exhausted.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, publishing or closing fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel first and then the connection, even when
        // publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            Scanner input = new Scanner(System.in);
            while (true) {
                System.out.print("请输入发送的消息: ");
                // Stop cleanly on end of input instead of failing with NoSuchElementException.
                if (!input.hasNextLine()) {
                    break;
                }
                String msg = input.nextLine();
                if (QUIT.equalsIgnoreCase(msg)) {
                    break;
                }
                // The routing key is left empty; every queue bound here with an empty binding
                // key receives the message.
                channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [Publisher] Sent '" + msg + "'");
            }
        }
    }
}
- 消费者代码(模拟2个消费者)
SubscribeReceive1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class SubscribeReceive1 {
    /** Fanout exchange this subscriber listens to. */
    private static final String EXCHANGE_NAME = "publish_logs";

    /**
     * Subscriber 1: binds a server-named queue to {@link #EXCHANGE_NAME} and prints every
     * message delivered to it.
     *
     * @param argv unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] argv) throws Exception {
        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // The no-argument declare lets the server pick the queue name; it is read back here.
        String subscriberQueue = ch.queueDeclare().getQueue();
        ch.queueBind(subscriberQueue, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Runs for each delivered message.
        DeliverCallback printMessage = (tag, delivery) -> {
            byte[] payload = delivery.getBody();
            System.out.println(" [Subscriber 1] Received '" + new String(payload, StandardCharsets.UTF_8) + "'");
        };

        // Auto-acknowledging consumer; the cancel callback does nothing.
        ch.basicConsume(subscriberQueue, true, printMessage, tag -> {
        });
    }
}
SubscribeReceive2
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
public class SubscribeReceive2 {
    /** Fanout exchange this subscriber listens to. */
    private static final String EXCHANGE_NAME = "publish_logs";

    /**
     * Subscriber 2: binds a server-named queue to {@link #EXCHANGE_NAME} and prints every
     * message delivered to it.
     *
     * @param argv unused
     * @throws Exception if any broker operation fails
     */
    public static void main(String[] argv) throws Exception {
        Connection conn = ConnectionUtils.getConnection();
        Channel ch = conn.createChannel();

        ch.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // The no-argument declare lets the server pick the queue name; it is read back here.
        String subscriberQueue = ch.queueDeclare().getQueue();
        ch.queueBind(subscriberQueue, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        // Runs for each delivered message.
        DeliverCallback printMessage = (tag, delivery) -> {
            byte[] payload = delivery.getBody();
            System.out.println(" [Subscriber 2] Received '" + new String(payload, StandardCharsets.UTF_8) + "'");
        };

        // Auto-acknowledging consumer; the cancel callback does nothing.
        ch.basicConsume(subscriberQueue, true, printMessage, tag -> {
        });
    }
}
运行结果:
1. Producer运行:
请输入发送的消息: topic 1
[Publisher] Sent 'topic 1'
请输入发送的消息: topic 2
[Publisher] Sent 'topic 2'
请输入发送的消息: haha
[Publisher] Sent 'haha'
请输入发送的消息: q
---------------------------------------------------
2. Consumer1运行:
[*] Waiting for messages. To exit press CTRL+C
[Subscriber 1] Received '发布的主题信息'
[Subscriber 1] Received 'topic 1'
[Subscriber 1] Received 'topic 2'
[Subscriber 1] Received 'haha'
---------------------------------------------------
3. Consumer2运行:
[*] Waiting for messages. To exit press CTRL+C
[Subscriber 2] Received '发布的主题信息'
[Subscriber 2] Received 'topic 1'
[Subscriber 2] Received 'topic 2'
[Subscriber 2] Received 'haha'
注: 多个接收者接收到一模一样的消息。该模式用于多个消费者订阅同一个主题。
5. 路由模式实例
- 生产者(Producer)代码
ConsoleRouteSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;
public class ConsoleRouteSender {
    /** Input that ends the send loop (compared case-insensitively). */
    private static final String QUIT = "Q";
    private static final String EXCHANGE_NAME = "exchange_direct";
    private static final String EXCHANGE_TYPE = "direct";
    /**
     * Routing key attached to every published message. Only queues bound with exactly this key
     * receive it; change it to "key" to try the other routing scenario.
     */
    private static final String ROUTING_KEY = "key2";

    /**
     * Reads lines from standard input and publishes each one to the direct exchange
     * {@link #EXCHANGE_NAME} with {@link #ROUTING_KEY}, until the user enters "q"/"Q" or
     * standard input is exhausted.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // try-with-resources closes the channel first and then the connection, even when
        // publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

            Scanner input = new Scanner(System.in);
            while (true) {
                System.out.print("请输入发送的消息: ");
                // Stop cleanly on end of input instead of failing with NoSuchElementException.
                if (!input.hasNextLine()) {
                    break;
                }
                String msg = input.nextLine();
                if (QUIT.equalsIgnoreCase(msg)) {
                    break;
                }
                // Encode explicitly as UTF-8; the receivers decode the body as UTF-8.
                channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                System.out.println("[Route Producer] Sent '" + msg + "'");
            }
        }
    }
}
- 消费者代码(模拟2个消费者)
RouteReceiver1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RouteReceiver1 {
    private static final String QUEUE_NAME = "queue_routing";
    private static final String EXCHANGE_NAME = "exchange_direct";
    /** Must match the type the sender declares for {@link #EXCHANGE_NAME}. */
    private static final String EXCHANGE_TYPE = "direct";

    /**
     * Route consumer 1: binds {@link #QUEUE_NAME} to the direct exchange with the keys "key"
     * and "key2" and prints every delivery.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here as well, so the bindings below do not fail when this
        // receiver is started before the sender has created the exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive messages routed with either "key" or "key2".
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        // At most one unacknowledged message at a time; only honoured with manual acks.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Route Consumer 1] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
RouteReceiver2
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class RouteReceiver2 {
    private static final String QUEUE_NAME = "queue_routing2";
    private static final String EXCHANGE_NAME = "exchange_direct";
    /** Must match the type the sender declares for {@link #EXCHANGE_NAME}. */
    private static final String EXCHANGE_TYPE = "direct";

    /**
     * Route consumer 2: binds {@link #QUEUE_NAME} to the direct exchange with the key "key2"
     * only and prints every delivery.
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here as well, so the binding below does not fail when this
        // receiver is started before the sender has created the exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Receive only messages routed with "key2".
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
        // At most one unacknowledged message at a time; only honoured with manual acks.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Route Consumer 2] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
运行结果:
1. Producer运行:
请输入发送的消息: abc
[Route Producer] Sent 'abc'
请输入发送的消息: test
[Route Producer] Sent 'test'
请输入发送的消息: q
---------------------------------------------------
2. Consumer1运行:
[Route Consumer 1] Received 'key2':'abc'
[Route Consumer 1] Received 'key2':'test'
---------------------------------------------------
3. Consumer2运行:
[Route Consumer 2] Received 'key2':'abc'
[Route Consumer 2] Received 'key2':'test'
如果把sender中的key2改成key,运行结果如下:
请输入发送的消息: 123
[Route Producer] Sent '123'
请输入发送的消息: 456
[Route Producer] Sent '456'
请输入发送的消息: 789
[Route Producer] Sent '789'
请输入发送的消息: q
[Route Consumer 1] Received 'key':'123'
[Route Consumer 1] Received 'key':'456'
[Route Consumer 1] Received 'key':'789'
Consumer2 没有收到任何消息,因为它的队列只绑定了 key2,与消息的路由键 key 不匹配。
6. 主题模式实例
- 生产者(Producer)代码
SimpleTopicSender
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class SimpleTopicSender {
    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String EXCHANGE_TYPE = "topic";

    /**
     * Publishes two messages to the topic exchange {@link #EXCHANGE_NAME}: one with the routing
     * key "key.1" and one with the routing key "key.1.2".
     *
     * @param args unused
     * @throws IOException if a broker operation fails
     * @throws TimeoutException if opening the connection or closing the channel times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // try-with-resources closes the channel first and then the connection, even when
        // publishing throws.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

            // Bodies are encoded explicitly as UTF-8; the receivers decode them as UTF-8.
            String message = "topics model message with key.1";
            channel.basicPublish(EXCHANGE_NAME, "key.1", null,
                    message.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("[Producer] Sent '" + message + "'");

            String message2 = "topics model message with key.1.2";
            channel.basicPublish(EXCHANGE_NAME, "key.1.2", null,
                    message2.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            System.out.println("[Producer] Sent '" + message2 + "'");
        }
    }
}
- 消费者代码(模拟2个消费者)
TopicReceiver1
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicReceiver1 {
    private static final String QUEUE_NAME = "queue_topic";
    private static final String EXCHANGE_NAME = "exchange_topic";
    /** Must match the type the sender declares for {@link #EXCHANGE_NAME}. */
    private static final String EXCHANGE_TYPE = "topic";

    /**
     * Topic consumer 1: binds {@link #QUEUE_NAME} to the topic exchange with the pattern
     * "key.*" and prints every delivery.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here as well, so the binding below does not fail when this
        // receiver is started before the sender has created the exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // '*' stands for exactly one word: "key.*" matches "key.1" but not "key.1.2".
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key.*");
        // At most one unacknowledged message at a time; only honoured with manual acks.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Consumer 1] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
TopicReceiver2
import com.mcp.lab.mq.rabbit.common.util.ConnectionUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
public class TopicReceiver2 {
    private static final String QUEUE_NAME = "queue_topic2";
    private static final String EXCHANGE_NAME = "exchange_topic";
    private static final String EXCHANGE_TYPE = "topic";

    /**
     * Topic consumer 2: binds {@link #QUEUE_NAME} to the topic exchange with the patterns
     * "*.*" and "*.#" and prints every delivery.
     *
     * @param args unused
     */
    public static void main(String[] args) throws IOException, InterruptedException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        // Declare the exchange here as well, so the bindings below do not fail when this
        // receiver is started before the sender has created the exchange.
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // '*' stands for exactly one word: "*.*" matches two-word keys such as "key.1".
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
        // '#' stands for zero or more words: "*.#" matches "key.1.2" (and also "key.1", which
        // makes the "*.*" binding redundant; a message is queued only once either way).
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.#");
        // At most one unacknowledged message at a time; only honoured with manual acks.
        channel.basicQos(1);
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            try {
                String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
                System.out.println(" [Consumer 2] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            } finally {
                // Acknowledge this single delivery so the broker can send the next one.
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            }
        };
        // autoAck = false: required for the prefetch limit above to be honoured.
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
        });
    }
}
运行结果:
1. Producer运行:
[Producer] Sent 'topics model message with key.1'
[Producer] Sent 'topics model message with key.1.2'
---------------------------------------------------
2. Consumer1运行:
[Consumer 1] Received 'key.1':'topics model message with key.1'
---------------------------------------------------
3. Consumer2运行:
[Consumer 2] Received 'key.1':'topics model message with key.1'
[Consumer 2] Received 'key.1.2':'topics model message with key.1.2'