[RabbitMQ] 08 - Delayed Messages
1. Delayed Messages
2. Dead-Letter Exchange
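The normal queue has no consumer. Messages wait there until their TTL expires and are then dead-lettered to dlx.direct.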
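import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class NormalQueueConfig {
    @Bean
    public DirectExchange normalExchange() {
        return new DirectExchange("normal.direct");
    }

    // Expired messages in normal.queue are dead-lettered to dlx.direct.
    @Bean
    public Queue normalQueue() {
        return QueueBuilder
                .durable("normal.queue")
                .deadLetterExchange("dlx.direct")
                .build();
    }

    // Bind normal.queue to normal.direct with routing key "hi".
    @Bean
    public Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("hi");
    }
}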
The dead-letter queue is bound to dlx.direct with the same routing key as normal.queue ("hi").
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "dlx.queue", durable = "true"),
        exchange = @Exchange(name = "dlx.direct", type = ExchangeTypes.DIRECT),
        key = {"hi"}
))
public void listenDlxQueue(String msg) {
    log.info("Consumer received dead-letter message from dlx.queue: [{}]", msg);
}
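Why the key has to match: normal.queue declares only a dead-letter exchange and no dead-letter routing key, so RabbitMQ republishes an expired message with the routing key it was originally published with. The plain-Java sketch below models that rule without a broker. The class and method names are hypothetical and the "expired" key is invented for illustration; only the argument names x-dead-letter-exchange and x-dead-letter-routing-key are RabbitMQ's own.

```java
import java.util.HashMap;
import java.util.Map;

class DeadLetterRoutingSketch {

    /** Queue arguments equivalent to what deadLetterExchange("dlx.direct") sets. */
    public static Map<String, Object> normalQueueArguments() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange", "dlx.direct");
        return args;
    }

    /** Routing key used when a message is republished to the dead-letter exchange. */
    public static String deadLetterRoutingKey(Map<String, Object> queueArgs, String originalKey) {
        Object override = queueArgs.get("x-dead-letter-routing-key");
        // Without an override, the original routing key is kept.
        return override != null ? override.toString() : originalKey;
    }

    public static void main(String[] args) {
        Map<String, Object> queueArgs = normalQueueArguments();
        System.out.println(deadLetterRoutingKey(queueArgs, "hi")); // prints hi

        // Hypothetical variant: an explicit dead-letter routing key replaces the original.
        queueArgs.put("x-dead-letter-routing-key", "expired");
        System.out.println(deadLetterRoutingKey(queueArgs, "hi")); // prints expired
    }
}
```

With the configuration in this section no override is set, so dlx.queue only receives the message if its binding key is also "hi".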
Producer
@Test
void testSendDelayMessage() {
    rabbitTemplate.convertAndSend("normal.direct", "hi", "hello", message -> {
        // Per-message TTL in milliseconds (10 seconds), passed as a string.
        message.getMessageProperties().setExpiration("10000");
        return message;
    });
}
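One caveat with per-message TTL: RabbitMQ only acts on expiry when a message reaches the head of the queue, so a message with a short TTL queued behind one with a long TTL is dead-lettered late. Below is a rough simulation of that behaviour, assuming a queue with no consumer (as with normal.queue here). The class, method, and field names are hypothetical and nothing talks to a real broker.

```java
import java.util.ArrayList;
import java.util.List;

class TtlHeadOfQueueSketch {

    /** A queued message: publish time and per-message TTL, both in milliseconds. */
    static class Msg {
        final long publishedAtMs;
        final long ttlMs;

        Msg(long publishedAtMs, long ttlMs) {
            this.publishedAtMs = publishedAtMs;
            this.ttlMs = ttlMs;
        }
    }

    /**
     * Returns, for each message in FIFO order, the time (ms) at which it is
     * dead-lettered, assuming expiry is only acted on at the head of the queue.
     */
    public static List<Long> deadLetterTimes(List<Msg> fifo) {
        List<Long> times = new ArrayList<>();
        long previousLeftAt = 0; // when the message ahead left the queue
        for (Msg m : fifo) {
            long expiresAt = m.publishedAtMs + m.ttlMs;
            // A message cannot leave before the one in front of it has left.
            long leavesAt = Math.max(expiresAt, previousLeftAt);
            times.add(leavesAt);
            previousLeftAt = leavesAt;
        }
        return times;
    }

    public static void main(String[] args) {
        // A 10 s message published first, then a 1 s message right behind it.
        List<Msg> queue = List.of(new Msg(0, 10_000), new Msg(0, 1_000));
        System.out.println(deadLetterTimes(queue)); // prints [10000, 10000]
    }
}
```

In this model the 1-second message is stuck until the 10-second message ahead of it expires, which is why mixed delays on a single TTL queue can be unreliable.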
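3. DelayExchange Plugin
Installing the plugin: inspect the mq-plugins volume to find its mount point, copy the plugin's .ez file into that directory, then enable the plugin inside the mq container.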
docker volume inspect mq-plugins
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Producer
@Test
void testDelayMessageByPlugin() {
    rabbitTemplate.convertAndSend("delay.direct", "hi", "hello", message -> {
        // Delay in milliseconds (10 seconds), handled by the delayed exchange.
        message.getMessageProperties().setDelay(10000);
        return message;
    });
}
Consumer
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = {"hi"}
))
public void listenDelayQueue(String msg) {
    log.info("Consumer received message from delay.queue: [{}]", msg);
}
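For reference, here is a sketch of what the producer and consumer snippets amount to at the protocol level, as far as the plugin and Spring AMQP describe it: delayed = "true" declares the exchange with type x-delayed-message plus an x-delayed-type argument carrying the real routing type, and setDelay(10000) becomes an x-delay header in milliseconds. The class and method names are hypothetical, and no broker is contacted.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DelayedExchangeSketch {

    /** Exchange type provided by the rabbitmq_delayed_message_exchange plugin. */
    public static final String DELAYED_TYPE = "x-delayed-message";

    /** Arguments for declaring a delayed exchange that routes like routingType. */
    public static Map<String, Object> exchangeArguments(String routingType) {
        Map<String, Object> args = new LinkedHashMap<>();
        args.put("x-delayed-type", routingType); // "direct" for delay.direct
        return args;
    }

    /** Headers of a message the exchange should hold back for delayMs milliseconds. */
    public static Map<String, Object> messageHeaders(int delayMs) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("x-delay", delayMs);
        return headers;
    }

    public static void main(String[] args) {
        System.out.println(DELAYED_TYPE + " " + exchangeArguments("direct"));
        System.out.println(messageHeaders(10000));
    }
}
```

Unlike the TTL approach, the delay here is held at the exchange, so no intermediate queue or dead-letter binding is needed.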