- yml 文件:
spring:
rabbitmq:
####连接地址
host: 127.0.0.1
####端口号
port: 5672
####账号
username: guest
####密码
password: guest
### 地址
virtual-host: boyatopVirtualHost
listener:
simple:
retry:
#开启消费者进行重试(程序异常的情况)
enabled: true
#最大重试次数
max-attempts: 5
#重试间隔时间
initial-interval: 3000
#手动确认机制
acknowledge-mode: manual
datasource:
url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8
username: root
password: root
driver-class-name: com.mysql.jdbc.Driver
boyatop:
#备胎交换机
dlx:
exchange: boyatop_dlx_exchange
queue: boyatop_dlx_queue
routingKey: dlx
#普通交换机
order:
exchange: boyatop_order_exchange
queue: boyatop_order_queue
routingKey: order
- 配置类:
@Component
public class IdempotentExchangeConfig {
//交换机
@Value("${boyatop.order.exchange}")
private String order_exchange;
//普通队列
@Value("${boyatop.order.queue}")
private String order_queue;
//普通队列的 key
@Value("${boyatop.order.routingKey}")
private String order_rotingKey;
//死信交换机
@Value("${boyatop.dlx.exchange}")
private String dlx_exchange;
//死信队列
@Value("${boyatop.dlx.queue}")
private String dlx_queue;
//死信队列的 key
@Value("${boyatop.dlx.routingKey}")
private String dlx_routingKey;
//定义死信交换机
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(dlx_exchange);
}
//定义死信队列
@Bean
public Queue dlxQueue(){
return new Queue(dlx_queue);
}
//定义普通交换机
@Bean
public DirectExchange orderExchange(){
return new DirectExchange(order_exchange);
}
//定义普通队列
@Bean
public Queue orderQueue(){
//订单队列绑定死信交换机
Map<String,Object> arguments = new HashMap<>(2);
arguments.put("x-dead-letter-exchange",dlx_exchange);
arguments.put("x-dead-letter-routing-key",dlx_routingKey);
return new Queue(order_queue,true,false,false,arguments);
// return QueueBuilder.durable(order_queue).withArguments(arguments).build();
}
//订单队列绑定交换机
@Bean
public Binding bindingOrderExchange(DirectExchange orderExchange, Queue orderQueue){
return BindingBuilder.bind(orderQueue)
.to(orderExchange)
.with(order_rotingKey);
}
//死信队列绑定交换机
@Bean
public Binding bindingDlxExchange(DirectExchange dlxExchange, Queue dlxQueue){
return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlx_routingKey);
}
}
- 实体类:
@Data
@NoArgsConstructor
public class OrderEntity implements Serializable {
private Integer id;
private String orderName;
private String orderId;
public OrderEntity(String orderName, String orderId) {
this.orderName = orderName;
this.orderId = orderId;
}
}
- Mapper:
public interface OrderMapper {
@Insert("INSERT into order_entity value (null,#{orderName},#{orderId})")
int addOrder(OrderEntity orderEntity);
@Select("select * from order_entity where order_id = #{orderId} ")
OrderEntity getOrder(String orderId);
}
- 生产者:
@Component
@Slf4j
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${boyatop.order.exchange}")
private String order_exchange;
//普通队列的 key
@Value("${boyatop.order.routingKey}")
private String order_rotingKey;
public void sendMsg(String orderName,String orderId){
OrderEntity orderEntity = new OrderEntity(orderName,orderId);
rabbitTemplate.convertAndSend(order_exchange,order_rotingKey,orderEntity,message -> {
message.getMessageProperties().setExpiration("5000");
return message;
});
}
}
- 消费者:
@Component
@Slf4j
@RabbitListener(queues = "boyatop_order_queue")
public class OrderConsumer {
@Autowired
private OrderMapper orderMapper;
@RabbitHandler
public void process(OrderEntity orderEntity, Message message, Channel channel){
try{
String orderId = orderEntity.getOrderId();
if(StringUtils.isEmpty(orderId)){
return;
}
OrderEntity dbOrderEntity = orderMapper.getOrder(orderId);
if(dbOrderEntity != null){
//出现异常,消息拒收,进入死信队列人为处理
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}
int result = orderMapper.addOrder(orderEntity);
//出现异常
int i = 1 / 0;
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("监听内容:" + orderEntity);
}catch (Exception e){
// 记录该消息日志形式 存放数据库db中、后期通过定时任务实现消息补偿、人工实现补偿
//将该消息存放到死信队列中,单独写一个死信消费者实现消费。
}
}
}