当前位置: 首页 > article >正文

RabbitMQ深度探索:简单实现 MQ

基于多线程队列实现 MQ :

  1. 实现类:
    public class ThreadMQ {
        private static LinkedBlockingDeque<JSONObject> broker = new LinkedBlockingDeque<JSONObject>();
    
        public static void main(String[] args) {
            //创建生产者线程
            Thread producer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            Thread.sleep(1000);
                            JSONObject data = new JSONObject();
                            data.put("phone","11111111");
                            broker.offer(data);
                        }catch (Exception e){
    
                        }
                    }
                }
            },"生产者");
    
            producer.start();
            Thread consumer = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true){
                        try {
                            JSONObject data = broker.poll();
                            if(data != null){
                                System.out.println(Thread.currentThread().getName() + data.toJSONString());
                            }
                        }catch (Exception e){
    
                        }
                    }
                }
            },"消费者");
            consumer.start();
    
        }
    }

基于 netty 实现 MQ:

  1. 执行过程:
    1. 消费者 netty 客户端与 nettyServer 端 MQ 服务器保持长连接,MQ 服务器端保存消费者连接
    2. 生产者 netty 客户端发送请求给 nettyServer 端 MQ 服务器,MQ 服务器端再将消息内容发送给消费者
  2. 执行流程:
    1. 导入 Maven 依赖:
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
          <version>4.0.23.Final</version>
      </dependency>
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <dependency>
          <groupId>org.apache.commons</groupId>
          <artifactId>commons-lang3</artifactId>
          <version>3.11</version>
      </dependency>
      <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>3.6.5</version>
      </dependency>
    2. 服务端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import org.apache.commons.lang3.StringUtils;
      
      import java.io.UnsupportedEncodingException;
      import java.util.ArrayList;
      import java.util.concurrent.LinkedBlockingDeque;
      
      /**
       * @ClassName BoyatopMQServer2021
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQServer {
          public void bind(int port) throws Exception {
              /**
               * Netty 抽象出两组线程池BossGroup和WorkerGroup
               * BossGroup专门负责接收客户端的连接, WorkerGroup专门负责网络的读写。
               */
              EventLoopGroup bossGroup = new NioEventLoopGroup();
              EventLoopGroup workerGroup = new NioEventLoopGroup();
              ServerBootstrap bootstrap = new ServerBootstrap();
              try {
                  bootstrap.group(bossGroup, workerGroup)
                          // 设定NioServerSocketChannel 为服务器端
                          .channel(NioServerSocketChannel.class)
                          //BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
                          //用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,Java将使用默认值50。
                          .option(ChannelOption.SO_BACKLOG, 100)
                          // 服务器端监听数据回调Handler
                          .childHandler(new BoyatopNettyMQServer.ChildChannelHandler());
                  //绑定端口, 同步等待成功;
                  ChannelFuture future = bootstrap.bind(port).sync();
                  System.out.println("当前服务器端启动成功...");
                  //等待服务端监听端口关闭
                  future.channel().closeFuture().sync();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //优雅关闭 线程组
                  bossGroup.shutdownGracefully();
                  workerGroup.shutdownGracefully();
              }
          }
      
          private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                  // 设置异步回调监听
                  ch.pipeline().addLast(new BoyatopNettyMQServer.MayiktServerHandler());
              }
          }
      
          public static void main(String[] args) throws Exception {
              int port = 9008;
              new BoyatopNettyMQServer().bind(port);
          }
      
          private static final String type_consumer = "consumer";
      
          private static final String type_producer = "producer";
          private static LinkedBlockingDeque<String> msgs = new LinkedBlockingDeque<>();
          private static ArrayList<ChannelHandlerContext> ctxs = new ArrayList<>();
      
          // 生产者投递消息的:topicName
          public class MayiktServerHandler extends SimpleChannelInboundHandler<Object> {
      
              /**
               * 服务器接收客户端请求
               *
               * @param ctx
               * @param data
               * @throws Exception
               */
              @Override
              protected void channelRead0(ChannelHandlerContext ctx, Object data)
                      throws Exception {
                  //ByteBuf buf=(ByteBuf)data;
                  //byte[] req = new byte[buf.readableBytes()];
                  //buf.readBytes(req);
                  //String body = new String(req, "UTF-8");
                  //System.out.println("body:"+body);
                  JSONObject clientMsg = getData(data);
                  String type = clientMsg.getString("type");
                  switch (type) {
                      case type_producer:
                          producer(clientMsg);
                          break;
                      case type_consumer:
                          consumer(ctx);
                          break;
                  }
              }
      
              private void consumer(ChannelHandlerContext ctx) {
                  // 保存消费者连接
                  ctxs.add(ctx);
                  // 主动拉取mq服务器端缓存中没有被消费的消息
                  String data = msgs.poll();
                  if (StringUtils.isEmpty(data)) {
                      return;
                  }
                  // 将该消息发送给消费者
                  byte[] req = data.getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              private void producer(JSONObject clientMsg) {
                  // 缓存生产者投递 消息
                  String msg = clientMsg.getString("msg");
                  msgs.offer(msg); //保证消息不丢失还可以缓存硬盘
      
                  //需要将该消息推送消费者
                  ctxs.forEach((ctx) -> {
                      // 将该消息发送给消费者
                      String data = msgs.poll();
                      if (data == null) {
                          return;
                      }
                      byte[] req = data.getBytes();
                      ByteBuf firstMSG = Unpooled.buffer(req.length);
                      firstMSG.writeBytes(req);
                      ctx.writeAndFlush(firstMSG);
                  });
              }
      
              private JSONObject getData(Object data) throws UnsupportedEncodingException {
                  ByteBuf buf = (ByteBuf) data;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  return JSONObject.parseObject(body);
              }
      
      
              @Override
              public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                  ctx.flush();
              }
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                      throws Exception {
      
                  ctx.close();
              }
          }
      }
    3. 生产端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class BoyatopNettyMQProducer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new BoyatopNettyMQProducer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              BoyatopNettyMQProducer client = new BoyatopNettyMQProducer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "producer");
                  JSONObject msg = new JSONObject();
                  msg.put("userId", "123456");
                  msg.put("age", "23");
                  data.put("msg", msg);
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
    4. 客户端:
      package com.qcby.springboot.MQ;
      
      import com.alibaba.fastjson.JSONObject;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.ByteBuf;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.*;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioSocketChannel;
      
      /**
       * @ClassName BoyatopNettyMQProducer
       * @Author
       * @Version V1.0
       **/
      public class NettyMQConsumer {
          public void connect(int port, String host) throws Exception {
              //配置客户端NIO 线程组
              EventLoopGroup group = new NioEventLoopGroup();
              Bootstrap client = new Bootstrap();
              try {
                  client.group(group)
                          // 设置为Netty客户端
                          .channel(NioSocketChannel.class)
                          /**
                           * ChannelOption.TCP_NODELAY参数对应于套接字选项中的TCP_NODELAY,该参数的使用与Nagle算法有关。
                           * Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到来,组装成大的数据包进行发送,虽然该算法有效提高了网络的有效负载,但是却造成了延时。
                           * 而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输。和TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。
                           */
                          .option(ChannelOption.TCP_NODELAY, true)
                          .handler(new ChannelInitializer<SocketChannel>() {
                              @Override
                              protected void initChannel(SocketChannel ch) throws Exception {
                                  ch.pipeline().addLast(new NettyMQConsumer.NettyClientHandler());
                                  1. 演示LineBasedFrameDecoder编码器
      //                            ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
      //                            ch.pipeline().addLast(new StringDecoder());
                              }
                          });
      
                  //绑定端口, 异步连接操作
                  ChannelFuture future = client.connect(host, port).sync();
                  //等待客户端连接端口关闭
                  future.channel().closeFuture().sync();
              } finally {
                  //优雅关闭 线程组
                  group.shutdownGracefully();
              }
          }
      
          public static void main(String[] args) {
              int port = 9008;
              NettyMQConsumer client = new NettyMQConsumer();
              try {
                  client.connect(port, "127.0.0.1");
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      
          public class NettyClientHandler extends ChannelInboundHandlerAdapter {
      
      
              @Override
              public void channelActive(ChannelHandlerContext ctx) throws Exception {
                  JSONObject data = new JSONObject();
                  data.put("type", "consumer");
                  // 生产发送数据
                  byte[] req = data.toJSONString().getBytes();
                  ByteBuf firstMSG = Unpooled.buffer(req.length);
                  firstMSG.writeBytes(req);
                  ctx.writeAndFlush(firstMSG);
              }
      
              /**
               * 客户端读取到服务器端数据
               *
               * @param ctx
               * @param msg
               * @throws Exception
               */
              @Override
              public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                  ByteBuf buf = (ByteBuf) msg;
                  byte[] req = new byte[buf.readableBytes()];
                  buf.readBytes(req);
                  String body = new String(req, "UTF-8");
                  System.out.println("客户端接收到服务器端请求:" + body);
              }
      
              // tcp属于双向传输
      
              @Override
              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                  ctx.close();
              }
          }
      }
  3. 持久化机制:
    1. 如果 MQ 接收到生产者投递信息,如果消费者不存在的情况下,消息是否会丢失?
    2. 答:不会丢失,消息确认机制必须要消费者消费成功之后,在通知给 MQ 服务器端,删除该消息
  4. MQ 服务器将该消息推送给消费者:
    1. 消费者已经和 MQ 服务器保持长连接
    2. 消费者在第一次启动的时候会主动拉取信息
  5. MQ 如何实现高并发思想:
    1. MQ 消费者根据自身能力情况,拉取 MQ 服务器端消费消息
    2. 默认的情况下取出一条消息
  6. 缺点:
    1. 存在延迟问题
  7. 需要考虑 MQ 消费者提高速率的问题:
    1. 如何提高消费者速率:消费者实现集群、消费者批量获取消息即可

