当前位置: 首页 > article >正文

springboot框架使用RabbitMQ举例代码

以前分享过一个理论有兴趣的小伙伴可以看下
https://blog.csdn.net/Drug_/article/details/138164180

不多说 还是直接上代码

第一步:引入依赖 可以不指定版本

     <!--        amqp -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

第二步 配置文件

  #配置rabbitMq 服务器
  rabbitmq:
    host: ${rabbitmq.rabbitmqHost}
    port: ${rabbitmq.rabbitmqPort}
    username: ${rabbitmq.rabbitmqUsername}
    password: ${rabbitmq.rabbitmqPassword}
    virtual-host: ${rabbitmq.rabbitmqVhost}
    connection-timeout: 5000
    #消费者配置
    listener:
      simple:
# 不建议使用 自带的重试配置  因为有几种情况会失效  在网上摘抄的 网友的测试
#  重试机制使用场景:
#  1. 如果是业务代码,比如空指针之类的异常那重试机制其实没什么用
#  2. 代码中不能使用try/catch捕获异常,否则重试机制失效
#  我在消费者 使用了 try 发现 确实失效了  所以 我觉得 需要手动在消费者里累计重试次数    自行处理异常
#        retry:
#          enabled: true  #开启消费者retry重试机制
#          max-attempts: 3  # 最大重试次数
#          multiplier: 2.0 # 重试间隔时间倍数
#          initial-interval: 1000 # 初始重试间隔时间(毫秒)
#          max-interval: 10000 # 最大重试间隔时间(毫秒)
        acknowledge-mode: manual # 手动确认消息,防止消息丢失 auto manual手动确认模式
        default-requeue-rejected: true #是否将拒绝的消息重新入队。默认是 true,即拒绝的消息会重新入队。 配合手动确认模式
        concurrency: 1 #: 消费者线程池的并发数。设置同时处理的消费者数量
        max-concurrency: 1 #最大并发消费者数量
        prefetch: 1 # 限制每个消费者一次可以获取的消息数量,防止消息在某个消费者身上发生阻塞
    #生产者配置
#    publisher-returns: true  # 启用发布者返回模式。设置为 true 启用,确保如果消息无法路由到目标队列,则会返回给生产者。
    # none: 不启用发布者确认。
    # correlated: 启用发布者确认并使用 CorrelationData 对象,可以在回调中进行处理。
    #: 启用简单的发布者确认模式,不带 CorrelationData。
#    publisher-confirm-type: none

第三步定义常量 :

package com.testweb.testweb.rabbitmq.web;

/**
 * User:Json
 * Date: 2024/9/3
 **/
public class MqConstant {

    public static final String TestDirectRouting = "rabbitmq.TestDirectRouting";
    public static final String TestDirectQueue = "rabbitmq.TestDirectQueue";
    public static final String TestDirectExchange = "rabbitmq.TestDirectExchange";
}

第四步 消费者定义:

package com.testweb.testweb.rabbitmq.web.consumer;

import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import com.rabbitmq.client.Channel;

import java.util.HashMap;
import java.util.Map;

/**
 * User:Json
 * Date: 2024/9/3
 * 消费者
 **/
@Configuration
public class TestConsumer {

