仿 RabbitMQ 的消息队列1(实战项目)
一,消息队列的背景知识
我们以前学过阻塞队列,其实阻塞队列和消息队列的原理差不多。
在实际的后端开发中, 尤其是分布式系统⾥, 跨主机之间使⽤⽣产者消费者模型, 也是⾮常普遍的需求.
因此, 我们通常会把阻塞队列, 封装成⼀个独⽴的服务器程序, 并且赋予其更丰富的功能.
这样的程序我们就称为 消息队列 (Message Queue, MQ)
二,需求分析
具体的生产者 消费者 阻塞队列之间关系如图:
我们主要就是对Broker server进行开发:
在 Broker 中, ⼜存在以下概念.
- 虚拟机 (VirtualHost): 类似于 MySQL 的 “database”, 是⼀个逻辑上的集合. ⼀个 BrokerServer 上可以存在多个 VirtualHost.
- 交换机 (Exchange): ⽣产者把消息先发送到 Broker 的 Exchange 上. 再根据不同的规则, 把消息转发给不同的 Queue.
- 队列 (Queue): 真正⽤来存储消息的部分. 每个消费者决定⾃⼰从哪个 Queue 上读取消息.
- 绑定 (Binding): Exchange 和 Queue 之间的关联关系. Exchange 和 Queue 可以理解成 “多对多” 关系. 使⽤⼀个关联表就可以把这两个概念联系起来.
- 消息 (Message): 传递的内容.
- 如图所示:
核心API
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
交换机类型
• Direct: ⽣产者发送消息时, 直接指定被该交换机绑定的队列名.
• Fanout: ⽣产者发送的消息会被复制到该交换机的所有队列中.
• Topic: 绑定队列到交换机上时, 指定⼀个字符串为 bindingKey. 发送消息指定⼀个字符串为routingKey. 当 routingKey 和 bindingKey 满⾜⼀定的匹配条件的时候, 则把消息投递到指定队列.(这里看不明白就当先了解概念,后面涉及到的时候会详谈)。
持久化
Exchange, Queue, Binding, Message 都有持久化需求.
当程序重启 / 主机重启, 保证上述内容不丢失.
网络通信
- 创建 Connection
- 关闭 Connection
- 创建 Channel
- 关闭 Channel
- 创建队列 (queueDeclare)
- 销毁队列 (queueDelete)
- 创建交换机 (exchangeDeclare)
- 销毁交换机 (exchangeDelete)
- 创建绑定 (queueBind)
- 解除绑定 (queueUnbind)
- 发布消息 (basicPublish)
- 订阅消息 (basicConsume)
- 确认消息 (basicAck)
可以看到, 在 broker 的基础上, 客⼾端还要增加 Connection 操作和 Channel 操作.Connection 对应⼀个 TCP 连接.Channel 则是 Connection 中的逻辑通道.⼀个 Connection 中可以包含多个 Channel.Channel 和 Channel 之间的数据是独⽴的. 不会相互⼲扰.这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接.
这样的设定主要是为了能够更好的复⽤ TCP 连接, 达到⻓连接的效果, 避免频繁的创建关闭 TCP 连接
消息应答
• ⾃动应答: 消费者只要消费了消息, 就算应答完毕了. Broker 直接删除这个消息.
• ⼿动应答: 消费者⼿动调⽤应答接⼝, Broker 收到应答请求之后, 才真正删除这个消息.(⼿动应答的⽬的, 是为了保证消息确实被消费者处理成功了. 在⼀些对于数据可靠性要求⾼的场景, ⽐较常⻅.)
三,模块划分
四,项目创建
创建 SpringBoot 项⽬.
使⽤ SpringBoot 2 系列版本, Java 17.
依赖引⼊ Spring Web 和 MyBatis.
五,创建核心类
先将这几个包创建了,common(共同部分),mqclient(客户端),mqserver(服务器)
mqserver里:core(核心)mapper(数据库的映射),model
在core包里创建核心类:Exchange
/**
* 这个类表示一个交换机
*/
public class Exchange {
//此处用name表示交换机的身份识别:(唯一)
private String name;
//交换机类型:direct,fanout,topic
private ExchangeType type = ExchangeType.DIRECT;
//该交换机是否要持久化储存, true为持久化,false为不持久化
private boolean durable = false;
//自动删除:如果当前交换机没人用了,就自动删除,此功能我们只是列出来,后续不会进行实现(RabbiteMq是有的)
private boolean autoDelete = false;
//arguments表示创建交换机的一些额外参数,今后我们并不会实现,只是先列出来。
private Map<String, Object> arguments = new HashMap<>();
//先省略getter和setter,因为这里还有细节,后面会细说。
}
创建ExchangeType类
public enum ExchangeType {
DIRECT(0),
FANOUT(1),
TOPIC(2);
private final int type;
ExchangeType(int type){
this.type = type;
}
public int getType(){
return type;
}
}
创建MESGQueue 类
(由于直接使用名字Queue和标准库里的queue重复,所以就以MESGQueue命名消息队列)
/**
* MESG ->message MESGQueue = 消息队列,储存消息的队列
*/
public class MESGQueue {
//名字:
private String name;
//是否持久化保存:true持久化保存, false不持久化保存
private boolean durable = false;
//独有的,如果为true表示只能有一个消费者使用,如果为false表示所有的消费者都能使用,此处只是列出来,暂时不做实现
private boolean exclusive = false;
//是否自动删除:当不再使用这个消息队列的时候是否自动删除。此处只是列出来,暂时不做实现
private boolean autoDelete = false;
//表示拓展参数:此处只是列出来,暂时不做实现
private Map<String,Object> arguments = new HashMap<>();
//先省略getter和setter,因为这里还有细节,后面会细说。
创建Binding类
/**
* 队列与交换机之间的关联关系
*/
public class Binding {
private String exchangeName;
private String queueName;
//bindingKey就是在出题,当发来一个消息的时候会附带一个routingKey,此时会验证routingKey是否和bindingKey符合
//某种匹配规则,如果符合,就将这个消息加入到该消息队列当中。
private String bindingKey;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
创建消息 Message类
由于以后的消息要储存在文件当中,所以要进行序列化,因此这个Message要实现Serializable接口。
/**
* 表示一个要传递的消息
*/
public class Message implements Serializable {
//要传递消息的属性:
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
//表示偏移量,由于我们要将消息储存在一个文件中,所以记忆好 begin和end 能找到这个消息的具体存在的位置[begin,end)
private transient long offsetBeg;//transient表示不被序列化
private transient long offsetEnd;
//0x1表示有效 0x0表示无效
private byte isVail = 0x1;
//创建一个能自动生成message的工厂类:
public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body){
Message message = new Message();
if(basicProperties != null) {
message.setBasicProperties(basicProperties);
}
message.setMessageId("M+"+ UUID.randomUUID().toString());
message.setRoutingKey(routingKey);
message.setBody(body);
//对于offsetBeg,offsetEnd,isVail,此时只是在内存中创建了一个对象,这些值会在之后的持久化操作中进行设置。
return message;
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setMessageId(String messageId){
this.basicProperties.setMessageId(messageId);
}
public String getMessageId(){
return this.basicProperties.getMessageId();
}
public void setRoutingKey(String routingKey){
this.basicProperties.setRoutingKey(routingKey);
}
public String getRoutingKey(){
return this.basicProperties.getRoutingKey();
}
public void setDeliverMode(int deliverMode){
this.basicProperties.setDeliverMode(deliverMode);
}
public int getDeliverMode(){
return this.basicProperties.getDeliverMode();
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsVail() {
return isVail;
}
public void setIsVail(byte isVail) {
this.isVail = isVail;
}
}
消息里的基本属性 BasicProperties 类
public class BasicProperties implements Serializable {
//使用String作为唯一身份标识,使用UUID 生成messageid
private String messageId;
//是一个消息 带有的内容,为了和bindingKey做匹配
//如果当前交换机是direct,routingKey就是转发的队列名
//如果当前交换机是fanout,不会使用routingKey,因为那样无意义。
//如果当前交换机是topic,此时bindingKey就要和routingKey做匹配,只有符合要求才会转发给相应的队列。
private String routingKey;
//表示是否要持久化,不持久化 1, 持久化:2
private int deliverMode = 1;
public String getMessageId() {
return messageId;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
public String getRoutingKey() {
return routingKey;
}
public void setRoutingKey(String routingKey) {
this.routingKey = routingKey;
}
public int getDeliverMode() {
return deliverMode;
}
public void setDeliverMode(int deliverMode) {
this.deliverMode = deliverMode;
}
}
总的来说,现在创建的类一共就这些:
六,数据库设计
对于 Exchange, MSGQueue, Binding, 我们使⽤数据库进⾏持久化保存.
此处我们使⽤的数据库是 SQLite, 是⼀个更轻量的数据库.
SQLite 只是⼀个动态库(当然, 官⽅也提供了可执⾏程序 exe), 我们在 Java 中直接引⼊ SQLite 依赖, 即可直接使⽤, 不必安装其他的软件.
1,配置sqlite,引⼊ pom.xml 依赖
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.41.0.1</version>
</dependency>
简图如下:
2,配置数据源 application.yml (默认是 .properties后缀,可以直接重命名改成yml后缀,这是常用的方法,但是改完以后就要注意格式对齐了)
spring:
datasource:
url: jdbc:sqlite:./data/meta.db
username:
password:
driver-class-name: org.sqlite.JDBC
mybatis:
mapper-locations: classpath:mapper/**Mapper.xml
Username 和 password 空着即可.
此处我们约定, 把数据库⽂件放到 ./data/meta.db 中.
SQLite 只是把数据单纯的存储到⼀个⽂件中. ⾮常简单⽅便.
3,创建数据库表
准备工作:
创建MetaMapper类,和MetaMapper.xml
metaMapper.xml里的配置
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.mq.mqserver.mapper.MetaMapper">
创建数据库表的方法实现
@Mapper
public interface MetaMapper {
//创建 交换机数据库表
void createExchangeTable();
//创建 队列数据库表
void createMESGQueueTable();
//创建 绑定数据库表
void createBindingTable();
}
但是,对于数据库的操作,有四种 insert,delete,select,update
可是,没有创建create啊,所以我们将create语句写在update里。
创建对应xml
<update id="createExchangeTable">
create table if not exists exchange(
name varchar(50) primary key,
type int,
durable boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update>
<update id="createMESGQueueTable">
create table if not exists MESGQueue(
name varchar(50) primary key,
durable boolean,
exclusive boolean,
autoDelete boolean,
arguments varchar(1024)
);
</update>
<update id="createBindingTable">
create table if not exists binding(
exchangeName varchar(50),
queueName varchar(50),
bindingKey varchar(256)
);
</update>
插入
//对ExchangeTable进行插入操作
void insertExchangeTable(Exchange exchange);
//对MESGQueueTable进行插入操作
void insertMESGQueueTable(MESGQueue mesgQueue);
//对BindingTable进行插入操作
void insertBindingTable(Binding binding);
插入对应 xml
<insert id="insertExchangeTable" parameterType="com.example.mq.mqserver.core.Exchange">
insert into exchange values (#{name},#{type},#{durable},#{autoDelete},#{arguments});
</insert>
<insert id="insertMESGQueueTable" parameterType="com.example.mq.mqserver.core.MESGQueue">
insert into MESGQueue values (#{name},#{durable},#{exclusive},#{autoDelete},#{arguments});
</insert>
<insert id="insertBindingTable" parameterType="com.example.mq.mqserver.core.Binding">
insert into binding values (#{exchangeName},#{queueName},#{bindingKey});
</insert>
删除
//对ExchangeTable进行删除操作
void deleteExchangeTable(String exchangeName);
//对MESGQueueTable进行删除操作
void deleteMESGQueueTable(String mesgQueueName);
//对BindingTable进行删除操作
void deleteBindingTable(Binding binding);
删除对应 xml
<delete id="deleteExchangeTable">
delete from exchange where name = #{exchangeName};
</delete>
<delete id="deleteMESGQueueTable">
delete from MESGQueue where name = #{mesgQueueName};
</delete>
<delete id="deleteBindingTable" parameterType="com.example.mq.mqserver.core.Binding">
delete from binding where exchangeName = #{exchangeName} and queueName = #{queueName};
</delete>
查询
//查找所有的exchange交换机
List<Exchange> selectExchangeTable();
//查找所有的DESGQueue消息队列
List<MESGQueue> selectMESGQueueTable();
//查找所有的Binding绑定
List<Binding> selectBindingTable();
查询对应 xml
<select id="selectExchangeTable" resultType="com.example.mq.mqserver.core.Exchange">
select * from exchange;
</select>
<select id="selectMESGQueueTable" resultType="com.example.mq.mqserver.core.MESGQueue">
select * from MESGQueue;
</select>
<select id="selectBindingTable" resultType="com.example.mq.mqserver.core.Binding">
select * from binding;
</select>