http://www.kler.cn/a/532694.html

相关文章:

  • Intel 与 Yocto 项目的深度融合:全面解析与平台对比
  • Med-R2:基于循证医学的检索推理框架:提升大语言模型医疗问答能力的新方法
  • Rust中使用ORM框架diesel报错问题
  • LLM - 基于LM Studio本地部署DeepSeek-R1的蒸馏量化模型
  • 三路排序算法
  • 构建一个数据分析Agent:提升分析效率的实践
  • nlp文章相似度
  • STM32 串口发送与接收
  • 硬件产品经理:需求引力模型(DGM)
  • 用 OpenCV 画圆:让图像处理更简单有趣
  • 昇思打卡营第五期(MindNLP特辑)番外:硅基流动 x 华为云DeepSeek V3 API推理MindTinyRAG
  • 排序算法--冒泡排序
  • 最新版Node.js下载安装指定版本图文版教程(非常详细)
  • 动态获取脚本名称作为日志文件的名称
  • 要将DsspSeek微调为行业专用的大模型,需要结合领域知识、数据优化和模型调整策略。
  • 【Linux系统】SIGCHLD 信号(选学了解)
  • 基于微信小程序的私家车位共享系统设计与实现(LW+源码+讲解)
  • linux内核源代码中__init的作用?
  • 【仿12306项目】基于SpringCloud,使用Sentinal对抢票业务进行限流
  • Linux01——初识Linux
  • 【Python】NumPy(一):数据类型、创建数组及基本操作
  • Docker使用指南(二)——容器相关操作详解(实战案例教学,创建/使用/停止/删除)
  • 开发指南094-in语句的处理
  • Maven(Ⅱ):依赖范围,依赖传递,依赖阻断,可选依赖
  • 10分钟本地部署Deepseek-R1
  • Laravel Validation validated() 的实现