当前位置: 首页 > article >正文

Springboot + netty + rabbitmq + myBatis

目录

  • 0.为什么用消息队列
  • 1.代码文件创建结构
  • 2.pom.xml文件
  • 3.三个配置文件开发和生产环境
  • 4.Rabbitmq 基础配置类 TtlQueueConfig
  • 5.建立netty服务器 + rabbitmq消息生产者
  • 6.建立常规队列的消费者 Consumer
  • 7.建立死信队列的消费者 DeadLetterConsumer
  • 8.建立mapper.xml文件
  • 9.建立mapper文件接口
  • 10.建立接口ProducerController 测试
  • 11.测试接口请求1
  • 12.测试接口请求2
  • 13.网络助手测试NetAssist.exe
  • 14.观察rabbitmq界面管理简单介绍

0.为什么用消息队列

  1. 流量消峰
  2. 应用解耦
  3. 异步确认

1.代码文件创建结构

在这里插入图片描述

2.pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.test</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <packaging>war</packaging>
    <url/>
    <licenses>
        <license/>
    </licenses>
    <developers>
        <developer/>
    </developers>
    <scm>
        <connection/>
        <developerConnection/>
        <tag/>
        <url/>
    </scm>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.86.Final</version> <!-- 根据需要选择版本 -->
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>dynamic-datasource-spring-boot-starter</artifactId>
            <version>3.5.0</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>easyexcel</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <scope>runtime</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project> 

3.三个配置文件开发和生产环境

在这里插入图片描述
文件一 application.properties

#开发环境
spring.profiles.active=dev
#生产环境
#spring.profiles.active=prod

文件二 application-dev.properties 开发环境

spring.application.name=demo
server.servlet.context-path=/demo1
server.port=1001
#spring.main.allow-circular-references=true
spring.rabbitmq.host=实际ip地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root
spring.rabbitmq.virtual-host=/
##创建单线程监听容器 本项目目前用的是单线程  这里预期值2000 需根据实际情况调整
spring.rabbitmq.listener.simple.prefetch=2000
##创建多线程监听容器
#spring.rabbitmq.listener.direct.prefetch=2000
#spring.rabbitmq.listener.simple.acknowledge-mode=auto
# application.properties 示例
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL
#开启消息确认机制
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true


netty.server.port=1002
netty.server.bossThreads=1
netty.server.workerThreads=1


