8.RabbitMQ队列详解
八、RabbitMQ队列详解
1、队列属性
-
Type
设置队列的队列类型;
-
Name
队列名称,就是一个字符串,随便一个字符串就可以;
-
Durability
声明队列是否持久化,代表队列在服务器重启后是否还存在;
-
Auto delete:
是否自动删除
如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;
-
Exclusive
exclusive属性的队列只对第一个连接它的消费者可见(之后其它消费者无法访问该队列),并且在连接断开时自动删除
基本上不设置它,设置成false
-
Arguments:队列的其他属性,例如指定DLX(死信交换机等);
-
x-expires:Number
当Queue(队列)在指定的时间未被访问则队列将被自动删除
-
x-message-ttl:Number
发布的消息在队列中存在多长时间后被取消(单位毫秒)
-
x-overflow:String
设置队列溢出行为
当达到队列的最大长度时消息的处理方式
-
drop-head:删除头部消息
-
reject-publish:超过队列长度后,后面发布的消息会被拒绝接收
-
reject-publish-dlx:超过队列长度后,后面发布的消息会被拒绝接收并发布到死信交换机
仲裁队列类型仅支持:drop-head and reject-publish两种
-
-
x-max-length:Number
队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
-
x-max-length-bytes:Number
队列在开始从头部删除就绪消息之前可以包含的总正文大小
受限于内存大小,超过该阈值则从队列头部开始删除消息
-
x-single-active-consumer:默认为false
表示队列是否是只能有一个消费者
设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略
设置为false时消息循环分发给所有消费者(默认false)
-
x-dead-letter-exchange:String
指定队列关联的死信交换机
有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;
-
x-dead-letter-routing-key:String
指定死信交换机的路由键
一般和7一起定义
-
x-queue-mode:String(理解下即可)
队列类型x-queue-mode=lazy懒队列,在磁盘上尽可能多地保留消息以减少RAM使用
如果未设置则队列将保留内存缓存以尽可能快地传递消息;
如果设置则队列消息将保存在磁盘上,消费时从磁盘上读取消费
-
x-queue-master-locator:String(用的较少)
在集群模式下设置队列分配到的主节点位置信息;
每个queue都有一个master节点,所有对于queue的操作都是事先在master上完成,之后再slave上进行相同的操作;
每个不同的queue可以坐落在不同的集群节点上,这些queue如果配置了镜像队列,那么会有1个master和多个slave
基本上所有的操作都落在master上,那么如果这些queues的master都落在个别的服务节点上,而其他的节点又很空闲,这样就无法做到负载均衡,那么势必会影响性能;
关于master queue host 的分配有几种策略,可以在queue声明的时候使用x-queue-master-locator参数,或者在policy上设置queue-master-locator,或者直接在rabbitmq的配置文件中定义queue_master_locator有三种可供选择的策略
-
min-masters:选择master queue数最少的那个服务节点host;
-
client-local:选择与client相连接的那个服务节点host;
-
random:随机分配;
-
2、属性测试
8.2.1、非持久化属性测试
测试模块rabbitmq-08-properties-01
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn1
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
Message message = MessageBuilder.withBody("hello world2".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.1";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.1";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.test1.1";
public static final String ROUTING_NAME2 = "key.properties.test1.2";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
* 不持久化、不设置名字
*
* @return
*/
@Bean
public Queue normalQueue1() {
return QueueBuilder.nonDurable()
.build();
}
/**
* 正常队列
* 不持久化、设置名字
*
* @return
*/
@Bean
public Queue normalQueue2() {
return QueueBuilder.nonDurable(RabbitMQConstant.QUEUE_NAME1)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue1
* @return
*/
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue2
* @return
*/
@Bean
public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties01Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties01Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
发行消息查看控制台
-
重启服务器查看队列是否存在
8.2.2、持久化测试
是否自动删除
如果为true,当没有消费者连接到这个队列的时候,队列会自动删除
测试模块rabbitmq-08-properties-02
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn2
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
Message message = MessageBuilder.withBody("hello world2".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.2";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.2";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.test1.2";
public static final String ROUTING_NAME2 = "key.properties.test2.2";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
* 持久化、不设置名字
*
* @return
*/
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable()
.build();
}
/**
* 正常队列
* 持久化、设置名字
*
* @return
*/
@Bean
public Queue normalQueue2() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue1
* @return
*/
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue2
* @return
*/
@Bean
public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties02Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties02Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
发送消息查看控制台
-
重启服务器查看控制台
8.2.3、自动删除测试
测试模块rabbit-08-properties-03
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn3
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
Message message = MessageBuilder.withBody("hello world2".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME2, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
消费者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ReceivemessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg1(Message message) {
log.info("1接收到的消息为:{}", new String(message.getBody()));
}
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME2})
public void receiveMsg2(Message message) {
log.info("2接收到的消息为:{}", new String(message.getBody()));
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.3";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.test1.3";
public static final String QUEUE_NAME2 = "queue.properties.test2.3";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.test1.3";
public static final String ROUTING_NAME2 = "key.properties.test2.3";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列1
*
* @return
*/
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.autoDelete()// 设置自动删除
.build();
}
/**
* 正常队列2
*
* @return
*/
@Bean
public Queue normalQueue2() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME2)
// .autoDelete()// 设置不自动删除,默认就是不自动删除
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue1
* @return
*/
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue2
* @return
*/
@Bean
public Binding bindingNormal2(DirectExchange normalExchange, Queue normalQueue2) {
return BindingBuilder.bind(normalQueue2).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME2);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties03Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties03Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
有消费者连接时
-
无消费连接时
8.2.4、可见性测试
普通队列允许的消费者没有限制,多个消费者绑定到同一个队列时,RabbitMQ会采用轮询进行投递
如果需要消费者独占队列,在队列创建的时候,设定属性exclusive为true。
exclusive有两个作用
-
当连接关闭时connection.close()该队列是否会自动删除;
-
该队列是否是私有的private
如果设置为false则可以使用两个消费者都访问同一个队列,没有任何问题
如果设置为true则会对当前队列加锁,其他连接connection是不能访问的,同一个连接的不同channel是可以访问的。如果强制访问会报如下异常:
It could be originally declared on another connection or the exclusive property value does not match that of the original declaration., class-id=60, method-id=20)
测试模块rabbit-08-properties-04
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn4
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
{
Message message = MessageBuilder.withBody("hello world2".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.4";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.test1.4";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.test1.4";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列1
*
* @return
*/
@Bean
public Queue normalQueue1() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.exclusive()//声明只有第一个连接的消费者可见,(之后其它消费者无法访问该队列),并且在连接断开时自动删除
//.autoDelete()// 设置自动删除
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue1
* @return
*/
@Bean
public Binding bindingNormal1(DirectExchange normalExchange, Queue normalQueue1) {
return BindingBuilder.bind(normalQueue1).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
消费者
这里定义的消费者表示是在同一个Connection中消费消息的多个消费者,用来测试是否同一个Connection中的多个消费者可以消费
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ReceivemessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg1(Message message) {
log.info("消费者1接收到的消息为:{}", new String(message.getBody()));
}
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg2(Message message) {
log.info("消费者2接收到的消息为:{}", new String(message.getBody()));
}
}
发送消息
这里面除了发送消息,也定义两个Connection连接测试是否允许不同Connection的连接访问
package com.longdidi;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;
import java.io.IOException;
@SpringBootApplication
public class Rabbitmq08Properties04Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties04Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
{//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("longdidi");
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
回调方法 当收到信息 自动执行该方法
consumerTag
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("连接1接收到信息:" + new String(body));
}
};
channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);
// 6.释放资源
channel.close();
connection.close();
}
{
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置工厂参数
factory.setHost("192.168.1.101");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123456");
factory.setVirtualHost("longdidi");
Connection connection = factory.newConnection();
//3.创建channel
Channel channel = connection.createChannel();
DefaultConsumer consumer = new DefaultConsumer(channel) {
/*
回调方法 当收到信息 自动执行该方法
consumerTag
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("连接2接收到信息:" + new String(body));
}
};
channel.basicConsume(RabbitMQConstant.QUEUE_NAME1, true, consumer);
// 6.释放资源
channel.close();
connection.close();
}
}
}
测试
-
启动程序查看
-
查看同一Connection内的多个消费者是否可以连接
注释掉主程序中的两个Connection连接代码重启测试
-
测试自动删除
停止程序运行查看队列是否自动删除
8.2.5、Arguments测试
(1)、删除属性测试
x-expires属性:当Queue(队列)在指定的时间未被访问则队列将被自动删除
测试模块rabbitmq-08-properties-05
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn5
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.5";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.5";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.5";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-expires", 10000L); //当Queue(队列)在指定的时间未被访问则队列将被自动删除
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties05Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties05Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
启动应用查看控制台
-
等待超时后查看控制台
(2)、设置队列过期时间
发布的消息在队列中存在多长时间后被取消(单位毫秒)
测试模块rabbitmq-08-properties-06
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn6
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.6";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.6";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.6";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 5000L);//发布的消息在队列中存在多长时间后被取消(单位毫秒)
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties06Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties06Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
发送消息查看控制台
-
超时后查看控制台
(3)、设置队列长度
x-max-length:Number
队列所能容下消息的最大长度,当超出长度后新消息将会覆盖最前面的消息
测试模块rabbitmq-08-properties-07
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn7
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 8; i++) {
String str = "hello world1" + i;
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.7";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.7";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.7";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 5);// 队列的溢出行为,删除头部
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;
import java.io.IOException;
@SpringBootApplication
public class Rabbitmq08Properties07Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties07Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
- 查看队列消息长度
-
查看存活消息
(4)、设置队列溢出行为
设置队列溢出行为
当达到队列的最大长度时消息的处理方式:有效值为drop-head(删除头部消息)、reject-publish(拒绝发布)或reject-publish-dlx(拒绝发布到死信交换机)
仲裁队列类型仅支持:drop-head and reject-publish两种
测试模块rabbitmq-08-properties-08
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn6
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 8; i++) {
String str = "hello world1" + i;
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.8";
public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.8";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.8";
public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.8";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.8";
public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.8";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length", 5);// 队列的溢出行为,删除头部
// arguments.put("x-overflow", "drop-head");// 队列的溢出行为,删除头部(默认行为)
//arguments.put("x-overflow", "reject-publish");//队列的溢出行为,拒绝发布
arguments.put("x-overflow", "reject-publish-dlx");//队列的溢出行为,拒绝接收消息,超过长度的消息会被发送到死信交换机而不是拒绝接收
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.deadLetterExchange(RabbitMQConstant.EXCHANGE_DLX_NAME)
.deadLetterRoutingKey(RabbitMQConstant.ROUTING_DLX_NAME1)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1)
.build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties08Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties08Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
-
当属性设置为drop-head时
如果超过队列长度,先入队的消息会被先删除(如果配置了死信交换机则会移至死信交换机)
-
当属性设置为reject-publish时
队列消息
常看消息详情
-
当属性设置为reject-publish-dlx时
拒绝接收后面的消息并将拒绝的消息放到死信交换机中
(5)、设置队列内存大小
x-max-length-bytes:Number
队列在开始从头部删除就绪消息之前可以包含的总正文大小
受限于内存大小,超过该阈值则从队列头部开始删除消息
测试模块rabbitmq-08-properties-09
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn9
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
for (int i = 0; i < 3; i++) {
String str = "你好我好大家好" + i;
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
for (int i = 0; i < 3; i++) {
String str = "hello" + i;
Message message = MessageBuilder.withBody(str.getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.9";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.9";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.9";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-max-length-bytes", 30);//队列的内存大小
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;
import java.io.IOException;
@SpringBootApplication
public class Rabbitmq08Properties09Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties09Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(6)、设置单一消费者
表示队列是否是只能有一个消费者
设置为true时注册的消费组内只有一个消费者消费消息,其他被忽略
设置为false时消息循环分发给所有消费者(默认false)
测试模块rabbitmq-08-properties-10
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn10
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
消费者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class ReceivemessageService {
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg1(Message message) {
log.info("消费者1接收到的消息为:{}", new String(message.getBody()));
}
@RabbitListener(queues = {RabbitMQConstant.QUEUE_NAME1})
public void receiveMsg2(Message message) {
log.info("消费者2接收到的消息为:{}", new String(message.getBody()));
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.10";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.10";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.10";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-single-active-consumer", true);//队列的最大长度
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.constants.RabbitMQConstant;
import com.longdidi.service.SendMessageService;
import com.rabbitmq.client.DefaultConsumer;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import com.rabbitmq.client.*;
import java.io.IOException;
@SpringBootApplication
public class Rabbitmq08Properties10Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties10Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试
(7)、设置死信交换机
测试模块rabbitmq-08-properties-11
引入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
配置MQ
server:
port: 8080
spring:
application:
name: properties-learn11
rabbitmq:
host: 192.168.1.101
port: 5672
username: admin
password: 123456
virtual-host: longdidi
生产者
package com.longdidi.service;
import com.longdidi.constants.RabbitMQConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageBuilder;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.Date;
@Service
@Slf4j
public class SendMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMsg() {
{
Message message = MessageBuilder.withBody("hello world1".getBytes())
.build();
rabbitTemplate.convertAndSend(RabbitMQConstant.EXCHANGE_NAME, RabbitMQConstant.ROUTING_NAME1, message);
log.info("消息发送完毕,发送时间为:{}", new Date());
}
}
}
定义常量
package com.longdidi.constants;
public class RabbitMQConstant {
// 正常交换机
public static final String EXCHANGE_NAME = "exchange.properties.normal.11";
public static final String EXCHANGE_DLX_NAME = "exchange.properties.dlx.11";
// 队列
public static final String QUEUE_NAME1 = "queue.properties.normal.11";
public static final String QUEUE_DLX_NAME1 = "queue.properties.dlx.11";
// 路由key
public static final String ROUTING_NAME1 = "key.properties.normal.11";
public static final String ROUTING_DLX_NAME1 = "key.properties.dlx.11";
}
定义MQ队列
package com.longdidi.config;
import com.longdidi.constants.RabbitMQConstant;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class RabbitConfig {
/**
* 正常交换机
*
* @return
*/
@Bean
public DirectExchange normalExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_NAME).build();
}
/**
* 正常队列
*
* @return
*/
@Bean
public Queue normalQueue() {
//Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments)
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", RabbitMQConstant.EXCHANGE_DLX_NAME);//死信交换机
arguments.put("x-dead-letter-routing-key", RabbitMQConstant.ROUTING_DLX_NAME1);//死信路由key
return QueueBuilder.durable(RabbitMQConstant.QUEUE_NAME1)
.ttl(5000)// 设置超时时间
.withArguments(arguments)
.build();
}
/**
* 正常交换机和正常队列绑定
*
* @param normalExchange
* @param normalQueue
* @return
*/
@Bean
public Binding bindingNormal(DirectExchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(RabbitMQConstant.ROUTING_NAME1);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public DirectExchange dlxExchange() {
return ExchangeBuilder.directExchange(RabbitMQConstant.EXCHANGE_DLX_NAME).build();
}
/**
* 死信队列
*
* @return
*/
@Bean
public Queue dlxQueue() {
return QueueBuilder.durable(RabbitMQConstant.QUEUE_DLX_NAME1)
.build();
}
/**
* 死信交换机和死信队列绑定
*
* @param dlxExchange
* @param dlxQueue
* @return
*/
@Bean
public Binding bindingDlx(DirectExchange dlxExchange, Queue dlxQueue) {
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(RabbitMQConstant.ROUTING_DLX_NAME1);
}
}
发送消息
package com.longdidi;
import com.longdidi.service.SendMessageService;
import jakarta.annotation.Resource;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class Rabbitmq08Properties11Application implements ApplicationRunner {
public static void main(String[] args) {
SpringApplication.run(Rabbitmq08Properties11Application.class, args);
}
@Resource
private SendMessageService sendMessageService;
/**
* 程序一启动就会运行该方法
*
* @param args
* @throws Exception
*/
@Override
public void run(ApplicationArguments args) throws Exception {
sendMessageService.sendMsg();
}
}
测试