【基于RabbitMQ的消息队列服务器模拟实现】
基于RabbitMQ的消息队列服务器模拟实现
消息队列(Message Queue)是一种在分布式系统中实现异步通信的技术。它允许不同的应用或服务通过发送和接收消息来协同工作,而无需直接调用对方的接口,从而解耦系统、提升可靠性和扩展性。
1. 消息队列介绍
1.1 核心概念
- 生产者(Producer):发送消息的应用(例如:订单系统生成订单后发送消息)。
- 消费者(Consumer):接收并处理消息的应用(例如:库存系统根据消息扣减库存)。
- 队列(Queue):消息的存储容器,确保消息按顺序传递。
- 消息(Message):传递的数据单元,通常包含业务内容(如 JSON、文本等)。
1.2 为什么需要消息队列?
-
解耦系统
- 生产者与消费者无需知道对方的存在,修改一方不影响另一方。
- 示例:订单系统与物流系统通过队列通信,无需直接对接。
-
异步处理
- 生产者发送消息后即可返回,消费者异步处理。
- 示例:用户注册后,异步发送邮件或短信通知。
-
流量削峰
- 突发流量时,队列缓冲请求,避免系统过载。
- 示例:电商秒杀活动中,将用户请求排队处理。
-
可靠性
- 消息持久化存储,即使消费者宕机,重启后仍可继续处理。
- 示例:支付系统确保交易消息不丢失。
1.3 常见应用场景
-
任务异步化
- 耗时操作(如生成报表、图片处理)交给后台异步处理。
-
微服务通信
- 服务间通过消息传递数据,避免直接依赖(如订单服务通知库存服务)。
-
日志收集
- 多个应用将日志发送到队列,统一由日志处理服务消费。
-
事件驱动架构
- 基于事件触发的系统(如库存不足时触发补货)。
2. 消息队列的作用
消息队列通过异步通信和缓冲机制实现系统解耦与流量削峰填谷,以下是具体原理和应用方式:
2.1 解耦(Decoupling)
1. 解耦的实现原理
-
生产者与消费者隔离:
生产者将消息发送到队列后即可返回,无需等待消费者处理;消费者只需监听队列获取消息,无需知道消息来源。双方仅依赖队列,不直接通信。 -
动态扩展:
新增消费者时(如增加库存服务),只需绑定到队列,生产者无需修改代码;同样,生产者增减也不影响消费者。
2. 典型场景
- 订单系统与下游服务:
- 订单服务生成订单后,发送消息到队列。
- 库存服务、物流服务、通知服务各自订阅队列,独立处理消息。
- 优势:物流系统升级或新增积分服务时,订单系统无需改动。
3. 技术实现(以 RabbitMQ 为例)
- 生产者:声明交换机,发送消息时指定路由键。
- 消费者:声明队列并绑定到交换机,根据路由键订阅所需消息。
2.2 削峰填谷(Traffic Shaping)
1. 削峰填谷的原理
- 削峰(Peak Shaving):
突发流量(如秒杀)时,消息队列作为缓冲区,暂存大量请求,避免直接压垮后端服务。 - 填谷(Valley Filling):
流量低谷时,消费者按固定速率处理队列中的积压消息,保持系统稳定。
2. 典型场景
- 电商秒杀:
- 用户抢购请求写入队列,而非直接访问数据库。
- 库存服务以可控速率(如每秒处理100个请求)消费队列,避免数据库崩溃。
- 优势:即使瞬间涌入10万请求,系统仍按最大处理能力平稳运行。
3. 技术实现
- 限流控制:
- 消费者设置预取数量(Prefetch Count),限制同时处理的消息数。
- 队列容量策略:
- 设置队列最大长度或消息TTL(存活时间),超限时丢弃旧消息或进入死信队列。
2.3 结合解耦与削峰的实践案例
场景:用户注册流程
-
传统同步模式:
- 注册接口需同步调用数据库写入、发送邮件、短信通知、初始化用户积分等服务。
- 问题:任一服务延迟会导致注册响应变慢,且服务间强耦合。
-
消息队列改造后:
- 注册服务将用户数据写入消息队列后立即返回。
- 邮件服务、短信服务、积分服务异步消费消息,各自独立处理。
- 优势:
- 注册响应速度提升(解耦)。
- 突发注册流量时,队列缓冲请求,服务按最大能力消费(削峰)。
消息队列通过异步通信解耦系统,通过缓冲机制削峰填谷,二者共同提升系统的:
- 弹性:应对流量波动时更从容。
- 可维护性:服务间依赖减少,易于迭代和扩展。
- 可靠性:即使部分服务暂时不可用,消息仍可安全存储等待恢复。
3. 典型消息队列产品
- RabbitMQ
- 基于 AMQP 协议,支持灵活的路由规则(如直连、主题、广播)。
- 适合对可靠性要求高的场景。
- Kafka
- 高吞吐量,支持持久化日志和流处理。
- 适合大数据场景(如日志聚合、实时分析)。
- RocketMQ
- 阿里开源,支持事务消息、顺序消息。
- 适合金融、电商等复杂场景。
- Redis Streams
- 轻量级,基于内存,适合简单场景或已有 Redis 的系统。
4. 消息队列的需求分析与设计
4.1 需求背景
核心目标是模拟 RabbitMQ 的核心功能,用于学习消息队列底层原理或构建最小可用原型。功能需覆盖生产消费模型、路由机制、基础可靠性保障。
4.2 核心功能需求
模块 | 功能描述 |
---|---|
协议层 | 自定义简单协议(类 AMQP 精简版)以及TCP,支持生产、消费、确认操作。 |
网络通信 | 基于 TCP 实现客户端-服务器通信,支持多客户端连接。 |
消息存储 | 内存队列(可扩展为磁盘持久化),支持队列创建、消息入队/出队。 |
路由机制 | 实现 Direct/Topic/Fanout 交换机,支持绑定规则与消息路由。 |
可靠性 | 消息确认机制(ACK/NACK)、消息重试(超时未确认重投递)。 |
管理接口 | 命令行或 HTTP API 管理队列、交换机、监控状态。 |
5. 消息队列服务器内部核心概念
BrokerServer 是消息队列的核心服务端组件,负责接收、存储和路由消息。以下是其内部涉及的核心概念及交互逻辑:
5.1 虚拟主机(Virtual Host)
- 作用:逻辑隔离不同业务的消息队列(类似命名空间)。
- 实现:每个虚拟主机独立管理交换机、队列和绑定关系。
5.2 连接(Connection)与信道(Channel)
- 连接(Connection):
- 客户端与 BrokerServer 之间的 TCP 长连接。
- 负责身份验证(如用户名/密码)、资源分配。
- 信道(Channel):
- 基于连接的轻量级逻辑通道(复用 TCP 连接)。
- 每个信道独立处理消息生产/消费操作,避免线程阻塞。
5.3 交换机(Exchange)
-
作用:接收生产者消息,按类型和路由规则分发到队列。
-
核心类型:
类型 路由规则 典型场景 Direct 精确匹配 routing_key
订单状态更新(如 order.paid
)Topic 通配符匹配( *
单层,#
多层)日志分类(如 logs.error.*
)Fanout 广播到所有绑定队列 新闻推送
5.4 绑定(Binding)
- 作用:定义交换机与队列之间的路由规则。
- 关键要素:
- 交换机:指定绑定到哪个交换机。
- 队列:指定目标队列。
- 路由键(Routing Key):Fanout/Topic 交换机的匹配规则。
5.5 消息路由流程
BrokerServer 内部的消息处理流程如下:
- 生产者发送消息到指定交换机,附带
routing_key
或headers
。 - 交换机匹配绑定规则,将消息路由到符合条件的队列。
- 队列存储消息,等待消费者拉取。
- 消费者订阅队列,通过轮询或推送机制获取消息。
- 消费者处理完成后发送 ACK,Broker 删除消息(若开启手动确认)。
5.6 持久化(Persistence)
- 队列持久化:
durable=True
时,队列元数据存盘(队列本身存在,但消息不保证)。 - 消息持久化:生产者发送消息时设置
delivery_mode=2
,消息内容存盘。
5.7 消息确认(Acknowledgement)
- 自动确认(Auto Ack):
- 消费者收到消息后,Broker 立即标记消息为已处理(存在丢失风险)。
- 手动确认(Manual Ack):
- 消费者处理完成后显式发送 ACK,Broker 才删除消息。
- 支持
basic_nack
拒绝消息(可选择重新入队或丢弃)。
6. 消息队列服务器模块划分与部分核心代码展示
根据图中的结构,以下是消息队列服务器的模块划分及部分代码(此处由于篇幅过长,省略所有类的get/set方法):
6.1 服务器模块
-
职责:管理服务器核心功能及资源隔离。
-
子模块:
-
消息队列系统服务器本体
/** * 这个 BrokerServer 就是 消息队列 本体服务器. * 本质上就是一个 TCP 的服务器. */ public class BrokerServer { private ServerSocket serverSocket = null; // 当前考虑一个 BrokerServer 上只有一个 虚拟主机 private VirtualHost virtualHost = new VirtualHost("default"); // 使用这个 哈希表 表示当前的所有会话(有哪些客户端正在和咱们的服务器进行通信) // 此处的 key 是 channelId, value 为对应的 Socket 对象 private ConcurrentHashMap<String, Socket> sessions = new ConcurrentHashMap<String, Socket>(); // 引入一个线程池, 来处理多个客户端的请求. private ExecutorService executorService = null; // 引入一个 boolean 变量控制服务器是否继续运行 private volatile boolean runnable = true; public BrokerServer(int port) throws IOException { serverSocket = new ServerSocket(port); } public void start() throws IOException { System.out.println("[BrokerServer] 启动!"); executorService = Executors.newCachedThreadPool(); try { while (runnable) { Socket clientSocket = serverSocket.accept(); // 把处理连接的逻辑丢给这个线程池. executorService.submit(() -> { processConnection(clientSocket); }); } } catch (SocketException e) { System.out.println("[BrokerServer] 服务器停止运行!"); // e.printStackTrace(); } } /** 一般来说停止服务器, 就是直接 kill 掉对应进程就行了. * 此处写一个单独的停止方法. 主要是用于后续的单元测试. */ public void stop() throws IOException { runnable = false; // 把线程池中的任务都放弃了. 让线程都销毁. executorService.shutdownNow(); serverSocket.close(); } // 通过这个方法, 来处理一个客户端的连接.在这一个连接中, 可能会涉及到多个请求和响应. private void processConnection(Socket clientSocket) { try (InputStream inputStream = clientSocket.getInputStream(); OutputStream outputStream = clientSocket.getOutputStream()) { // 这里需要按照特定格式来读取并解析. 此时就需要用到 DataInputStream 和 DataOutputStream try (DataInputStream dataInputStream = new DataInputStream(inputStream); DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { while (true) { // 1. 读取请求并解析. Request request = readRequest(dataInputStream); // 2. 根据请求计算响应 Response response = process(request, clientSocket); // 3. 把响应写回给客户端 writeResponse(dataOutputStream, response); } } } catch (EOFException | SocketException e) { // 对于这个代码, DataInputStream 如果读到 EOF , 就会抛出一个 EOFException 异常, 并且需要借助这个异常来结束循环 System.out.println("[BrokerServer] connection 关闭! 客户端的地址: " + clientSocket.getInetAddress().toString() + ":" + clientSocket.getPort()); } catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[BrokerServer] connection 出现异常!"); e.printStackTrace(); } finally { try { // 当连接处理完了, 就需要记得关闭 socket clientSocket.close(); // 一个 TCP 连接中, 可能包含多个 channel. 需要把当前这个 socket 对应的所有 channel 也顺便清理掉. clearClosedSession(clientSocket); } catch (IOException e) { e.printStackTrace(); } } } private Request readRequest(DataInputStream dataInputStream) throws IOException { Request request = new Request(); request.setType(dataInputStream.readInt()); request.setLength(dataInputStream.readInt()); byte[] payload = new byte[request.getLength()]; int n = dataInputStream.read(payload); if (n != request.getLength()) { throw new IOException("读取请求格式出错!"); } request.setPayload(payload); return request; } private void writeResponse(DataOutputStream dataOutputStream, Response response) throws IOException { dataOutputStream.writeInt(response.getType()); dataOutputStream.writeInt(response.getLength()); dataOutputStream.write(response.getPayload()); // 这个刷新缓冲区也是重要的操作! dataOutputStream.flush(); } private Response process(Request request, Socket clientSocket) throws IOException, ClassNotFoundException, MqException { // 1. 把 request 中的 payload 做一个初步的解析. BasicArguments basicArguments = (BasicArguments) BinaryTool.fromBytes(request.getPayload()); System.out.println("[Request] rid=" + basicArguments.getRid() + ", channelId=" + basicArguments.getChannelId() + ", type=" + request.getType() + ", length=" + request.getLength()); // 2. 根据 type 的值, 来进一步区分接下来这次请求要干啥. boolean ok = true; if (request.getType() == 0x1) { // 创建 channel sessions.put(basicArguments.getChannelId(), clientSocket); System.out.println("[BrokerServer] 创建 channel 完成! channelId=" + basicArguments.getChannelId()); } else if (request.getType() == 0x2) { // 销毁 channel sessions.remove(basicArguments.getChannelId()); System.out.println("[BrokerServer] 销毁 channel 完成! channelId=" + basicArguments.getChannelId()); } else if (request.getType() == 0x3) { // 创建交换机. 此时 payload 就是 ExchangeDeclareArguments 对象了. ExchangeDeclareArguments arguments = (ExchangeDeclareArguments) basicArguments; ok = virtualHost.exchangeDeclare(arguments.getExchangeName(), arguments.getExchangeType(), arguments.isDurable(), arguments.isAutoDelete(), arguments.getArguments()); } else if (request.getType() == 0x4) { ExchangeDeleteArguments arguments = (ExchangeDeleteArguments) basicArguments; ok = virtualHost.exchangeDelete(arguments.getExchangeName()); } else if (request.getType() == 0x5) { QueueDeclareArguments arguments = (QueueDeclareArguments) basicArguments; ok = virtualHost.queueDeclare(arguments.getQueueName(), arguments.isDurable(), arguments.isExclusive(), arguments.isAutoDelete(), arguments.getArguments()); } else if (request.getType() == 0x6) { QueueDeleteArguments arguments = (QueueDeleteArguments) basicArguments; ok = virtualHost.queueDelete((arguments.getQueueName())); } else if (request.getType() == 0x7) { QueueBindArguments arguments = (QueueBindArguments) basicArguments; ok = virtualHost.queueBind(arguments.getQueueName(), arguments.getExchangeName(), arguments.getBindingKey()); } else if (request.getType() == 0x8) { QueueUnbindArguments arguments = (QueueUnbindArguments) basicArguments; ok = virtualHost.queueUnbind(arguments.getQueueName(), arguments.getExchangeName()); } else if (request.getType() == 0x9) { BasicPublishArguments arguments = (BasicPublishArguments) basicArguments; ok = virtualHost.basicPublish(arguments.getExchangeName(), arguments.getRoutingKey(), arguments.getBasicProperties(), arguments.getBody()); } else if (request.getType() == 0xa) { BasicConsumeArguments arguments = (BasicConsumeArguments) basicArguments; ok = virtualHost.basicConsume(arguments.getConsumerTag(), arguments.getQueueName(), arguments.isAutoAck(), new Consumer() { // 这个回调函数要做的工作, 就是把服务器收到的消息可以直接推送回对应的消费者客户端 @Override public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException { /* 先知道当前这个收到的消息, 要发给哪个客户端. * 此处 consumerTag 其实是 channelId. 根据 channelId 去 sessions 中查询, * 就可以得到对应的socket 对象了, 从而可以往里面发送数据了 * 1. 根据 channelId 找到 socket 对象 */ Socket clientSocket = sessions.get(consumerTag); if (clientSocket == null || clientSocket.isClosed()) { throw new MqException("[BrokerServer] 订阅消息的客户端已经关闭!"); } // 2. 构造响应数据 SubScribeReturns subScribeReturns = new SubScribeReturns(); subScribeReturns.setChannelId(consumerTag); subScribeReturns.setRid(""); // 由于这里只有响应, 没有请求, 不需要去对应. rid 暂时不需要. subScribeReturns.setOk(true); subScribeReturns.setConsumerTag(consumerTag); subScribeReturns.setBasicProperties(basicProperties); subScribeReturns.setBody(body); byte[] payload = BinaryTool.toBytes(subScribeReturns); Response response = new Response(); // 0xc 表示服务器给消费者客户端推送的消息数据. response.setType(0xc); // response 的 payload 就是一个 SubScribeReturns response.setLength(payload.length); response.setPayload(payload); /* 3. 把数据写回给客户端. * 注意! 此处的 dataOutputStream 这个对象不能 close !!! * 如果 把 dataOutputStream 关闭, 就会直接把 clientSocket 里的 outputStream 也关了. * 此时就无法继续往 socket 中写入后续数据了. */ DataOutputStream dataOutputStream = new DataOutputStream(clientSocket.getOutputStream()); writeResponse(dataOutputStream, response); } }); } else if (request.getType() == 0xb) { // 调用 basicAck 确认消息. BasicAckArguments arguments = (BasicAckArguments) basicArguments; ok = virtualHost.basicAck(arguments.getQueueName(), arguments.getMessageId()); } else { // 当前的 type 是非法的. throw new MqException("[BrokerServer] 未知的 type! type=" + request.getType()); } // 3. 构造响应 BasicReturns basicReturns = new BasicReturns(); basicReturns.setChannelId(basicArguments.getChannelId()); basicReturns.setRid(basicArguments.getRid()); basicReturns.setOk(ok); byte[] payload = BinaryTool.toBytes(basicReturns); Response response = new Response(); response.setType(request.getType()); response.setLength(payload.length); response.setPayload(payload); System.out.println("[Response] rid=" + basicReturns.getRid() + ", channelId=" + basicReturns.getChannelId() + ", type=" + response.getType() + ", length=" + response.getLength()); return response; } private void clearClosedSession(Socket clientSocket) { // 这里要做的事情, 主要就是遍历上述 sessions hash 表, 把该被关闭的 socket 对应的键值对, 统统删掉. List<String> toDeleteChannelId = new ArrayList<>(); for (Map.Entry<String, Socket> entry : sessions.entrySet()) { if (entry.getValue() == clientSocket) { /* 不能在这里直接删除!!! * 这属于使用集合类的一个大忌!!! 一边遍历, 一边删除!!! * sessions.remove(entry.getKey()); */ toDeleteChannelId.add(entry.getKey()); } } for (String channelId : toDeleteChannelId) { sessions.remove(channelId); } System.out.println("[BrokerServer] 清理 session 完成! 被清理的 channelId=" + toDeleteChannelId); } }
-
虚拟主机(Virtual Host):
-
提供逻辑隔离,每个虚拟主机独立管理交换机、队列、绑定等资源。
-
约定交换机名字 = 虚拟主机的名字 + 交换机真实名字。按照此方式可以去区分不同的队列,进一步由于绑定是和交换机和队列都相关,此时绑定也就被隔离开了,再进一步消息和队列是强相关的,队列名区分绑定也就区分开了。
-
考虑线程安全问题:同时有多个线程进行创建和删除(交换机,队列,绑定)操作就会有线程安全问题,故应在修改前进行加锁。当前该代码粒度较大:针对A交换机进行操作,会影响到B交换机操作,但是对于Broker Server创建/删除交换机,创建/删除绑定,创建/删除队列,这些操作都属于低频操作,所以这个锁的粒度所耗费的成本依然会比较小。
public class VirtualHost { private String virtualHostName; private MemoryDataCenter memoryDataCenter = new MemoryDataCenter(); private DiskDataCenter diskDataCenter = new DiskDataCenter(); private Router router = new Router(); private ConsumerManager consumerManager = new ConsumerManager(this); // 操作交换机的锁对象 private final Object exchangeLocker = new Object(); // 操作队列的锁对象 private final Object queueLocker = new Object(); public VirtualHost(String name) { this.virtualHostName = name; // 对于 MemoryDataCenter 来说, 不需要额外的初始化操作的. 只要对象 new 出来就行了 // 但是, 针对 DiskDataCenter 来说, 则需要进行初始化操作. 建库建表和初始数据的设定. diskDataCenter.init(); // 另外还需要针对硬盘的数据, 进行恢复到内存中. try { memoryDataCenter.recovery(diskDataCenter); } catch (IOException | MqException | ClassNotFoundException e) { e.printStackTrace(); System.out.println("[VirtualHost] 恢复内存数据失败!"); } } // 创建交换机 // 如果交换机不存在, 就创建. 如果存在, 直接返回. // 返回值是 boolean. 创建成功, 返回 true. 失败返回 false public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) { // 把交换机的名字, 加上虚拟主机作为前缀. exchangeName = virtualHostName + exchangeName; try { synchronized (exchangeLocker) { // 1. 判定该交换机是否已经存在. 直接通过内存查询. Exchange existsExchange = memoryDataCenter.getExchange(exchangeName); if (existsExchange != null) { // 该交换机已经存在! System.out.println("[VirtualHost] 交换机已经存在! exchangeName=" + exchangeName); return true; } // 2. 真正创建交换机. 先构造 Exchange 对象 Exchange exchange = new Exchange(); exchange.setName(exchangeName); exchange.setType(exchangeType); exchange.setDurable(durable); exchange.setAutoDelete(autoDelete); exchange.setArguments(arguments); // 3. 把交换机对象写入硬盘 if (durable) { diskDataCenter.insertExchange(exchange); } // 4. 把交换机对象写入内存 memoryDataCenter.insertExchange(exchange); System.out.println("[VirtualHost] 交换机创建完成! exchangeName=" + exchangeName); // 上述逻辑, 先写硬盘, 后写内存. 目的就是因为硬盘更容易写失败. 如果硬盘写失败了, 内存就不写了. // 要是先写内存, 内存写成功了, 硬盘写失败了, 还需要把内存的数据给再删掉. 就比较麻烦了. } return true; } catch (Exception e) { System.out.println("[VirtualHost] 交换机创建失败! exchangeName=" + exchangeName); e.printStackTrace(); return false; } } // 删除交换机 public boolean exchangeDelete(String exchangeName) { exchangeName = virtualHostName + exchangeName; try { synchronized (exchangeLocker) { // 1. 先找到对应的交换机. Exchange toDelete = memoryDataCenter.getExchange(exchangeName); if (toDelete == null) { throw new MqException("[VirtualHost] 交换机不存在无法删除!"); } // 2. 删除硬盘上的数据 if (toDelete.isDurable()) { diskDataCenter.deleteExchange(exchangeName); } // 3. 删除内存中的交换机数据 memoryDataCenter.deleteExchange(exchangeName); System.out.println("[VirtualHost] 交换机删除成功! exchangeName=" + exchangeName); } return true; } catch (Exception e) { System.out.println("[VirtualHost] 交换机删除失败! exchangeName=" + exchangeName); e.printStackTrace(); return false; } } // 创建队列 public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) { // 把队列的名字, 给拼接上虚拟主机的名字. queueName = virtualHostName + queueName; try { synchronized (queueLocker) { // 1. 判定队列是否存在 MSGQueue existsQueue = memoryDataCenter.getQueue(queueName); if (existsQueue != null) { System.out.println("[VirtualHost] 队列已经存在! queueName=" + queueName); return true; } // 2. 创建队列对象 MSGQueue queue = new MSGQueue(); queue.setName(queueName); queue.setDurable(durable); queue.setExclusive(exclusive); queue.setAutoDelete(autoDelete); queue.setArguments(arguments); // 3. 写硬盘 if (durable) { diskDataCenter.insertQueue(queue); } // 4. 写内存 memoryDataCenter.insertQueue(queue); System.out.println("[VirtualHost] 队列创建成功! queueName=" + queueName); } return true; } catch (Exception e) { System.out.println("[VirtualHost] 队列创建失败! queueName=" + queueName); e.printStackTrace(); return false; } } // 删除队列 public boolean queueDelete(String queueName) { queueName = virtualHostName + queueName; try { synchronized (queueLocker) { // 1. 根据队列名字, 查询下当前的队列对象 MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 队列不存在! 无法删除! queueName=" + queueName); } // 2. 删除硬盘数据 if (queue.isDurable()) { diskDataCenter.deleteQueue(queueName); } // 3. 删除内存数据 memoryDataCenter.deleteQueue(queueName); System.out.println("[VirtualHost] 删除队列成功! queueName=" + queueName); } return true; } catch (Exception e) { System.out.println("[VirtualHost] 删除队列失败! queueName=" + queueName); e.printStackTrace(); return false; } } }
-
绑定的创建与删除:绑定在创建的时候比较简单,但删除的时候会涉及到交换机与队列两方面的问题:加入用户先删除交换机和队列,再去删除绑定就会发生空指针异常,解决办法有以下两种:
- 参考类似于
mysql
的外键,删除队列/交换机的时候,判定当前队列/交换机存在对应的绑定,如果存在就禁止删除队列/交换机,要求先删除绑定,再尝试删除队列/交换机。 - 删除绑定的时候,什么都不校验,直接删除。因为本质上对于绑定的删除操作就是一条删除数据库的
sql
语句不会有其他副作用。
第一种方法更严谨,第二种方法比较简单。因为查看交换机绑定简单,但查看队列绑定较难,所以这里选择方法二。
//创建绑定 public boolean queueBind(String queueName, String exchangeName, String bindingKey) { queueName = virtualHostName + queueName; exchangeName = virtualHostName + exchangeName; try { synchronized (exchangeLocker) { synchronized (queueLocker) { // 1. 判定当前的绑定是否已经存在了. Binding existsBinding = memoryDataCenter.getBinding(exchangeName, queueName); if (existsBinding != null) { throw new MqException("[VirtualHost] binding 已经存在! queueName=" + queueName + ", exchangeName=" + exchangeName); } // 2. 验证 bindingKey 是否合法. if (!router.checkBindingKey(bindingKey)) { throw new MqException("[VirtualHost] bindingKey 非法! bindingKey=" + bindingKey); } // 3. 创建 Binding 对象 Binding binding = new Binding(); binding.setExchangeName(exchangeName); binding.setQueueName(queueName); binding.setBindingKey(bindingKey); // 4. 获取一下对应的交换机和队列. 如果交换机或者队列不存在, 这样的绑定也是无法创建的. MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 队列不存在! queueName=" + queueName); } Exchange exchange = memoryDataCenter.getExchange(exchangeName); if (exchange == null) { throw new MqException("[VirtualHost] 交换机不存在! exchangeName=" + exchangeName); } // 5. 先写硬盘 if (queue.isDurable() && exchange.isDurable()) { diskDataCenter.insertBinding(binding); } // 6. 写入内存 memoryDataCenter.insertBinding(binding); } } System.out.println("[VirtualHost] 绑定创建成功! exchangeName=" + exchangeName + ", queueName=" + queueName); return true; } catch (Exception e) { System.out.println("[VirtualHost] 绑定创建失败! exchangeName=" + exchangeName + ", queueName=" + queueName); e.printStackTrace(); return false; } } //解除绑定 public boolean queueUnbind(String queueName, String exchangeName) { queueName = virtualHostName + queueName; exchangeName = virtualHostName + exchangeName; try { synchronized (exchangeLocker) { synchronized (queueLocker) { // 1. 获取 binding 看是否已经存在 Binding binding = memoryDataCenter.getBinding(exchangeName, queueName); if (binding == null) { throw new MqException("[VirtualHost] 删除绑定失败! 绑定不存在! exchangeName=" + exchangeName + ", queueName=" + queueName); } // 2. 无论绑定是否持久化了, 都尝试从硬盘删一下. 就算不存在, 这个删除也无副作用. diskDataCenter.deleteBinding(binding); // 3. 删除内存的数据 memoryDataCenter.deleteBinding(binding); System.out.println("[VirtualHost] 删除绑定成功!"); } } return true; } catch (Exception e) { System.out.println("[VirtualHost] 删除绑定失败!"); e.printStackTrace(); return false; } }
- 参考类似于
-
根据三种交换机类型决定如何转发消息:
-
DIRECT:直接转发,无需绑定
-
FANOUT:转发给所有与该交换机绑定的队列
-
TOPIC:判定
routingKey
与bindingKey
是否匹配,匹配则转发。这里就涉及到Key的构造规则以及匹配规则-
构造规则: 两种
Key
都由数字,字母,下划线组成,使用.
把Key分为多个部分,但bindingKey
中支持两种特殊符号*
,#
作为通配符且必须是作为被.
分割出来的独立部分。形如aaa.
*
.
111(合法),而aaa*
.
111(非法)。 -
匹配规则:
*
可以匹配任何一个独立的部分,#
可以匹配任何0个或者多个独立部分。以下是几个例子:用例 routingKey bindingKey 匹配情况 第一组 aaa.bbb.ccc (无 * 和 #)aaa.bbb.ccc 完全一样才算成功 第二组 aaa.dd.bbb,aaa.b.bbb,aaa.cc.bbb (有 * )aaa.*.bbb * 可以代替一段字符,其他一样算匹配成功 第三组 aaa.bbb.ccc,aaa.b.b.ccc,aaa.ccc (有 #)aaa.#.ccc # 可以代替多段或0段字符,其他一样算匹配成功 如果把bindingKey就仅设置为一个
#
号,就可以匹配到所有与之的交换机,相当于FANOUT。/** * 使用这个类, 来实现交换机的转发规则. * 同时也借助这个类验证 bindingKey 是否合法. */ public class Router { /** bindingKey 的构造规则: * 1. 数字, 字母, 下划线 * 2. 使用 . 分割成若干部分* * 3. 允许存在 * 和 # 作为通配符. 但是通配符只能作为独立的分段. */ public boolean checkBindingKey(String bindingKey) { if (bindingKey.length() == 0) { // 空字符串, 也是合法情况. 比如在使用 direct / fanout 交换机的时候, bindingKey 是用不上的. return true; } // 检查字符串中不能存在非法字符 for (int i = 0; i < bindingKey.length(); i++) { char ch = bindingKey.charAt(i); if (ch >= 'A' && ch <= 'Z') { continue; } if (ch >= 'a' && ch <= 'z') { continue; } if (ch >= '0' && ch <= '9') { continue; } if (ch == '_' || ch == '.' || ch == '*' || ch == '#') { continue; } return false; } // 检查 * 或者 # 是否是独立的部分. // aaa.*.bbb 合法情况; aaa.a*.bbb 非法情况. String[] words = bindingKey.split("\\."); for (String word : words) { // 检查 word 长度 > 1 并且包含了 * 或者 # , 就是非法的格式了. if (word.length() > 1 && (word.contains("*") || word.contains("#"))) { return false; } } /* 约定通配符之间的相邻关系. * 为啥这么约定? 因为前三种相邻的时候, 实现匹配的逻辑会非常繁琐, 同时功能性提升不大 * 1. aaa.#.#.bbb => 非法 * 2. aaa.#.*.bbb => 非法 * 3. aaa.*.#.bbb => 非法 * 4. aaa.*.*.bbb => 合法 */ for (int i = 0; i < words.length - 1; i++) { // 连续两个 ## if (words[i].equals("#") && words[i + 1].equals("#")) { return false; } // # 连着 * if (words[i].equals("#") && words[i + 1].equals("*")) { return false; } // * 连着 # if (words[i].equals("*") && words[i + 1].equals("#")) { return false; } } return true; } // routingKey 的构造规则: // 1. 数字, 字母, 下划线 // 2. 使用 . 分割成若干部分 public boolean checkRoutingKey(String routingKey) { if (routingKey.length() == 0) { // 空字符串. 合法的情况. 比如在使用 fanout 交换机的时候, routingKey 用不上, 就可以设为 "" return true; } for (int i = 0; i < routingKey.length(); i++) { char ch = routingKey.charAt(i); // 判定该字符是否是大写字母 if (ch >= 'A' && ch <= 'Z') { continue; } // 判定该字母是否是小写字母 if (ch >= 'a' && ch <= 'z') { continue; } // 判定该字母是否是阿拉伯数字 if (ch >= '0' && ch <= '9') { continue; } // 判定是否是 _ 或者 . if (ch == '_' || ch == '.') { continue; } // 该字符, 不是上述任何一种合法情况, 就直接返回 false return false; } // 把每个字符都检查过, 没有遇到非法情况. 此时直接返回 true return true; } // 这个方法用来判定该消息是否可以转发给这个绑定对应的队列. public boolean route(ExchangeType exchangeType, Binding binding, Message message) throws MqException { // 根据不同的 exchangeType 使用不同的判定转发规则. if (exchangeType == ExchangeType.FANOUT) { // 如果是 FANOUT 类型, 则该交换机上绑定的所有队列都需要转发 return true; } else if (exchangeType == ExchangeType.TOPIC) { // 如果是 TOPIC 主题交换机, 规则就要更复杂一些. return routeTopic(binding, message); } else { // 其他情况是不应该存在的. throw new MqException("[Router] 交换机类型非法! exchangeType=" + exchangeType); } } // [测试用例] // binding key routing key result // aaa aaa true // aaa.bbb aaa.bbb true // aaa.bbb aaa.bbb.ccc false // aaa.bbb aaa.ccc false // aaa.bbb.ccc aaa.bbb.ccc true // aaa.* aaa.bbb true // aaa.*.bbb aaa.bbb.ccc false // *.aaa.bbb aaa.bbb false // # aaa.bbb.ccc true // aaa.# aaa.bbb true // aaa.# aaa.bbb.ccc true // aaa.#.ccc aaa.ccc true // aaa.#.ccc aaa.bbb.ccc true // aaa.#.ccc aaa.aaa.bbb.ccc true // #.ccc ccc true // #.ccc aaa.bbb.ccc true private boolean routeTopicOld(Binding binding, Message message) { // 先把这两个 key 进行切分 String[] bindingTokens = binding.getBindingKey().split("\\."); String[] routingTokens = message.getRoutingKey().split("\\."); // 引入两个下标, 指向上述两个数组. 初始情况下都为 0 int bindingIndex = 0; int routingIndex = 0; // 此处使用 while 更合适, 每次循环, 下标不一定就是 + 1, 不适合使用 for while (bindingIndex < bindingTokens.length && routingIndex < routingTokens.length) { if (bindingTokens[bindingIndex].equals("*")) { // [情况二] 如果遇到 * , 直接进入下一轮. * 可以匹配到任意一个部分!! bindingIndex++; routingIndex++; continue; } else if (bindingTokens[bindingIndex].equals("#")) { // 如果遇到 #, 需要先看看有没有下一个位置. bindingIndex++; if (bindingIndex == bindingTokens.length) { // [情况三] 该 # 后面没东西了, 说明此时一定能匹配成功了! return true; } // [情况四] # 后面还有东西, 拿着这个内容, 去 routingKey 中往后找, 找到对应的位置. // findNextMatch 这个方法用来查找该部分在 routingKey 的位置. 返回该下标. 没找到, 就返回 -1 routingIndex = findNextMatch(routingTokens, routingIndex, bindingTokens[bindingIndex]); if (routingIndex == -1) { // 没找到匹配的结果. 匹配失败 return false; } // 找到的匹配的情况, 继续往后匹配. bindingIndex++; routingIndex++; } else { // [情况一] 如果遇到普通字符串, 要求两边的内容是一样的. if (!bindingTokens[bindingIndex].equals(routingTokens[routingIndex])) { return false; } bindingIndex++; routingIndex++; } } // [情况五] 判定是否是双方同时到达末尾 // 比如 aaa.bbb.ccc 和 aaa.bbb 是要匹配失败的. if (bindingIndex == bindingTokens.length && routingIndex == routingTokens.length) { return true; } return false; } private int findNextMatch(String[] routingTokens, int routingIndex, String bindingToken) { for (int i = routingIndex; i < routingTokens.length; i++) { if (routingTokens[i].equals(bindingToken)) { return i; } } return -1; } // 通过 DP 的方式重新实现这个方法. private boolean routeTopic(Binding binding, Message message) { // 按照 . 来切分 binding key 和 routing key // 无通配符 String[] routingTokens = message.getRoutingKey().split("\\."); // 有通配符 String[] bindingTokens = binding.getBindingKey().split("\\."); int m = routingTokens.length; int n = bindingTokens.length; // 1. 初始化 dp 表. 由于要考虑空串, dp 表的长和宽都要 + 1 // dp[i][j] 表示的含义是 bindingTokens 中的 [0, j] 能否和 routingTokens 中的 [0, i] 匹配. boolean[][] dp = new boolean[m + 1][n + 1]; // 空的 bindingKey 和 空的 routingKey 可以匹配 dp[0][0] = true; // 如果 routingKey 为空, bindingKey 只有连续为 # 的时候, 才能匹配. for (int j = 1; j <= n; j++) { if (bindingTokens[j - 1].equals("#")) { dp[0][j] = true; } else { break; } } // 2. 遍历所有情况 for (int i = 1; i <= m; i++) { for (int j = 1; j <= n; j++) { if (bindingTokens[j - 1].equals("#")) { // 这块的状态转移方程推导过程很复杂. 参考算法视频讲解 dp[i][j] = dp[i - 1][j] || dp[i][j - 1]; } else if (bindingTokens[j - 1].equals("*")) { // 如果 bindingTokens j 位置为 *, 那么 bindingTokens j - 1 位置和 routingKey i - 1 位置匹配即可. dp[i][j] = dp[i - 1][j - 1]; } else { // 如果 bindingTokens j 位置为普通字符串, 那么要求 bindingTokens j - 1 位置 和 routingKey i - 1 // 位置匹配 // 并且 bindingTokens j 位置和 routingKey i 位置相同, 才认为是匹配 if (bindingTokens[j - 1].equals(routingTokens[i - 1])) { dp[i][j] = dp[i - 1][j - 1]; } else { dp[i][j] = false; } } } } // 3. 处理返回值, 直接返回 dp 表的最后一个位置 return dp[m][n]; } // 需要考虑通配符, 复杂一些 public boolean checkBindingKeyValid(String bindingKey) { // 1. 允许是空字符串 // 2. 数字字母下划线构成 // 3. 可以包含通配符 // 4. # 不能连续出现. // 5. # 和 * 不能相邻 if (bindingKey.length() == 0) { return true; } // 先判定基础构成 for (int i = 0; i < bindingKey.length(); i++) { char ch = bindingKey.charAt(i); if (ch >= 'A' && ch <= 'Z') { continue; } if (ch >= 'a' && ch <= 'z') { continue; } if (ch >= '0' && ch <= '9') { continue; } if (ch == '.' || ch == '_' || ch == '*' || ch == '#') { continue; } return false; } // 再判定每个词的情况 // 比如 aaa.a*a 这种应该视为非法. String[] words = bindingKey.split("\\."); for (String word : words) { if (word.length() > 1 && (word.contains("*") || word.contains("#"))) { return false; } } // 再判定相邻词的情况 for (int i = 0; i < words.length - 1; i++) { // 连续两个 ## if (words[i].equals("#") && words[i + 1].equals("#")) { return false; } // # 连着 * if (words[i].equals("#") && words[i + 1].equals("*")) { return false; } // * 连着 # if (words[i].equals("*") && words[i + 1].equals("#")) { return false; } } return true; } // 不包含通配符, 规则更简单. public boolean checkRoutingKeyValid(String routingKey) { if (routingKey.length() == 0) { return true; } // 数字字母下划线构成 for (int i = 0; i < routingKey.length(); i++) { char ch = routingKey.charAt(i); if (ch >= 'A' && ch <= 'Z') { continue; } if (ch >= 'a' && ch <= 'z') { continue; } if (ch >= '0' && ch <= '9') { continue; } if (ch == '_' || ch == '.') { continue; } return false; } return true; } }
-
-
-
-
-
职责:消息的存储、路由和消费逻辑。
-
子模块:
-
交换机管理:
-
交换机类实现,处理消息路由规则。
/* * 交换机的构建 */ public class Exchange { //使用name来作为交换机的身份标识 private String name; //交换机的三种类型: DIRECT, FANOUT, TOPIC private ExchangeType type = ExchangeType.DIRECT; //交换机是否要持久化存储: true表示需要持久化; false表示非持久化 private boolean durable = false; //如果当前交换机没人使用则自动删除 - 扩展模块 - 未实现 private boolean autoDelete = false; //arguments表示的是创建交换机时指定的一些额外的参数选项, 通过该选项来开启更多功能 - 扩展功能 - 未实现 private Map<String, Objects> arguments = new HashMap<>(); }
-
交换机类型类实现
package com.example.mq.mqserver.core; public enum ExchangeType { DIRECT (0), FANOUT (1), TOPIC (2); private final int type; private ExchangeType(int type) { this.type = type; } }
-
-
队列管理:
-
存储消息的缓冲区队列实现
//Message /* * 用来存储消息的队列 */ public class MSGQueue { //表示队列时身份标识 private String name; //表示队列是否持久化 private boolean durable; //这个属性为ture表示该队列被独占, 为false表示该队列本共享 private boolean exclusive; //如果当前交换机没人使用则自动删除 - 扩展模块 - 未实现 private boolean autoDelete; //arguments表示的是创建交换机时指定的一些额外的参数选项, 通过该选项来开启更多功能 - 扩展功能 - 未实现 private Map<String, Objects> arguments = new HashMap<>(); }
-
-
绑定管理:
-
交换机与队列的绑定关系类实现
/* * 表示交换机与队列之间的绑定关系 */ public class Binding { private String exchangeName; private String queueName; private String bindingKey; //Binding是交换机和队列之间的绑定, 因此无需持久化 }
-
-
消息管理:
包含两个部分,一是属性部分BasicProperties,二是正文部分byte[]
-
属性类实现
/* * 消息的属性部分 */ public class BasicProperties { //消息的唯一身份标识, 为了保证ID唯一使用UUID算法来生成messageID private String messageId; //消息上所带有标识, 和bindingKey进行匹配 /* * 如果当前交换机类型为DIRECT, 则routingKey标识要转发的队列名 * 如果当前交换机类型为FANOUT, 则该字段无意义(不使用) * 如果当前交换机类型为TOPIC, 则要和bindingKey做匹配, 符合要求的才能转发给对应队列 */ private String routingKey; //这个属性表示消息是否要持久化, 1表示非持久化, 2表示持久化 private int deliverMode = 1; //对于RabbitMQ来书评, BasicProperties中还有很多别的属性 - 扩展内容 - 未实现 }
-
正文部分实现
package com.example.mq.mqserver.core; import java.io.Serializable; import java.util.UUID; /* * 表示要传递的消息 * 此处Message对象, 是需要能够在网络上传输, 并且需要写入到文件中 * 因此需要标准库自带的序列化/反序列化 * 且由于Message中的内容是二进制数据所以不用json格式(json格式本质上是文本格式) */ public class Message implements Serializable { private BasicProperties basicProperties = new BasicProperties(); private byte[] body; //下文的属性为辅助查找属性 - 字节 /* * 由于Message后续会存储到文件中(持久化) * 一个文件中会存储很多的消息, 如何找到某个消息, 在文件中的具体位置呢? * 用以下两个偏移量来表示 - [offsetBeg, offsetEnd) * 且这里的辅助属性不需要序列化保存到文件中, 因为消息一旦写入文件, 消息的位置就固定了 * 此处的辅助属性需要在写入文件之前再进行设定, 因此此处只是设定两个核心属性创建一个Message对象 * 作用就是让内存中的message对象快速找到硬盘中的message位置 */ //消息数据的开头距离文件开头的位置偏移 private transient long offsetBeg = 0; //消息数据的结尾距离文件开头的位置偏移 private transient long offsetEnd = 0; //这个属性来表示消息在文件中是否是有效消息 - 逻辑删除 //0x1 表示有效, 0x0表示无效 private byte isValid = 0x1; //创建一个工厂方法来实现创建Message对象的过程 /* * 该方法中创建的Message对象, 会自动生成唯一的MessageID * 若routingKey与basicProperties中的routingKey冲突, 以外面的为主 */ public static Message createMessageWithID(String routingKey, BasicProperties basicProperties, byte[] body) { Message message = new Message(); if (basicProperties != null) { message.basicProperties = basicProperties; } //此处生成的MessageId 以M-作为前缀 message.setMessageId("M-" + UUID.randomUUID()); message.basicProperties.setRoutingKey(routingKey); message.body = body; return message; } }
-
-
内存数据结构管理:
-
交换机:直接使用
HashMap
,key是name
,value是Exchange
对象。 -
队列:直接使用
HashMap
,key是name
,value是MSGQueue
对象。 -
绑定:使用嵌套的
HashMap
,key是exchangename
,value是一个HashMap
: key是queueName
,value是Binding
对象。 -
消息:直接使用
HashMap
,key是messageId
,value是Message
对象。 -
队列和消息的关联:使用嵌套的
HashMap
,key是queueName
,value是一个LinkedList
: 元素为Message
对象。 -
未确认消息(存储当前队列中哪些消息呗消费者取走了,还未应答):使用嵌套的
HashMap
,key是queueName
,value是一个HashMap
: key是messageId
,value为Message
对象。package com.example.mq.mqserver.datacenter; import com.example.mq.common.MqException; import com.example.mq.mqserver.core.Binding; import com.example.mq.mqserver.core.Exchange; import com.example.mq.mqserver.core.MSGQueue; import com.example.mq.mqserver.core.Message; import java.io.IOException; import java.util.HashMap; import java.util.LinkedList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; /** * 使用这个类来统一管理内存中的所有数据. * 该类后续提供的一些方法, 可能会在多线程环境下被使用. 因此要注意线程安全问题. */ public class MemoryDataCenter { // key 是 exchangeName, value 是 Exchange 对象 private ConcurrentHashMap<String, Exchange> exchangeMap = new ConcurrentHashMap<>(); // key 是 queueName, value 是 MSGQueue 对象 private ConcurrentHashMap<String, MSGQueue> queueMap = new ConcurrentHashMap<>(); // 第一个 key 是 exchangeName, 第二个 key 是 queueName private ConcurrentHashMap<String, ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>(); // key 是 messageId, value 是 Message 对象 private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>(); // key 是 queueName, value 是一个 Message 的链表 private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>(); // 第一个 key 是 queueName, 第二个 key 是 messageId private ConcurrentHashMap<String, ConcurrentHashMap<String, Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>(); public void insertExchange(Exchange exchange) { exchangeMap.put(exchange.getName(), exchange); System.out.println("[MemoryDataCenter] 新交换机添加成功! exchangeName=" + exchange.getName()); } public Exchange getExchange(String exchangeName) { return exchangeMap.get(exchangeName); } public void deleteExchange(String exchangeName) { exchangeMap.remove(exchangeName); System.out.println("[MemoryDataCenter] 交换机删除成功! exchangeName=" + exchangeName); } public void insertQueue(MSGQueue queue) { queueMap.put(queue.getName(), queue); System.out.println("[MemoryDataCenter] 新队列添加成功! queueName=" + queue.getName()); } public MSGQueue getQueue(String queueName) { return queueMap.get(queueName); } public void deleteQueue(String queueName) { queueMap.remove(queueName); System.out.println("[MemoryDataCenter] 队列删除成功! queueName=" + queueName); } public void insertBinding(Binding binding) throws MqException { // ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); // if (bindingMap == null) { // bindingMap = new ConcurrentHashMap<>(); // bindingsMap.put(binding.getExchangeName(), bindingMap); // } // 先使用 exchangeName 查一下, 对应的哈希表是否存在. 不存在就创建一个. ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); synchronized (bindingMap) { // 再根据 queueName 查一下. 如果已经存在, 就抛出异常. 不存在才能插入. if (bindingMap.get(binding.getQueueName()) != null) { throw new MqException("[MemoryDataCenter] 绑定已经存在! exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.put(binding.getQueueName(), binding); } System.out.println("[MemoryDataCenter] 新绑定添加成功! exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } /** 获取绑定, 写两个版本: * 1. 根据 exchangeName 和 queueName 确定唯一一个 Binding * 2. 根据 exchangeName 获取到所有的 Binding */ public Binding getBinding(String exchangeName, String queueName) { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(exchangeName); if (bindingMap == null) { return null; } return bindingMap.get(queueName); } public ConcurrentHashMap<String, Binding> getBindings(String exchangeName) { return bindingsMap.get(exchangeName); } public void deleteBinding(Binding binding) throws MqException { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.get(binding.getExchangeName()); if (bindingMap == null) { // 该交换机没有绑定任何队列. 报错. throw new MqException("[MemoryDataCenter] 绑定不存在! exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } bindingMap.remove(binding.getQueueName()); System.out.println("[MemoryDataCenter] 绑定删除成功! exchangeName=" + binding.getExchangeName() + ", queueName=" + binding.getQueueName()); } // 添加消息 public void addMessage(Message message) { messageMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 新消息添加成功! messageId=" + message.getMessageId()); } // 根据 id 查询消息 public Message getMessage(String messageId) { return messageMap.get(messageId); } // 根据 id 删除消息 public void removeMessage(String messageId) { messageMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息被移除! messageId=" + messageId); } // 发送消息到指定队列 public void sendMessage(MSGQueue queue, Message message) { // 把消息放到对应的队列数据结构中. // 先根据队列的名字, 找到该队列对应的消息链表. LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(), k -> new LinkedList<>()); // 再把数据加到 messages 里面 synchronized (messages) { messages.add(message); } // 在这里把该消息也往消息中心中插入一下. 假设如果 message 已经在消息中心存在, 重复插入也没关系. // 主要就是相同 messageId, 对应的 message 的内容一定是一样的. (服务器代码不会对 Message 内容做修改 basicProperties 和 body) addMessage(message); System.out.println("[MemoryDataCenter] 消息被投递到队列中! messageId=" + message.getMessageId()); } // 从队列中取消息 public Message pollMessage(String queueName) { // 根据队列名, 查找一下, 对应的队列的消息链表. LinkedList<Message> messages = queueMessageMap.get(queueName); if (messages == null) { return null; } synchronized (messages) { // 如果没找到, 说明队列中没有任何消息. if (messages.size() == 0) { return null; } // 链表中有元素, 就进行头删. Message currentMessage = messages.remove(0); System.out.println("[MemoryDataCenter] 消息从队列中取出! messageId=" + currentMessage.getMessageId()); return currentMessage; } } // 获取指定队列中消息的个数 public int getMessageCount(String queueName) { LinkedList<Message> messages = queueMessageMap.get(queueName); if (messages == null) { // 队列中没有消息 return 0; } synchronized (messages) { return messages.size(); } } // 添加未确认的消息 public void addMessageWaitAck(String queueName, Message message) { ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName, k -> new ConcurrentHashMap<>()); messageHashMap.put(message.getMessageId(), message); System.out.println("[MemoryDataCenter] 消息进入待确认队列! messageId=" + message.getMessageId()); } // 删除未确认的消息(消息已经确认了) public void removeMessageWaitAck(String queueName, String messageId) { ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName); if (messageHashMap == null) { return; } messageHashMap.remove(messageId); System.out.println("[MemoryDataCenter] 消息从待确认队列删除! messageId=" + messageId); } // 获取指定的未确认的消息 public Message getMessageWaitAck(String queueName, String messageId) { ConcurrentHashMap<String, Message> messageHashMap = queueMessageWaitAckMap.get(queueName); if (messageHashMap == null) { return null; } return messageHashMap.get(messageId); } // 这个方法就是从硬盘上读取数据, 把硬盘中之前持久化存储的各个维度的数据都恢复到内存中. public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException { // 0. 清空之前的所有数据 exchangeMap.clear(); queueMap.clear(); bindingsMap.clear(); messageMap.clear(); queueMessageMap.clear(); // 1. 恢复所有的交换机数据 List<Exchange> exchanges = diskDataCenter.selectAllExchanges(); for (Exchange exchange : exchanges) { exchangeMap.put(exchange.getName(), exchange); } // 2. 恢复所有的队列数据 List<MSGQueue> queues = diskDataCenter.selectAllQueues(); for (MSGQueue queue : queues) { queueMap.put(queue.getName(), queue); } // 3. 恢复所有的绑定数据 List<Binding> bindings = diskDataCenter.selectAllBindings(); for (Binding binding : bindings) { ConcurrentHashMap<String, Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(), k -> new ConcurrentHashMap<>()); bindingMap.put(binding.getQueueName(), binding); } // 4. 恢复所有的消息数据 // 遍历所有的队列, 根据每个队列的名字, 获取到所有的消息. for (MSGQueue queue : queues) { LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName()); queueMessageMap.put(queue.getName(), messages); for (Message message : messages) { messageMap.put(message.getMessageId(), message); } } // 注意!! 针对 "未确认的消息" 这部分内存中的数据, 不需要从硬盘恢复. 之前考虑硬盘存储的时候, 也没设定这一块. // 一旦在等待 ack 的过程中, 服务器重启了, 此时这些 "未被确认的消息", 就恢复成 "未被取走的消息" . // 这个消息在硬盘上存储的时候, 就是当做 "未被取走" } }
-
-
数据库管理
-
职责:存储元数据(交换机、队列、绑定的定义)。
-
子模块:
-
交换机存储:
-
记录交换机的类型、参数及绑定关系。
<update id="createExchangeTable"> create table if not exists exchange( name varchar(50) primary key, type int, durable boolean, autoDelete boolean, arguments varchar(1024) ); </update> <insert id="insertExchange" parameterType="com.example.mq.mqserver.core.Exchange"> insert into exchange values(#{name}, #{type}, #{durable}, #{autoDelete}, #{arguments}); </insert> <select id="selectAllExchanges" resultType="com.example.mq.mqserver.core.Exchange"> select * from exchange; </select> <delete id="deleteExchange" parameterType="java.lang.String"> delete from exchange where name = #{exchangeName}; </delete>
这里的队列存储有一个问题,由于SQLite不支持存储键值对信息,故我们定义的类中的Map类型就不能存储到数据库中,那么就只能将它存储成String类型,那么怎么把Map键值对信息和数据库中的字符串类型相互转化呢?
关键要点在于MyBatis在完成数据库操作的时候,会自动调用到对象的getter和setter方法:
-
MyBatis往数据库中写数据,就会调用对象的getter方法,拿到属性的值,再往数据库中写。在这个过程中让get方法得到的结果是String类型的就可以直接把这个数据写到数据库中了。
-
MyBatis从数据库中读数据的时候,就会调用对象的setter方法,把数据库中读到的结果设置到对象的属性中。在这个过程中,让set方法参数是一个String,并且在set方法内部针对字符串进行解析,解析成一个Map对象。
故需要将MSGQueue以及Exchange类中的arguments的get和set方法改成以下方法,才可以正常使用数据库:
public String getArguments() { ObjectMapper mapper = new ObjectMapper(); try { return mapper.writeValueAsString(arguments); } catch (JsonProcessingException e) { e.printStackTrace(); } //如果代码异常,则返回一个空json字符串 return "{}"; } //从数据库中读取数据后, 构造Exchange对象, 会自动调用到 public void setArguments(String argumentsJson) throws JsonProcessingException { //把参数中的argumentsJson按照Json格式解析, 转成上述的Map对象 ObjectMapper mapper = new ObjectMapper(); mapper.readValue(argumentsJson, new TypeReference<HashMap<String, Object>>() {}); }
这里的TypeReference方法就可以做到这样的效果:它的第二个参数就是用来描述当前json字符串的,要转成的java对象是啥类型的。如果是个简单类型,之久使用对应类型的对象即可;但如果是集合类这样的复杂类型,就可以使用该方法的匿名内部类对象来描述复杂类型的具体信息(通过泛型参数来描述)
-
-
-
队列存储:
-
记录队列的配置(如
durable
,auto_delete
)及当前状态。<update id="createQueueTable"> create table if not exists queue( name varchar(50) primary key, durable boolean, exclusive boolean, autoDelete boolean, arguments varchar(1024) ); </update> <insert id="insertQueue" parameterType="com.example.mq.mqserver.core.MSGQueue"> insert into queue values(#{name}, #{durable}, #{exclusive}, #{autoDelete}, #{arguments}); </insert> <select id="selectAllQueues" resultType="com.example.mq.mqserver.core.MSGQueue"> select * from queue; </select> <delete id="deleteQueue" parameterType="java.lang.String"> delete from queue where name = #{queueName}; </delete>
-
-
绑定存储:
-
存储交换机与队列的绑定规则(
routing_key
,headers
)。<update id="createBindingTable"> create table if not exists binding( exchangeName varchar(50), queueName varchar(50), bindingKey varchar(256) ); </update> <insert id="insertBinding" parameterType="com.example.mq.mqserver.core.Binding"> insert into binding values(#{exchangeName}, #{queueName}, #{bindingKey}); </insert> <select id="selectAllBindings" resultType="com.example.mq.mqserver.core.Binding"> select * from binding; </select> <delete id="deleteBinding" parameterType="com.example.mq.mqserver.core.Binding"> delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName}; </delete>
-
MyBatis中需要一个MetaMapper类来通过xml来完成对表的各种操作,这里的实现如下:
@Mapper public interface MetaMapper { //提供三个核心建表方法 void createExchangeTable(); void createQueueTable(); void createBindingTable(); //针对上述三个表进行插入和删除 void insertExchange(Exchange exchange); List<Exchange> selectAllExchanges(); void deleteExchange(String exchangeName); void insertQueue(MSGQueue queue); List<MSGQueue> selectAllQueues(); void deleteQueue(String queueName); //无主键, 需要借助对象来删除 void insertBinding(Binding binding); List<Binding> selectAllBindings(); void deleteBinding(Binding binding); }
为了方便,我们使用一个类来整合所有的数据库操作:
/* 通过这个类来整合所有的数据库操作 * 数据库的初始化 = 建库建表 + 插入一些初始数据 * 期望,在broker server 启动的时候, 做出下列逻辑判定: * 1.数据库已经存在, 库表数据都有,则不做任何操作 * 2.数据库不存在,则创建库创建表,构造数据 * 如何判断存在与否呢? * 就判定meta.db这个表存在与否即可 */ public class DataBaseManager { //由于metaMappper未提供构造方法, 因此不能直接使用,需要从Spring之中拿到现成的对象 private MetaMapper metaMapper; //业务逻辑, 按照此方法来初始化 public void init(){ //手动获取到MetaMapper对象 metaMapper = MqApplication.context.getBean(MetaMapper.class); if (!checkDBExists()){ //数据库不存在, 就进行建库建表操作 crateTable(); createDefaultDate(); System.out.println("[DataBaseManager] 数据库初始化完成!"); }else{ //数据库已经存在 System.out.println("[DataBaseManager] 数据库已存在!"); } } //建库操作并不需要手动执行, 首次执行数据库操作的时候,就会自动创建出meta.db文件来(Mybatis来完成) private boolean checkDBExists() { File file = new File("./data/meta.db"); if (file.exists()) { return true; } return false; } //建表 private void crateTable() { metaMapper.createExchangeTable(); metaMapper.createQueueTable(); metaMapper.createBindingTable(); System.out.println("[DataBaseManager] 创建表完成!"); } //在数据库中添加一个默认的匿名交换机, 类型是DIRECT private void createDefaultDate() { Exchange exchange = new Exchange(); exchange.setName(""); exchange.setType(ExchangeType.DIRECT); exchange.setDurable(Boolean.TRUE); exchange.setAutoDelete(Boolean.FALSE); metaMapper.insertExchange(exchange); System.out.println("[DataBaseManager] 创建初始数据完成!"); } //其他数据库操作 public void insertExchange(Exchange exchange) { metaMapper.insertExchange(exchange); } public List<Exchange> selectAllExchange() { return metaMapper.selectAllExchanges(); } public void deleteExchange(String exchangeName) { metaMapper.deleteExchange(exchangeName); } public void insertQueue(MSGQueue queue) { metaMapper.insertQueue(queue); } public List<MSGQueue> selectAllQueue() { return metaMapper.selectAllQueues(); } public void deleteQueue(String queueName) { metaMapper.deleteQueue(queueName); } public void insertBinding(Binding binding) { metaMapper.insertBinding(binding); } public List<Binding> selectAllBinding() { return metaMapper.selectAllBindings(); } public void deleteBinding(Binding binding) { metaMapper.deleteBinding(binding); } }
-
文件管理
-
职责:消息内容和统计数据的持久化存储。
-
子模块:
-
消息内容和统计信息存储:
消息需要在硬盘上存储. 但是并不直接放到数据库中, ⽽是直接使⽤⽂件存储. 原因: 对于消息的操作并不需要复杂的 增删改查 . 对于⽂件的操作效率⽐数据库会⾼很多. 主流 MQ 的实现(包括 RabbitMQ), 都是把消息存储在⽂件中, ⽽不是数据库中.
我们给每个队列分配⼀个⽬录. ⽬录的名字为 data + 队列名. 形如 ./data/testQueue。该⽬录中包含两个固定名字的⽂件.
• queue_data.txt 消息数据⽂件, ⽤来保存消息内容.
• queue_stat.txt 消息统计⽂件, ⽤来保存消息统计信息.
queue_data.txt ⽂件格式: 使⽤⼆进制⽅式存储. 每个消息分成两个部分:
• 前四个字节, 表⽰ Message 对象的⻓度(字节数)
• 后⾯若⼲字节, 表⽰ Message 内容.
• 消息和消息之间⾸尾相连.
每个 Message 基于 Java 标准库的 ObjectInputStream / ObjectOutputStream 序列化. Message 对象中的 offsetBeg 和 offsetEnd 正是⽤来描述每个消息体所在的位置.
同时,对于新增和删除:新增直接把新的消息追加到文件末尾;但删除消息就需要涉及到类似于“顺序表搬运”这样的操作,这又是在操作磁盘文件,效率就非常低了,因此使用这种方式是不合适的,使用逻辑删除的方式是比较合适的(isValid为1,有效消息;isValid为0,无效消息表示已经删除)。
-
首先实现一个管理磁盘消息的类
/** * 通过这个类, 对磁盘上的消息进行管理 */ public class MessageFileManager { //定义一个内部类, 来表示队列的统计信息, 考虑使用static, 静态内部类 static public class Stat{ //对于这样的简单类, 就直接使用public即可, 相当于结构体 public int totalCount;//总消息数量 public int validCount;//有效消息数量 } //约定消息所有目录和文件名, 获取指定队列对应的消息文件所在路径 private String getQueueDir(String queueName) { return "./data/"+ queueName; } //获取该队列的消息数据文件路径 private String getQueueDataPath(String queueName) { return getQueueDir(queueName) + "/queue_data.txt"; } //获取该队列的消息统计文件路径 private String getQueueStatPath(String queueName) { return getQueueDir(queueName) + "/queue_stat.txt"; } private Stat readStat(String queueName) { //由于当前的消息统计文件, 可以直接使用Scanner来读取文件内容· Stat stat = new Stat(); try(InputStream in = Files.newInputStream(Paths.get(getQueueDataPath(queueName)))) { Scanner scanner = new Scanner(in); stat.totalCount = scanner.nextInt(); stat.validCount = scanner.nextInt(); return stat; }catch (IOException e){ e.printStackTrace(); } return null; } private void writeStat(String queueName, Stat stat) { //使用PrintWrite来写文件 //OutputStream写文件, 默认情况下会把原文件清空, 相当于新的数据覆盖了旧的 try (OutputStream out = Files.newOutputStream(Paths.get(getQueueDataPath(queueName)))){ PrintWriter pw = new PrintWriter(out); pw.write(stat.totalCount + "\t" + stat.validCount); pw.flush(); }catch (IOException e){ e.printStackTrace(); } } //创建队列对应的文件和目录 public void createQueueFiles(String queueName) throws IOException { //1.先创建队列对应的消息目录 File baseDir = new File(getQueueDir(queueName)); if (!baseDir.exists()){ //不存在就创建 boolean ok = baseDir.mkdir(); if (!ok){ throw new IOException("创建目录失败! baseDir=" + baseDir.getAbsolutePath()); } } //2.创建队列数据文件 File queueDataFile = new File(getQueueDataPath(queueName)); if (!queueDataFile.exists()){ boolean ok = queueDataFile.createNewFile(); if (!ok){ throw new IOException("创建文件失败! baseDir=" + queueDataFile.getAbsolutePath()); } } //3,创建消息统计文件 File queueStatFile = new File(getQueueStatPath(queueName)); if (!queueStatFile.exists()){ boolean ok = queueStatFile.createNewFile(); if (!ok){ throw new IOException("创建文件失败! baseDir=" + queueStatFile.getAbsolutePath()); } } //4.初始化消息统计文件 0\t0 Stat stat = new Stat(); stat.totalCount = 0; stat.validCount = 0; writeStat(queueName, stat); } //删除队列的目录和文件, 队列删除之后对应的消息文件也要随之删除 public void deleteQueueFiles(String queueName) throws IOException { //先删除其中文件在删除目录 File queueDataFile = new File(getQueueDataPath(queueName)); boolean ok1 = queueDataFile.delete(); File queueStatFile = new File(getQueueStatPath(queueName)); boolean ok2 = queueStatFile.delete(); File baseDir = new File(getQueueDir(queueName)); boolean ok3 = baseDir.exists(); if (!ok1 || !ok2 || !ok3){ //任意一个删除失败都算整体失败 throw new IOException("删除队列目录和文件失败! baseDir=" + baseDir.getAbsolutePath()); } } //检查队列的目录和文件是否存在 //后续有生产者给broker server生产消息, 这个消息就可能需要记录到文件上(取决于消息是否要持久化) public boolean checkQueueExists(String queueName) throws IOException { File queueDataFile = new File(getQueueDataPath(queueName)); if(!queueDataFile.exists()){ return false; } File queueStatFile = new File(getQueueStatPath(queueName)); if(!queueStatFile.exists()){ return false; } return true; } }
-
实现消息序列化/反序列化类
/** * Java中对象,都可以通过以下逻辑进行序列化/反序列化 * 前提是要让这个类能够实现Serializable这样的接口 */ public class BinaryTool { //把一个对象序列化成一个字节数组 public static byte[] toBytes(Object object) throws IOException { //这个流相当于一个变长字节数组 //既可以把object序列化的数据给之间的写入到byteArrayOutputStream中, 再统一成byte[] ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){ //此处的writeObject就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到objectOutputStream //由于它有关联到了ByteArrayOutputStream, 最终就写入到ByteArrayOutputStream objectOutputStream.writeObject(object); } //这个操作把byteArrayOutputStream中数据取出来转成byte[] return byteArrayOutputStream.toByteArray(); } //把一个字节数组, 反序列化成一个对象 public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException { Object object = null; try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){ try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){ //此处的readObject就是从data这个byte[]中读取数据进行反序列化 object = objectInputStream.readObject(); } } return object; } }
-
实现消息写入文件方法与逻辑删除方法
/** * 将一个新的消息放到队列对应的文件中 * 线程安全问题:以队列对象进行加锁即可 * 如果是两个线程往同一个队列里写消息, 此时需要阻塞等待 * 如果两个线程, 往不同队列写消息不需要阻塞等待(不同队列对应不同文件) */ public void sendMessage(MSGQueue queue, Message message) throws IOException, MqException { //1.检查当前写入的队列和文件是否存在 if (!checkQueueExists(queue.getName())){ throw new MqException("[MessageFileManager] 队列对应的文件不存在! queueName=" + queue.getName()); } //2.把Message对象进行序列化, 转成二进制的字节数组 byte[] messageBinary = BinaryTool.toBytes(message); /* * IDEA分析不出这个方法的实参究竟会传递什么, IDEA不确定这个锁是否能达到预期效果所以会报警告 * 后续调用这个方法传入的queue对象, 是后续通过内存, 管理的queue对象, 得是两个线程针对同一个对象加锁才有效 */ synchronized (queue) { //3.先获取到当前的队列数据文件的长度, 用这个来计算出该Message对象的offsetBeg和offsetEnd //把新的Message数据, 写入到队列数据文件的末尾, 此时Message对象的offsetBeg,就是当前文件长度+4 //offsetEnd就是当前文件长度 +4 + message自身长度. File queueDataFile = new File(getQueueDataPath(queue.getName())); message.setOffsetBeg(queueDataFile.length() + 4); message.setOffsetEnd(queueDataFile.length() + 4 + messageBinary.length); //4.写入消息到数据文件, 追加写入到数据文件末尾 try (OutputStream outputStream = new FileOutputStream(queueDataFile, true)) { //接下来先写当前消息长度, 占据4个字节 //outputStream.write(); //这个write方法参数是int类型, 但实际上只能写一个字节, 在流对象中经常会涉及到使用int表示byte的情况 //可以把int四个字节分别取出来, 一个字节一个字节的写呢?? —— 可以, Java中提供了现成的方法 try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){ dataOutputStream.writeInt(messageBinary.length); //写入消息本体 dataOutputStream.write(messageBinary); } } //5.更新消息统计文件 Stat stat = readStat(queue.getName()); stat.totalCount++; stat.validCount++; writeStat(queue.getName(), stat); } } /** * 逻辑删除消息, 也就是把硬盘上存储的数据得isValid属性设置成0 * 1.先把文件得这一段数据读出来, 还原回Message对象 * 2.把isValid改成0、 * 3.把上述数据重新写回到文件 * 此处参数中的Message对象必须包含 offsetBeg 和 offsetEnd 属性 且这里需要的是随机访问, 而不是从头开始读所以不能用上面的读文件方法 * 何时去调用我们刚刚写的删除硬盘上消息对象的操作? 消费者把消息正确处理了即可删除 * 这个删除就是把内存的Message对象都删除 isValid属性只是用来在文件中标识这个消息有效这样的作用的,内存中删除Message对象比较容易 */ public void deleteMessage(MSGQueue queue, Message message) throws IOException, ClassNotFoundException { synchronized (queue) { try(RandomAccessFile randomAccessFile = new RandomAccessFile(getQueueDataPath(queue.getName()), "rw")){ //1.先从文件中读取对应的Message数据 byte[] messageBinary = new byte[(int) (message.getOffsetEnd() - message.getOffsetBeg())]; randomAccessFile.seek(message.getOffsetBeg()); randomAccessFile.read(messageBinary); //2.把当前读出来的二进制数据, 转换成Message对象 Message diskMassage = (Message)BinaryTool.fromBytes(messageBinary); //3.把isValid设置为无效 diskMassage.setIsValid((byte)0x0); //4.重新写入文件 byte[] bufferDest = BinaryTool.toBytes(diskMassage); //虽然上面已经seek过了, 但是上面seek完了之后进行读操作, 这一读就导致文件光标往后移动 //移动到下一个消息的位置了, 因此要想让接下来的写入能够刚好回到之前写入的位置, 就需要重新调整文件光标 //通过这些操作对于文件来说只有标志位发生了变化 randomAccessFile.seek(diskMassage.getOffsetBeg()); randomAccessFile.write(bufferDest); } //更新统计文件! 把一个消息设为无效此时有效消息个数就-1 Stat stat = readStat(queue.getName()); if (stat.validCount > 0){ stat.totalCount--; } writeStat(queue.getName(), stat); } }
-
加载文件中消息到内存接口
/** * 从文件中读取出所有的消息内容加载到内存中 - 在程序启动的时候进行调用 * 使用LinkedList主要是后续为了方便进行头删操作 * 准备在程序驱动的时候进行调用, 这个方法的参数,只是一个queueName而不是MSG对象,因为该方法不需要加锁,只有queueName就够了 * 由于该方法是在程序启动时调用, 此时服务器还不能处理请求, 不涉及多线程操作文件 */ public LinkedList<Message> loadAllMessageFromQueue(String queueName) throws IOException, MqException, ClassNotFoundException { LinkedList<Message> messages = new LinkedList<>(); try (InputStream inputStream = Files.newInputStream(Paths.get(getQueueDataPath(queueName)))){ try (DataInputStream dataInputStream = new DataInputStream(inputStream)){ //记录光标位置 long currentOffset = 0; //一个文件中包含了很多消息, 此处势必要循环读取 while (true){ //1.读取当前消息长度, 这里的readInt可能会读到文件末尾 // readInt方法读到文件末尾会抛出EOFException异常, 这一点和之前的流对象不太一样 int messageSize = dataInputStream.readInt(); //2.按照这个长度读取消息内容 byte[] buffer = new byte[messageSize]; int actualSize = dataInputStream.read(buffer); if (messageSize != actualSize){ //如果不匹配说明文件有问题, 格式错乱了 throw new MqException("[MessageFileManager] 文件格式错误! queueName=" + queueName); } //3.把这个读到的二进制数据, 反序列化Message对象 Message message = (Message)BinaryTool.fromBytes(buffer); //4.判定一下这个消息对象是不是无效对象 if (message.getIsValid() != 0x1){ //无效数据, 直接跳过 currentOffset += (4 + messageSize); continue; } //5.有效数据则需要把Message对象加入到链表中, 还需要写入offsetBeg和offsetEnd //在进行计算Offset的时候, 需要知道当前文件光标的位置的, 由于当下使用的DataInputStream并不方便 //因此就需要手动计算下文件光标 message.setOffsetBeg(currentOffset + 4); message.setOffsetEnd(currentOffset + 4 + messageSize); currentOffset += (4 + messageSize); messages.add(message); } }catch (EOFException e){ //这个catch并非处理异常, 而是处理正常业务逻辑, 文件读到末尾, 会被readInt抛出该异常 //这个catch语句也不需要做啥特殊的事情 System.out.println("[MessageFileManager] 恢复 Message 数据完成!"); } } return messages; }
-
-
垃圾回收:
根据上述代码,此时又产生了一个问题:随着时间的推移,这个消息文件可能会越来越大,有可能大部分都是无效消息。针对这种情况,就需要考虑当前消息数据文件进行垃圾回收(GC),使用**复制算法针对消息数据文件中的垃圾进行回收,即直接遍历原有消息数据文件,把所有的有效的数据拷贝到一个新的文件中,最后把整个旧的文件删除即可。**前提是当前的空间中有效数不多,大部分都是无效数据。
什么时候触发GC?什么时候才算有效数据不多,垃圾很多呢?此处做出如下约定:
当总的消息数目超过3000,并且有效消息的数目低于总数目的50%,就触发一次GC;并且使用queue_stat.txt文件来存储相关信息,文件格式:使⽤⽂本⽅式存储。⽂件中只包含⼀⾏, ⾥⾯包含两列(都是整数),使⽤ \t 分割. 第⼀列表⽰当前总的消息数⽬;第⼆列表⽰有效消息数⽬;形如:
3000\t2000
还有一个问题,若此时某个队列中消息的数量特别多且都是有效消息,此时就会导致整个消息的数据文件特别大,后续针对整个文件的各种操作,成本上升很多。如:这个文件大小是10G,此时如果触发一次GC,整体的耗时就会非常高。
对于RabbitMQ来说,解决方案是把一个大的文件,拆成若干小的文件,即当单个文件长度达到一定阈值后,就会拆分成两个文件;以及每个单独的文件都会进行GC,如果GC之后,发现文件小了很多,就可能会和相邻的其他文件合并 – 扩展实现;
gc回收方法如下:
//检查是否要针对该队列的消息数据文件进行GC public boolean checkGC(String queueName) throws IOException { //判定是否要GC, 根据总消息数和有效消息数(这两个值都是消息统计文件中) Stat stat = readStat(queueName); if (stat != null && stat.totalCount > 3000 && (double) stat.validCount / (double) stat.totalCount < 0.5) { return true; } return false; } private String getQueueDataNewPath(String queueName) { return getQueueDir(queueName) + "/queue_data_new.txt"; } /** * 通过这个方法真正执行消息数据文件的垃圾回收操作, 使用复制算法来完成 * 创建一个新的文件名字就是queue_data_new.txt, 把之前消息数据文件中的有效消息都读出来写到新的文件中 * 删除旧的文件, 再把文件改名回queue_data.txt, 也需要更新垃圾统计信息 */ public void gc(MSGQueue queue) throws IOException, MqException, ClassNotFoundException { //进行gc的时候是针对消息数据文件进行大洗牌, 其他线程不能针对该队列的消息文件做任何修改 synchronized (queue) { //由于gc操作可能比较耗时此处统计一下消耗的时间 long gcBeg = System.currentTimeMillis(); //1.创建一个新的文件 File queueDataNewFile = new File(getQueueDataNewPath(queue.getName())); if (queueDataNewFile.exists()){ //正常情况下, 这个文件不应该存在, 如果存在, 就是意外。说明上次gc了一半程序意外崩溃 throw new MqException("[MessageFileManager] gc时发现该队列的 queue_data_new 已经存在! queueName=" + queue.getName()); } boolean ok = queueDataNewFile.createNewFile(); if (!ok){ throw new MqException("[MessageFileManager] 创建文件失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath()); } //2.从旧文件中读取所有的消息 LinkedList<Message> messages = loadAllMessageFromQueue(queue.getName()); //3.有效消息, 写入到新的文件中 try(OutputStream outputStream = Files.newOutputStream(queueDataNewFile.toPath())) { try(DataOutputStream dataOutputStream = new DataOutputStream(outputStream)){ for (Message message : messages){ byte[] buffer = BinaryTool.toBytes(message); //先写四个字节消息的长度 dataOutputStream.writeInt(buffer.length); dataOutputStream.write(buffer); } } } //4,删除旧的数据文件, 并且把新的文件重命名 File queueDataOldFile = new File(getQueueDataPath(queue.getName())); ok = queueDataOldFile.delete(); if (!ok){ throw new MqException("[MessageFileManager] 删除旧的数据文件失败! queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //5.重命名文件名 ok = queueDataNewFile.renameTo(queueDataOldFile); if (!ok){ throw new MqException("[MessageFileManager] 文件重命名失败! queueDataNewFile=" + queueDataNewFile.getAbsolutePath() + ", queueDataOldFile=" + queueDataOldFile.getAbsolutePath()); } //6.更新统计文件 Stat stat = readStat(queue.getName()); stat.totalCount = messages.size(); stat.validCount = messages.size(); writeStat(queue.getName(), stat); long gcEnd = System.currentTimeMillis(); System.out.println("[MessageFileManager] gc 执行完毕! queueName=" + queue.getName() + ", time=" + (gcEnd - gcBeg) + "ms"); } }
-
消费者订阅消息接口:此处约定消费者按照轮询的方式进行消费。
/** 订阅消息. * 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者. * consumerTag: 消费者的身份标识 * autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答. * consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作为lambda . */ public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { // 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中. queueName = virtualHostName + queueName; try { consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer); System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName); e.printStackTrace(); return false; } } public boolean basicAck(String queueName, String messageId) { queueName = virtualHostName + queueName; try { // 1. 获取到消息和队列 Message message = memoryDataCenter.getMessage(messageId); if (message == null) { throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId); } MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName); } // 2. 删除硬盘上的数据 if (message.getDeliverMode() == 2) { diskDataCenter.deleteMessage(queue, message); } // 3. 删除消息中心中的数据 memoryDataCenter.removeMessage(messageId); // 4. 删除待确认的集合中的数据 memoryDataCenter.removeMessageWaitAck(queueName, messageId); System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId=" + messageId); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName + ", messageId=" + messageId); e.printStackTrace(); return false; } }
-
6.2 客户端模块
-
职责:处理客户端与服务器的交互逻辑。
-
子模块:
-
连接管理:
-
利用一个ConnectionFactory类来建立/关闭 TCP 连接,处理心跳检测和超时断连。
package com.example.mq.mqclient; import java.io.IOException; public class ConnectionFactory { //broker server 的ip地址 private String host; //broker server的端口号 private int port; //访问broker server的哪个虚拟主机 private String virtualHostName; private String username; private String password; public Connection newConnection() throws IOException { Connection con = new Connection(host, port); return con; } }
-
-
信道管理:
-
利用Connection类和Channel类管理在单个连接上复用多个逻辑信道(Channel),隔离操作流,需提供一系列的方法和服务器提供的核心api对应。
public class Connection { private Socket socket = null; // 需要管理多个 channel. 使用一个 hash 表把若干个 channel 组织起来. private ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); private InputStream inputStream; private OutputStream outputStream; private DataInputStream dataInputStream; private DataOutputStream dataOutputStream; private ExecutorService callbackPool = null; public Connection(String host, int port) throws IOException { socket = new Socket(host, port); inputStream = socket.getInputStream(); outputStream = socket.getOutputStream(); dataInputStream = new DataInputStream(inputStream); dataOutputStream = new DataOutputStream(outputStream); callbackPool = Executors.newFixedThreadPool(4); // 创建一个扫描线程, 由这个线程负责不停的从 socket 中读取响应数据. 把这个响应数据再交给对应的 channel 负责处理. Thread t = new Thread(() -> { try { while (!socket.isClosed()) { Response response = readResponse(); dispatchResponse(response); } } catch (SocketException e) { // 连接正常断开的. 此时这个异常直接忽略. System.out.println("[Connection] 连接正常断开!"); } catch (IOException | ClassNotFoundException | MqException e) { System.out.println("[Connection] 连接异常断开!"); e.printStackTrace(); } }); t.start(); } public void close() { // 关闭 Connection 释放上述资源 try { callbackPool.shutdownNow(); channelMap.clear(); inputStream.close(); outputStream.close(); socket.close(); } catch (IOException e) { e.printStackTrace(); } } // 使用这个方法来分别处理, 当前的响应是一个针对控制请求的响应, 还是服务器推送的消息. private void dispatchResponse(Response response) throws IOException, ClassNotFoundException, MqException { if (response.getType() == 0xc) { // 服务器推送来的消息数据 SubScribeReturns subScribeReturns = (SubScribeReturns) BinaryTool.fromBytes(response.getPayload()); // 根据 channelId 找到对应的 channel 对象 Channel channel = channelMap.get(subScribeReturns.getChannelId()); if (channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId()); } // 执行该 channel 对象内部的回调. callbackPool.submit(() -> { try { channel.getConsumer().handleDelivery(subScribeReturns.getConsumerTag(), subScribeReturns.getBasicProperties(), subScribeReturns.getBody()); } catch (MqException | IOException e) { e.printStackTrace(); } }); } else { // 当前响应是针对刚才的控制请求的响应 BasicReturns basicReturns = (BasicReturns) BinaryTool.fromBytes(response.getPayload()); // 把这个结果放到对应的 channel 的 hash 表中. Channel channel = channelMap.get(basicReturns.getChannelId()); if (channel == null) { throw new MqException("[Connection] 该消息对应的 channel 在客户端中不存在! channelId=" + channel.getChannelId()); } channel.putReturns(basicReturns); } } // 发送请求 public void writeRequest(Request request) throws IOException { dataOutputStream.writeInt(request.getType()); dataOutputStream.writeInt(request.getLength()); dataOutputStream.write(request.getPayload()); dataOutputStream.flush(); System.out.println("[Connection] 发送请求! type=" + request.getType() + ", length=" + request.getLength()); } // 读取响应 public Response readResponse() throws IOException { Response response = new Response(); response.setType(dataInputStream.readInt()); response.setLength(dataInputStream.readInt()); byte[] payload = new byte[response.getLength()]; int n = dataInputStream.read(payload); if (n != response.getLength()) { throw new IOException("读取的响应数据不完整!"); } response.setPayload(payload); System.out.println("[Connection] 收到响应! type=" + response.getType() + ", length=" + response.getLength()); return response; } // 通过这个方法, 在 Connection 中能够创建出一个 Channel public Channel createChannel() throws IOException { String channelId = "C-" + UUID.randomUUID().toString(); Channel channel = new Channel(channelId, this); // 把这个 channel 对象放到 Connection 管理 channel 的 哈希表 中. channelMap.put(channelId, channel); // 同时也需要把 "创建 channel" 的这个消息也告诉服务器. boolean ok = channel.createChannel(); if (!ok) { // 服务器这里创建失败了!! 整个这次创建 channel 操作不顺利!! // 把刚才已经加入 hash 表的键值对, 再删了. channelMap.remove(channelId); return null; } return channel; } }
public class Channel { private String channelId; // 当前这个 channel 属于哪个连接. private Connection connection; // 用来存储后续客户端收到的服务器的响应. private ConcurrentHashMap<String, BasicReturns> basicReturnsMap = new ConcurrentHashMap<>(); /**如果当前 Channel 订阅了某个队列, 就需要在此处记录下对应回调是啥. 当该队列的消息返回回来的时候, 调用回调. * 此处约定一个 Channel 中只能有一个回调. */ private Consumer consumer = null; public Channel(String channelId, Connection connection) { this.channelId = channelId; this.connection = connection; } // 在这个方法中, 和服务器进行交互, 告知服务器, 此处客户端创建了新的 channel 了. public boolean createChannel() throws IOException { // 对于创建 Channel 操作来说, payload 就是一个 basicArguments 对象 BasicArguments basicArguments = new BasicArguments(); basicArguments.setChannelId(channelId); basicArguments.setRid(generateRid()); byte[] payload = BinaryTool.toBytes(basicArguments); Request request = new Request(); request.setType(0x1); request.setLength(payload.length); request.setPayload(payload); // 构造出完整请求之后, 就可以发送这个请求了. connection.writeRequest(request); // 等待服务器的响应 BasicReturns basicReturns = waitResult(basicArguments.getRid()); return basicReturns.isOk(); } /** * 响应的顺序和请求的顺序并非是一致的, 所以客户端用一个线程来读取socket中收到的所有响应数据 * 读取后把响应数据放到hash表中, 客户端中其他的代码发了请求后就可以不停的去哈希表中查找与自己匹配的响应, * 用以下方法来阻塞等待服务器的响应. */ private BasicReturns waitResult(String rid) { BasicReturns basicReturns = null; while ((basicReturns = basicReturnsMap.get(rid)) == null) { // 如果查询结果为 null, 说明包裹还没回来,此时就需要阻塞等待. synchronized (this) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } // 读取成功之后, 还需要把这个消息从哈希表中删除掉. basicReturnsMap.remove(rid); return basicReturns; } public void putReturns(BasicReturns basicReturns) { basicReturnsMap.put(basicReturns.getRid(), basicReturns); synchronized (this) { // 当前也不知道有多少个线程在等待上述的这个响应.故把所有的等待的线程都唤醒. notifyAll(); } } private String generateRid() { return "R-" + UUID.randomUUID().toString(); } // 关闭 channel, 给服务器发送一个 type = 0x2 的请求 public boolean close() throws IOException { BasicArguments basicArguments = new BasicArguments(); basicArguments.setRid(generateRid()); basicArguments.setChannelId(channelId); byte[] payload = BinaryTool.toBytes(basicArguments); Request request = new Request(); request.setType(0x2); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(basicArguments.getRid()); return basicReturns.isOk(); } // 创建交换机 public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException { ExchangeDeclareArguments exchangeDeclareArguments = new ExchangeDeclareArguments(); exchangeDeclareArguments.setRid(generateRid()); exchangeDeclareArguments.setChannelId(channelId); exchangeDeclareArguments.setExchangeName(exchangeName); exchangeDeclareArguments.setExchangeType(exchangeType); exchangeDeclareArguments.setDurable(durable); exchangeDeclareArguments.setAutoDelete(autoDelete); exchangeDeclareArguments.setArguments(arguments); byte[] payload = BinaryTool.toBytes(exchangeDeclareArguments); Request request = new Request(); request.setType(0x3); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(exchangeDeclareArguments.getRid()); return basicReturns.isOk(); } // 删除交换机 public boolean exchangeDelete(String exchangeName) throws IOException { ExchangeDeleteArguments arguments = new ExchangeDeleteArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setExchangeName(exchangeName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x4); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 创建队列 public boolean queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException { QueueDeclareArguments queueDeclareArguments = new QueueDeclareArguments(); queueDeclareArguments.setRid(generateRid()); queueDeclareArguments.setChannelId(channelId); queueDeclareArguments.setQueueName(queueName); queueDeclareArguments.setDurable(durable); queueDeclareArguments.setExclusive(exclusive); queueDeclareArguments.setAutoDelete(autoDelete); queueDeclareArguments.setArguments(arguments); byte[] payload = BinaryTool.toBytes(queueDeclareArguments); Request request = new Request(); request.setType(0x5); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(queueDeclareArguments.getRid()); return basicReturns.isOk(); } // 删除队列 public boolean queueDelete(String queueName) throws IOException { QueueDeleteArguments arguments = new QueueDeleteArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x6); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 创建绑定 public boolean queueBind(String queueName, String exchangeName, String bindingKey) throws IOException { QueueBindArguments arguments = new QueueBindArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setExchangeName(exchangeName); arguments.setBindingKey(bindingKey); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x7); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 解除绑定 public boolean queueUnbind(String queueName, String exchangeName) throws IOException { QueueUnbindArguments arguments = new QueueUnbindArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setExchangeName(exchangeName); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x8); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 发送消息 public boolean basicPublish(String exchangeName, String routingKey, BasicProperties basicProperties, byte[] body) throws IOException { BasicPublishArguments arguments = new BasicPublishArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setExchangeName(exchangeName); arguments.setRoutingKey(routingKey); arguments.setBasicProperties(basicProperties); arguments.setBody(body); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0x9); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 订阅消息 public boolean basicConsume(String queueName, boolean autoAck, Consumer consumer) throws MqException, IOException { // 先设置回调. if (this.consumer != null) { throw new MqException("该 channel 已经设置过消费消息的回调了, 不能重复设置!"); } this.consumer = consumer; BasicConsumeArguments arguments = new BasicConsumeArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setConsumerTag(channelId); // 此处 consumerTag 也使用 channelId 来表示了. arguments.setQueueName(queueName); arguments.setAutoAck(autoAck); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0xa); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } // 确认消息 public boolean basicAck(String queueName, String messageId) throws IOException { BasicAckArguments arguments = new BasicAckArguments(); arguments.setRid(generateRid()); arguments.setChannelId(channelId); arguments.setQueueName(queueName); arguments.setMessageId(messageId); byte[] payload = BinaryTool.toBytes(arguments); Request request = new Request(); request.setType(0xb); request.setLength(payload.length); request.setPayload(payload); connection.writeRequest(request); BasicReturns basicReturns = waitResult(arguments.getRid()); return basicReturns.isOk(); } }
-
-
消费者管理:
-
消费者函数式接口
/* * 只是一个单纯的函数式接口(回调函数). 收到消息之后要处理消息时调用的方法. */ @FunctionalInterface public interface Consumer { // Delivery 的意思是 "投递", 这个方法预期是在每次服务器收到消息之后, 来调用. // 通过这个方法把消息推送给对应的消费者. // (此处的方法名和参数, 参考 RabbitMQ 展开) void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException; }
-
消费者完整的执行环境类
/* * 表示一个消费者完整的执行环境: 执行的数据和配置 */ public class ConsumerEnv { private String consumerTag; private String queueName; private boolean autoAck; // 通过这个回调来处理收到的消息. private Consumer consumer; public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { this.consumerTag = consumerTag; this.queueName = queueName; this.autoAck = autoAck; this.consumer = consumer; } }
-
虚拟机中消费者订阅与确认消息的方法
/** 订阅消息. * 添加一个队列的订阅者, 当队列收到消息之后, 就要把消息推送给对应的订阅者. * consumerTag: 消费者的身份标识 * autoAck: 消息被消费完成后, 应答的方式. 为 true 自动应答. 为 false 手动应答. * consumer: 是一个回调函数. 此处类型设定成函数式接口. 这样后续调用 basicConsume 并且传实参的时候, 就可以写作为lambda . */ public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { // 构造一个 ConsumerEnv 对象, 把这个对应的队列找到, 再把这个 Consumer 对象添加到该队列中. queueName = virtualHostName + queueName; try { consumerManager.addConsumer(consumerTag, queueName, autoAck, consumer); System.out.println("[VirtualHost] basicConsume 成功! queueName=" + queueName); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicConsume 失败! queueName=" + queueName); e.printStackTrace(); return false; } } public boolean basicAck(String queueName, String messageId) { queueName = virtualHostName + queueName; try { // 1. 获取到消息和队列 Message message = memoryDataCenter.getMessage(messageId); if (message == null) { throw new MqException("[VirtualHost] 要确认的消息不存在! messageId=" + messageId); } MSGQueue queue = memoryDataCenter.getQueue(queueName); if (queue == null) { throw new MqException("[VirtualHost] 要确认的队列不存在! queueName=" + queueName); } // 2. 删除硬盘上的数据 if (message.getDeliverMode() == 2) { diskDataCenter.deleteMessage(queue, message); } // 3. 删除消息中心中的数据 memoryDataCenter.removeMessage(messageId); // 4. 删除待确认的集合中的数据 memoryDataCenter.removeMessageWaitAck(queueName, messageId); System.out.println("[VirtualHost] basicAck 成功! 消息被成功确认! queueName=" + queueName + ", messageId=" + messageId); return true; } catch (Exception e) { System.out.println("[VirtualHost] basicAck 失败! 消息确认失败! queueName=" + queueName + ", messageId=" + messageId); e.printStackTrace(); return false; } }
-
消费者消费消息的过程:所谓的消费消息就是让线程池执行消费者中对应的回调函数,在调用回调函数的时候把消息的内容通过参数传进去,消费者在最初订阅消息的时候就把回调注册给broker server(回调的内容时消费者确定的,具体的逻辑取决于它自己的业务逻辑)。因此此处客户端要做的逻辑就是能够让线程池知道执行哪个回调函数以及参数是哪个消息(来自那个队列),就需要有一个单独的扫描线程能够感知到哪个队列收到了新消息。这个过程会出现以下几个问题:
问题1:扫描线程也可以执行线程池的回调功能,为什么还需要线程池呢?答:由于是消费者给出的回调,具体的业务逻辑是什么无从得知,可能是比较耗时的业务,这时若只有一个单独的扫描线程,就可能周转不开。故需要线程池来执行这个可能会比较复杂的逻辑。
问题2:当前有很多个队列,扫描线程就一个,扫描线程如何得知当前是哪个队列有新消息?答:一个简单粗暴的办法就直接让扫描线程不停循环遍历所有队列,如果发现有新的元素就立即处理。还有一个更好的办法,再引入一个阻塞队列,队列中的元素就是有新消息的队列名字,扫描线程只需扫描这一个阻塞队列即可,此时阻塞队列中传递的队列名,相当于令牌。
问题3:如何确保消息是被正确的消费掉?答:消费者的回调方法,顺利执行完未抛异常,此时该条消息的使命就已完成,可以删除。消息确认也就是为了确保消息正确收到未丢失,因此需要:
- 在未执行回调之前,把这个消息先放到“待确认集合”中,避免因为回调失败导致消息丢失。
- 执行回调。
- 当前消费者采取的是autoAck=true,就认为回调执行完毕不抛异常就算消费成功。
- 删除硬盘和内存中消息中哈希表以及待确认消息集合中的该消息。
- 若当前消费者采取的是autoAck=false,即手动应答。需要消费者在自己的回调方法内部显式调用basicAck这个核心API。
问题4:执行回调的过程中抛出异常怎么处理?若发生这种情况,RabbitMQ的做法是另外搞一个扫描进程(erlang中的进程…)负责关注这个待确认集合中,每个待确认的消息待多久了,如果待的时间超出了范围就会把这个消息放到特定队列(死信队列 – 扩展)。
问题5:执行回调过程中,broker server崩溃,内存中数据全部丢失怎么处理?内存数据全部丢失,但硬盘数据还在,此时重启broker server,就会把硬盘中数据全部再加载到内存。消费者就可以重新消费该消息。
消费者总管理类
/** * 通过这个类, 来实现消费消息的核心逻辑. */ public class ConsumerManager { // 持有上层的 VirtualHost 对象的引用. 用来操作数据. private VirtualHost parent; // 指定一个线程池, 负责去执行具体的回调任务. private ExecutorService workerPool = Executors.newFixedThreadPool(4); // 存放令牌的队列 private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>(); // 扫描线程 private Thread scannerThread = null; public ConsumerManager(VirtualHost p) { parent = p; scannerThread = new Thread(() -> { while (true) { try { // 1. 拿到令牌 String queueName = tokenQueue.take(); // 2. 根据令牌, 找到队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null) { throw new MqException("[ConsumerManager] 取令牌后发现, 该队列名不存在! queueName=" + queueName); } // 3. 从这个队列中消费一个消息. synchronized (queue) { consumeMessage(queue); } } catch (InterruptedException | MqException e) { e.printStackTrace(); } } }); // 把线程设为后台线程. scannerThread.setDaemon(true); scannerThread.start(); } // 这个方法的调用时机就是发送消息的时候. public void notifyConsume(String queueName) throws InterruptedException { tokenQueue.put(queueName); } public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException { // 找到对应的队列 MSGQueue queue = parent.getMemoryDataCenter().getQueue(queueName); if (queue == null) { throw new MqException("[ConsumerManager] 队列不存在! queueName=" + queueName); } ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag, queueName, autoAck, consumer); synchronized (queue) { queue.addConsumerEnv(consumerEnv); // 如果当前队列中已经有了一些消息了, 需要立即就消费掉. int n = parent.getMemoryDataCenter().getMessageCount(queueName); for (int i = 0; i < n; i++) { // 这个方法调用一次就消费一条消息. consumeMessage(queue); } } } private void consumeMessage(MSGQueue queue) { // 1. 按照轮询的方式, 找个消费者出来. ConsumerEnv luckyDog = queue.chooseConsumer(); if (luckyDog == null) { // 当前队列没有消费者, 暂时不消费. 等后面有消费者出现再说. return; } // 2. 从队列中取出一个消息 Message message = parent.getMemoryDataCenter().pollMessage(queue.getName()); if (message == null) { // 当前队列中还没有消息, 也不需要消费. return; } // 3. 把消息带入到消费者的回调方法中, 丢给线程池执行. workerPool.submit(() -> { try { // 1. 把消息放到待确认的集合中. 这个操作势必在执行回调之前. parent.getMemoryDataCenter().addMessageWaitAck(queue.getName(), message); // 2. 真正执行回调操作 luckyDog.getConsumer().handleDelivery(luckyDog.getConsumerTag(), message.getBasicProperties(), message.getBody()); // 3. 如果当前是 "自动应答" , 就可以直接把消息删除了. // 如果当前是 "手动应答" , 则先不处理, 交给后续消费者调用 basicAck 方法来处理. if (luckyDog.isAutoAck()) { // 1) 删除硬盘上的消息 if (message.getDeliverMode() == 2) { parent.getDiskDataCenter().deleteMessage(queue, message); } // 2) 删除上面的待确认集合中的消息 parent.getMemoryDataCenter().removeMessageWaitAck(queue.getName(), message.getMessageId()); // 3) 删除内存中消息中心里的消息 parent.getMemoryDataCenter().removeMessage(message.getMessageId()); System.out.println("[ConsumerManager] 消息被成功消费! queueName=" + queue.getName()); } } catch (Exception e) { e.printStackTrace(); } }); } }
-
-
6.3 公共模块
-
职责:客户端和服务器的共享代码与基础功能。
-
子模块:
-
通信协议:
-
定义客户端与服务器的应用层通信协议(AMQP 精简版),包括帧格式和方法映射。
请求/响应格式:
其中 type 表⽰请求响应不同的功能. 取值如下:
0x1 0x2 0x3 0x4 0x5 0x6 0x7 0x8 0x9 0xa 0xb 创建 channel 关闭 channel 创建 exchange 销毁 exchange 创建 queue 销毁 queue 创建 binding 销毁 binding 发送 message 订阅 message 返回 ack 还有一个
0xc
是服务器给客⼾端推送的消息,是被订阅的消息响应独有的。其中 payload 部分, 会根据不同的 type, 存在不同的格式. 对于请求来说, payload 表⽰这次⽅法调⽤的各种参数信息. 对于响应来说, payload 表⽰这次⽅法调⽤的返回值。请求/响应类
/* * 表示一个网络通信中的请求对象. 按照自定义协议的格式来展开的 */ public class Request { private int type; private int length; private byte[] payload; }
/* * 这个类表示一个响应. 也是根据自定义应用层协议来的 */ public class Response { private int type; private int length; private byte[] payload; }
-
-
客户端和服务器的公共代码:
-
消费者完整的执行环境类
/* * 表示一个消费者完整的执行环境: 执行的数据和配置 */ public class ConsumerEnv { private String consumerTag; private String queueName; private boolean autoAck; // 通过这个回调来处理收到的消息. private Consumer consumer; public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) { this.consumerTag = consumerTag; this.queueName = queueName; this.autoAck = autoAck; this.consumer = consumer; } }
-
-
序列化/反序列化:
-
将消息内容(如 JSON、二进制数据)转换为协议兼容格式。
/** * Java中对象,都可以通过以下逻辑进行序列化/反序列化 * 前提是要让这个类能够实现Serializable这样的接口 */ public class BinaryTool { //把一个对象序列化成一个字节数组 public static byte[] toBytes(Object object) throws IOException { //这个流相当于一个变长字节数组 //既可以把object序列化的数据给之间的写入到byteArrayOutputStream中, 再统一成byte[] ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); try(ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream)){ //此处的writeObject就会把该对象进行序列化, 生成的二进制字节数据, 就会写入到objectOutputStream由于它有关联到了ByteArrayOutputStream // 最终就写入到ByteArrayOutputStream objectOutputStream.writeObject(object); } //这个操作把byteArrayOutputStream中数据取出来转成byte[] return byteArrayOutputStream.toByteArray(); } //把一个字节数组, 反序列化成一个对象 public static Object fromBytes(byte[] data) throws IOException, ClassNotFoundException { Object object = null; try(ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)){ try(ObjectInputStream objectInputStream = new ObjectInputStream(byteArrayInputStream)){ //此处的readObject就是从data这个byte[]中读取数据进行反序列化 object = objectInputStream.readObject(); } } return object; } }
-
-