    //队列
    @Bean
    public Queue TestDirectQueue() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-ha-policy", "all"); //将队列设置为在集群中的所有节点上都可用
        return new Queue(MqConstant.TestDirectQueue, true, false, false, args);
    }


    @Bean
    public DirectExchange TestDirectExchange() {
        return new DirectExchange(MqConstant.TestDirectExchange, true, false);
    }


    @Bean
    public Binding bindingDirect() {
        return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with(MqConstant.TestDirectRouting);
    }


    @RabbitListener(queues = MqConstant.TestDirectQueue)
    public void process1(Message testMessage, Channel channel) {
        // 消息的唯一标识id
        long deliveryTag = testMessage.getMessageProperties().getDeliveryTag();
        //重试次数
        Integer retryCount =(Integer) testMessage.getMessageProperties().getHeaders().getOrDefault("retryCount", 0);

        try {
            // 处理消息的业务逻辑
            System.out.println("Received order message: " + new String(testMessage.getBody()));
            //假装异常
            int a=  1/0;
            // 手动确认消息
            // deliveryTag 唯一标识
            // multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            if (retryCount < 3) { // 设置最大重试次数
                try {

                    System.out.println("处理失败,拒绝消息并重新入队 :" + testMessage);
                    MessageProperties messageProperties = new MessageProperties();
                    messageProperties.setHeader("retryCount", retryCount + 1);
                    Message newMessage = new Message(testMessage.getBody(), messageProperties);

                    // 重新入队  未写完
                    channel.basicPublish(MqConstant.TestDirectExchange, MqConstant.TestDirectRouting, null, newMessage.getBody());
                    // 手动确认原消息,防止死循环
                    channel.basicAck(deliveryTag, false);

                    // 处理失败,拒绝消息并重新入队 方式1
                    // 消息标识 deliveryTag,
                    // multiple 是否批量拒绝,true 表示拒绝当前及之前的所有消息,false 表示仅拒绝当前消息,,
                    // requeue  是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃
                //    channel.basicNack(deliveryTag, false, true);
                    // 处理失败,拒绝消息并重新入队 方式2
                    // 消息标识 deliveryTag
                    // requeue 是否将消息重新放回队列,true 表示重新放入队列,false 表示丢弃。
                    //channel.basicReject(long deliveryTag, boolean requeue);
//                3. 使用场景
//                basicNack:
//                当你需要拒绝一批消息时,使用 basicNack 是更好的选择,尤其是当你想在消费失败时批量拒绝多条消息。
//                适用于更复杂的场景,比如一次性处理多个未确认的消息。
//                basicReject:
//                当你只想拒绝当前消息时,basicReject 是一个简化的选择。它通常用于更简单的场景,只需处理当前消息即可。
//                适合处理单个消息的拒绝。

                    //如果你在消费者 里 只写了 消息确定 没有写 如果异常后 的处理 默认是不会把消息重新放回队列的
                } catch (Exception nackException) {
                    System.out.println("重新入队失败!!!");
                    // 处理 nack 失败的情况
                    nackException.printStackTrace();
                }
            } else {
                System.out.println("达到最大重试次数 将消息发送到死信队列或进行其他处理!!!");
                try {
                    channel.basicReject(deliveryTag, false); // 丢弃消息或转发到死信队列
                } catch (Exception rejectException) {
                    rejectException.printStackTrace();
                }
            }

        }
    }


}

第五步:生产者

package com.testweb.testweb.rabbitmq.web.producer;

import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * User:Json
 * Date: 2024/9/3
 * 生产者
 **/
@Component
public class TestProducer {
    @Resource
    AmqpTemplate amqpTemplate;


    public <T> void produce(T payload){
        amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,
                MqConstant.TestDirectRouting, payload);
    }
}

第六步 测试:

package com.testweb.testweb.rabbitmq.web.controller;

import com.testweb.testweb.rabbitmq.web.MqConstant;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * User:Json
 * Date: 2024/9/3
 **/
@RestController
@RequestMapping("/testMq")
public class TestMqController {

    @Resource
    AmqpTemplate amqpTemplate;

    @GetMapping("test")
    @CrossOrigin(origins = "*")
    public void test(@RequestParam String msg){
       amqpTemplate.convertAndSend(MqConstant.TestDirectExchange,
                MqConstant.TestDirectRouting, msg);
    }

}


http://www.kler.cn/a/375545.html

相关文章:

  • 【JavaEE初阶 — 多线程】Thread类的属性
  • 原生鸿蒙的竞争力到底如何?
  • 【AI日记】24.11.01 LangChain、openai api和github copilot
  • Unix 中文件权限设置
  • oc的若干方法转为swift 请求不执行
  • 【Flask】四、flask连接并操作数据库
  • ansible详细介绍和具体步骤
  • 4路CAN转WiFi网关
  • 《影像科学与光化学》
  • php反序列化常见魔术方法整理
  • 硅谷甄选(三)登录注册
  • Cloud Native Spring in Action
  • 排序算法:从原理到 Java 实现
  • 【JavaGuide】十大经典排序算法总结
  • B站狂神说+mybatis增删改查操作
  • 提升网站安全性 HTTPS的重要性与应用指南
  • 后端:Spring、Spring Boot-实例化Bean依赖注入(DI)
  • 【android12】【AHandler】【3.AHandler原理篇AHandler类方法全解】
  • (10)文件操作
  • linux虚拟机上使用USB转串口工具
  • HTTP 全知道:请求与响应的深度剖析
  • Spring Boot JPA中的Page组件详解
  • 【时间之外】IT人求职和创业应知【23】
  • 【日常记录-Java】JarFile
  • Gorilla Mk1机器人:CubeMars电机加持,助力高空作业新突破
  • 3259. 超级饮料的最大强化能量