server.max-http-header-size=655360
mybatis.mapper-locations=classpath:mapper/*Mapper.xml
mybatis.type-aliases-package=com.mt.entity
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
logging.level.com.ysd.mapper=info
logging.file.name=demo.log
logging.level.com.mt.mapper=info
#mybatis.configuration.log-impl=org.apache.ibatis.logging.stdout.StdOutImpl
mybatis.configuration.map-underscore-to-camel-case=true
pagehelper.helper-dialect=mysql
pagehelper.reasonable=true
spring.main.allow-circular-references=true
spring.jackson.default-property-inclusion=non_null

#<!--=====================数据库1 ====================-->
spring.datasource.dynamic.英文数据库名称1.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.dynamic.datasource.英文数据库名称1.username=root
spring.datasource.dynamic.datasource.英文数据库名称1.password=root
spring.datasource.dynamic.datasource.英文数据库名称1.driver-class-name=com.mysql.cj.jdbc.Driver
#<!--=====================数据库2 ====================-->
spring.datasource.dynamic.英文数据库名称2.url=jdbc:mysql://ip地址:3306/英文数据库名称?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=false
spring.datasource.dynamic.datasource.英文数据库名称2.username=root
spring.datasource.dynamic.datasource.英文数据库名称2.password=root
spring.datasource.dynamic.datasource.英文数据库名称2.driver-class-name=com.mysql.cj.jdbc.Driver

4.Rabbitmq 基础配置类 TtlQueueConfig

rabbitmq基础信息配置已经在application-dev.properities中进行配置过一部分

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class TtlQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";

    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 150000);//超出150秒没有被消费  就会进入死信队列
        return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
    }

    @Bean
    public Binding queueaBindingX(@Qualifier("queueA") org.springframework.amqp.core.Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    //声明队列 B ttl 为 40s 并绑定到对应的死信交换机
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> args = new HashMap<>(3);
        //声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //声明当前队列的死信路由 key
        args.put("x-dead-letter-routing-key", "YD");
        //声明队列的 TTL
        args.put("x-message-ttl", 40000);//超出40秒没有被消费  就会进入死信队列
        return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
    }

    //声明队列 B 绑定 X 交换机
    @Bean
    public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
    }

    //声明死信队列 QD
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    //声明死信队列 QD 绑定关系
    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }

    @Bean
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory = new SimpleRabbitListenerContainerFactory();
        //这个connectionFactory就是我们自己配置的连接工厂直接注入进来
        simpleRabbitListenerContainerFactory.setConnectionFactory(connectionFactory);
        //这边设置消息确认方式由自动确认变为手动确认
        simpleRabbitListenerContainerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        //设置消息预取数量
        // simpleRabbitListenerContainerFactory.setPrefetchCount(1);
        return simpleRabbitListenerContainerFactory;
    }
}

5.建立netty服务器 + rabbitmq消息生产者

创建服务器类 初始化启动服务器NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;




@Component
@Slf4j
public class NettyServer implements ApplicationRunner {
    @Autowired
    private RabbitTemplate rabbitTemplate;
	//有可能在项目初始化的时候加载不出来导致项目隐形报错
	//加载application-dev.proerities文件中 对应参数配置项
    @Value("${netty.server.port}")
    private int port;

    public int getPort () {
        return port;
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("netty服务启动端口"+getPort());
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // 用于接收进来的连接
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // 用于处理已经被接收的连接
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // 使用Nio的通道类型
                .childHandler(new ChannelInitializer<SocketChannel>() { // 添加一个处理器来处理接收到的数据
                    @Override
                    public void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline p = ch.pipeline();
                        p.addLast(new StringDecoder());
                        p.addLast(new StringEncoder());
                        p.addLast(new SimpleChannelInboundHandler<String>() {
                            @Override
                            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                //netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列
                                sendMessage(ctx,msg);
                            }

                            @Override
                            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                                cause.printStackTrace();
                                ctx.close();
                            }
                        });
                    }
                });

        // 绑定端口,开始接收进来的连接
        ChannelFuture f = b.bind(getPort()).sync();
        // 等待服务器socket关闭
        f.channel().closeFuture().sync();
    }
    
 	//netty监听端口消息发送消息通过rabbitmq生产者发送到消息队列中
 	//充当消息的生产者发送
    public  void  sendMessage(ChannelHandlerContext ctx, String msg){
	  //  接收netty监听信息来源作为消息生产者
        rabbitTemplate.convertAndSend("X","XA","来自QA"+msg);
        ctx.writeAndFlush("===>"+msg);
    }
}

6.建立常规队列的消费者 Consumer

import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Slf4j
@Component
public class Consumer {
    @Resource
    private MessageProcessService messProceService;
    private Map<Integer, Message> messageMap = new HashMap<>();
    private  int i =1;
    private  int j =1;
    @RabbitListener(queues = "QA")
    public void receiveQA(Message message, Channel channel) throws InterruptedException, IOException {
        i++;
        synchronized (this) {
            messageMap.put(i,message);
            if (messageMap.size() >= 1500) {
                    // 模拟数据库插入操作
        System.out.println("模拟插入数据库操作,QA队列处理消息数:" + messageMap.size());
                processMessagesBatch(channel);
                System.out.println("模拟插入成功, 准备进行下一次收集");
            }
        }
    }


//    @RabbitListener(queues = "QB")
//    public void receiveQB(Message message, Channel channel) throws IOException {
//        String msg = new String(message.getBody());
//        log.info("当前时间:{},监听QB队列信息{}", new Date().toString(), msg);
//    }

    public static class SleepUtils {
        public static void sleep(int second) {
            try {
                Thread.sleep(1000 * second);
            } catch (InterruptedException _ignored) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void processMessagesBatch(Channel channel) throws  IOException {
        List<String> list = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        Map<Integer, Message> tempMessageMap = new HashMap<>(messageMap);
        messageMap.clear();
        for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
            list.add("C" + (j++) + ":===>" + "QA队列收集" + new String(map.getValue().getBody()));
        }
        int count = messProceService.add(list);
        if (count == list.size()) {
            System.out.println("插入数据成功");
            for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
                channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);
            }
            list.clear();
        }
        long durationSeconds = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println("插入1500条数据执行时间: " + durationSeconds);
    }
}

7.建立死信队列的消费者 DeadLetterConsumer

import com.rabbitmq.client.Channel;
import com.test.demo.service.MessageProcessService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.io.IOException;
import java.util.*;

@Slf4j
@Component
public class DeadLetterConsumer {
    @Resource
    private MessageProcessService messProceService;
    private Map<Integer, Message> messageMap = new HashMap<>();
    private int i = 1;
    private  int j =1;
    @RabbitListener(queues = "QD")
    public void receiveD(Message message, Channel channel) throws IOException {
        i++;
        synchronized (this) {
            messageMap.put(i, message);
            if (messageMap.size() >= 100) {
                // 模拟数据库插入操作
                System.out.println("模拟插入数据库操作,死信队列处理消息数:" + messageMap.size());
                processMessagesBatch(channel);
                System.out.println("模拟插入成功, 准备进行下一次收集");
                // 确认当前消息,以便RabbitMQ知道它可以释放此消息
            }
        }
    }

    private void processMessagesBatch(Channel channel) throws  IOException {
        List<String> list = new ArrayList<>();
        // 复制当前消息映射以避免在迭代时修改
        Map<Integer, Message> tempMessageMap = new HashMap<>(messageMap);
        messageMap.clear(); 
        long startTime = System.currentTimeMillis();
        for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
            list.add("C" + (j++) + ":===>" + "死信队列收集" + new String(map.getValue().getBody()));
        }
        int count = messProceService.add(list);
        long durationSeconds = (System.currentTimeMillis() - startTime) / 1000;
        System.out.println("插入50条数据执行时间: " + durationSeconds);
        if (count == list.size()) {
            System.out.println("插入数据成功");
            list.clear();
            for (Map.Entry<Integer, Message> map : tempMessageMap.entrySet()) {
                channel.basicAck(map.getValue().getMessageProperties().getDeliveryTag(), false);
            }
        }
    }

}

8.建立mapper.xml文件

MessageProcessMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.test.demo.mapper.MessageProcessMapper">
    <insert id="add">
        INSERT INTO t_xinyang_direct (direct,create_date)
        VALUES
        <foreach collection="list" item="item" separator=",">
            (
            #{item},
             SYSDATE()
            )
        </foreach>
    </insert>
</mapper>

9.建立mapper文件接口

MessageProcessMapper


import com.baomidou.dynamic.datasource.annotation.DS;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;

@Mapper
@DS("英文数据库名称1")
public interface MessageProcessMapper {
    int add(List<String> list);
}

10.建立接口ProducerController 测试

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
@RequestMapping("/api")
public class ProducerController {
    @GetMapping("/hello/{message}")
    public ResponseEntity<String> sayHello(@PathVariable String message) {
        return ResponseEntity.ok("Hello, RabbitMQ!==>"+message);
    }

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message){
        log.info("接收接口发送的消息请求");
        rabbitTemplate.convertAndSend("X","XA","消息来自tt1为10s的队列"+message);
        rabbitTemplate.convertAndSend("X","XB","消息来自tt1为150s的队列"+message);
    }

}

11.测试接口请求1

测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/hello/返回浏览器输入内容
在这里插入图片描述

12.测试接口请求2

测试请求接口 /hello
http://实际本机ip地址:1001/demo1/api/sendRabbitMq/发给rabbitmq消息
在这里插入图片描述

13.网络助手测试NetAssist.exe

主要目的模拟向netty端口进行发送数据,通过netty监听到的信息然后通过rabbitmq的生产者发送rabbitmq的队列中,让消费者进行消费,如果消费者绑定死信队列,那么消费者从队列中取出消息后,经过一定时间未确认即不进行消费确认或者拒绝,然后入之前绑定好的死信队列中,供死信队列绑定的死信消费者进行消费处理。

14.观察rabbitmq界面管理简单介绍

在这里插入图片描述


http://www.kler.cn/a/330398.html

相关文章:

  • Cesium加载地形
  • USB学习——基本概念
  • Android系统定制APP开发_如何对应用进行系统签名
  • 【Powershell】Windows大法powershell好(二)
  • shell基础使用及vim的常用快捷键
  • Autoencoder(李宏毅)机器学习 2023 Spring HW8 (Boss Baseline)
  • oracle 新建用户,用户插入数据报错:ORA-01950: 对表空间 ‘USERS‘ 无权限
  • 23 vue3之详解scoped样式穿透vuecss新特性
  • Java面试题之JVM面试题
  • 2. 将GitHub上的开源项目导入(clone)到(Linux)服务器上——深度学习·科研实践·从0到1
  • 攻防世界----->easyre-153
  • 在PC端连接苹果手机(iPhone)时,即使已经开启了开发者模式(开发者权限),但仍然无法成功连接,是什么原因?
  • C++ 语言特性11 - 继承构造函数
  • 洞悉go.dev
  • 集师专属心理咨询 心理培训 心理知识服务小程序搭建 专属知识付费小程序搭建
  • VScode 自定义代码配色方案
  • 关于 Latex 使用 BibTeX 进行参考文献管理的相关
  • 数据资料安全治理新时代,AI/ML 来助力!
  • Find My汽车钥匙|苹果Find My技术与钥匙结合,智能防丢,全球定位
  • SD card + Arduino IDE 如何提高SD card的读写速度
  • 音视频入门
  • 【LeetCode HOT 100】详细题解之二叉树篇
  • 汽车信息安全 -- 存到HSM中的密钥还需包裹吗?
  • some 蓝桥杯题
  • 不同版本的 Selenium 和 WebDriver 的 API 兼容性问题
  • cGANs with Projection Discriminator