仿 RabbitMQ 消息队列6(实战项目)
⼗. 虚拟主机设计
创建 VirtualHost
创建mqserver.VirtualHost
在 RabbitMQ 中, 虚拟主机是可以随意创建/删除的. 咱们此处为了实现简单, 并没有实现虚拟主机的管理. 因此我们默认就只有⼀个虚拟主机的存在
/**
*
* 这个类就是用来实现之前 第二步需求分析就已经定义的接口,如果想回忆一下可以看看之前的博客:
* 另外,之前在写代码的时候,遇到异常就往上抛,等待上层的类来处理,此时这个类就是最上层的类,所有的异常都在这里统一解决
*
*
*创建队列 (queueDeclare)
* 销毁队列 (queueDelete)
* 创建交换机 (exchangeDeclare)
* 销毁交换机 (exchangeDelete)
* 创建绑定 (queueBind)
* 解除绑定 (queueUnbind)
* 发布消息 (basicPublish)
* 订阅消息 (basicConsume)
* 确认消息 (basicAck)
*
*
* 如何表示交换机和虚拟主机之间的关系?
* 方案一:参考数据库,“一对多”的方案,给交换机添加虚拟主机的属性
* 方案二:重新约定,交换机的名字=虚拟主机的名字+交换机的真实名字
* 我们使用的是方案二,因为方案二不会对我们原有的代码做出太大改动,而且还很方便
* 按照这个方式,就能区分不同的队列,在队列也加上交换机的前缀
* 进一步的,由于绑定是和队列和交换机都相关,此时绑定也就自然被隔开了
* 再进一步,消息和队列是强相关的,队列名区分开了,消息自然也就区分开了。
*
*/
public class VirtualHost {
private String virtualhostName;
//管理内存的类:
private MemoryDataCenter memoryDataCenter = new MemoryDataCenter();
//管理硬盘的类:
private DiskDataCenter diskDataCenter = new DiskDataCenter();
//其中 Router ⽤来定义转发规则, 这个内容后续会介绍,这里先定义在这
private Router router = new Router();
//由于可能会发生线程安全问题,所以需要锁对象:
private Object exchangeLocker = new Object();
private Object queueLocker = new Object();
//ConsumerManager ⽤来实现消息消费. 这个内容后续会介绍
private ConsumerManager consumerManager = new ConsumerManager(this);
}
对于上面出现的两个新的类Router 和ConsumerManager 不用担心,用到的时候,我们再细讲。
- Router类用在话题交换机发布消息的时候检查rouitingKey和bindingKey的合法性,以及验证routingKey和bindingKey是否匹配。
- ConsumerManager 类用在 管理消费者,那时候我们会将关于消费者的蔡操作都放在这个类里。
实现构造⽅法和 getter
构造⽅法中会针对 DiskDataCenter 和 MemoryDataCenter 进⾏初始化,同时会把硬盘的数据恢复到内存中.
这里的MemoryDataCenter里的init方法是空的,临时加上的,为了代码的美观性就临时在该类里加了个init空方法。
//提供构造方法和getter方法,对于虚拟机setter方法就不必了
public VirtualHost(String virtualhostName) {
this.virtualhostName = virtualhostName;
//初始化内存数据和硬盘数据:
diskDataCenter.init();
memoryDataCenter.init();
try{
//进行恢复数据操作:
memoryDataCenter.recovery(diskDataCenter);
}catch (Exception e){
e.printStackTrace();
System.out.println("[VirtualHost] 恢复数据失败!!!");
}
}
public String getVirtualhostName() {
return virtualhostName;
}
public MemoryDataCenter getMemoryDataCenter() {
return memoryDataCenter;
}
public DiskDataCenter getDiskDataCenter() {
return diskDataCenter;
}
创建交换机
- 此处的 autoDelete, arguments 其实并没有使⽤. 只是先预留出来. (RabbitMQ 是⽀持的) .
- 约定, 交换机/队列的名字, 都加上 VirtualHostName 作为前缀. 这样不同 VirtualHost 中就可以存在同名的交换机或者队列了.
- exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 “exchangeCreate”.
- 先写硬盘, 后写内存. 因为写硬盘失败概率更⼤. 如果硬盘写失败了, 也就不必写内存了.
//创建交换机:注意返回值是boolean
//exchangeDeclare 的语义是, 不存在就创建, 存在则直接返回. 因此不叫做 "exchangeCreate".
//
public boolean exchangeDeclare(String exchangeName, ExchangeType exchangeType, boolean durable, boolean autoDelete, Map<String,Object> arugments){
//根据上述我们对于交换机名字的约定,拼接出新的交换机名字:
exchangeName = virtualhostName+exchangeName;
//1,先判断交换机是否已经存在:
try {
//又有可能发生线程安全问题,所以要加锁:
synchronized (exchangeLocker){
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange != null){
//如果交换机已经存在
System.out.println("[VirtualHost] 交换机已经存在!!! exchageName:"+exchangeName);
return true;
}
//2,构造Exchange对象
exchange = new Exchange();
exchange.setName(exchangeName);
exchange.setType(exchangeType);
exchange.setDurable(durable);
exchange.setAutoDelete(autoDelete);
exchange.setArguments(arugments);
//3,将数据写入硬盘:
if(durable){
diskDataCenter.insertExchange(exchange);
}
//4,将数据写入内存:
memoryDataCenter.insertExchange(exchange);
}
System.out.println("[VirtualHost] 交换机创建成功!!! exchangeName:"+exchangeName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 交换机创建失败!!! exchangeName:"+exchangeName);
e.printStackTrace();
return false;
}
}
删除交换机
//删除交换机:
/**
* 删除交换机
* 先写硬盘, 后写内存. 写硬盘失败概率更⼤, 如果异常了, 也就不写内存了.
* 如果先删除了内存里的交换机数据,此时删除硬盘的时候抛出了一个异常,那就需要再将内存中的数据给删掉,所以还是先删除硬盘比较好
*
*
* @param exchangeName
* @return
*/
public boolean exchangeDelete(String exchangeName){
//1,更新名字:
exchangeName = virtualhostName+exchangeName;
try {
synchronized (exchangeLocker){
//2,找到对应的交换机:
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
//3,如果交换机不存在,则抛出异常:
if(exchange == null){
throw new MqException("[VirtualHost] 交换机不存在!!无法删除!!!exchangeName:"+exchangeName);
}
//4,先删除硬盘里的交换机数据:
if(exchange.isDurable()){
diskDataCenter.deleteExchange(exchangeName);
}
//5,再删除内存里的交换机数据:
memoryDataCenter.deleteExchange(exchangeName);
}
System.out.println("[VirtualHost] 删除交换机成功!!! exchangeName:"+exchangeName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 删除交换机失败!!! exchangeName:"+exchangeName);
e.printStackTrace();
return false;
}
}
创建队列
//创建队列(和创建交换机的思路差不多)
public boolean queueDeclare(String queueName,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments){
//1,拼接名字:
queueName = virtualhostName+queueName;
try {
synchronized (queueLocker){
//2,判断队列是否存在:
MESGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue != null){
System.out.println("[VirtualHost] 队列已经存在!!! queueName:"+queueName);
return true;
}
//3,创建队列:
queue = new MESGQueue();
queue.setName(queueName);
queue.setDurable(durable);
queue.setExclusive(exclusive);
queue.setAutoDelete(autoDelete);
queue.setArguments(arguments);
//4,写入硬盘:
if(durable){
diskDataCenter.insertMESGQueue(queue);
}
//5,写入内存:
memoryDataCenter.insertQueue(queue);
}
System.out.println("[VirtualHost] 创建队列成功!!!queueName:"+queueName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 创建队列失败!!! queueName:"+queueName);
e.printStackTrace();
return false;
}
}
删除队列
//删除队列(相似的代码)
public boolean queueDelete(String queueName){
//1,拼接名字
queueName = virtualhostName+queueName;
try {
synchronized (queueLocker){
//2,判断队列是否存在:
MESGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 队列不存在!!! 不可删除。 queueName:"+queueName);
}
//3,删除硬盘:
if(queue.isDurable()){
diskDataCenter.deleteMESGQueue(queueName);
}
//4,删除内促:
memoryDataCenter.deleteQueue(queueName);
}
System.out.println("[VirtualHost] 队列删除成功!!! queueName:"+queueName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 队列删除失败!!! queueName:"+queueName);
e.printStackTrace();
return false;
}
}
创建绑定
//创建绑定:
public boolean queueBind(String exchangeName,String queueName,String bindingKey){
//1,拼接真实的交换机名字,队列名字:
exchangeName = virtualhostName+exchangeName;
queueName = virtualhostName+queueName;
try {
//由于同时涉及到交换机与队列,所以都加上:
synchronized (exchangeLocker){
synchronized (queueLocker){
//2,判定binding是否存在:
Binding binding = memoryDataCenter.getBinding(exchangeName,queueName);
if(binding != null){
throw new MqException("[VirtualHost] 绑定已经存在!!! exchangeName="
+ exchangeName + ", queueName=" + queueName);
}
//3,校验绑定是否合法,使用Router类的checkBindingKeyValid方法,这个方法第一次出现,先创建一个框架,后面会细写:
if(!router.checkBindingKeyValid(bindingKey)){
throw new MqException("[VirtualHost] bindingKey ⾮法! bindingKey="
+ bindingKey);
}
//4,创建绑定
binding = new Binding();
binding.setExchangeName(exchangeName);
binding.setQueueName(queueName);
binding.setBindingKey(bindingKey);
//5,取出来交换机和队列,如果他们都持久化了,绑定也要持久化:
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[VirtualHost] 对应的交换机不存在! exchangeName=" +
exchangeName);
}
MESGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 对应的队列不存在! queueName=" +
queueName);
}
if(exchange.isDurable() && queue.isDurable()){
diskDataCenter.insertBinding(binding);
}
//6,写入内存:
memoryDataCenter.insertBinding(binding);
}
}
System.out.println("[VirtualHost] 创建绑定成功!!! exchangeName="
+ exchangeName + ", queueName=" + queueName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 创建绑定失败!!! exchangeName="
+ exchangeName + ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
删除绑定
//删除绑定:
public boolean queueUnbind(String exchangeName,String queueName){
//1,拼接名字:
exchangeName = virtualhostName+exchangeName;
queueName = virtualhostName+queueName;
try {
//注意和上一个加锁顺序一样,注意顺序,防止形成死锁。
synchronized (exchangeLocker){
synchronized (queueLocker){
//2,获取绑定:
Binding binding = memoryDataCenter.getBinding(exchangeName,queueName);
if(binding == null){
throw new MqException("[VirtualHost] 对应的绑定不存在!!!");
}
//直接删除就不需要验证bindingKey了。
//4,先删除硬盘里的绑定数据:
diskDataCenter.deleteBinding(binding);
//5,删除内存里的绑定数据:
memoryDataCenter.deleteBinding(binding);
}
}
/**
* 写到这一步,有一个问题:此时的删除绑定检查了交换机和队列是否存在,一个例子:如果要删除的绑定的交换机队列原本存在
* 但是我们先将队列删除,再删除绑定,此时判断的时候就会删除绑定失败,因为此时的队列已经不存在了,已经删了,那这不是
* 个问题吗?
*
* 解决方案1:仿造 Mysql的外键,在删除交换机或队列的时候先判断一下他们是否存在绑定,如果存在绑定,则禁止删除交换机/队列
* 要求先删除绑定,再尝试删除队列或交换机(优点:严谨;缺点:麻烦)
* 解决方案2:删除绑定的时候干脆不校验交换机或队列是否存在,直接尝试删除(优点:简单;缺点:不严谨)
*
* 我们选择第二种,因为:虽然直接删除的时候可能会发生,但是问题不大,有些操作本身就没有副作用。
* //3,获取对应的交换机和队列对象,看看他们是否持久化,从而确定有没有硬盘里的绑定数据要删掉:
* Exchange exchange = memoryDataCenter.getExchange(exchangeName);
* if(exchange == null){
* throw new MqException("[VirtualHost] 对应的交换机不存在! exchangeName=" +
* exchangeName);
* }
* MESGQueue queue = memoryDataCenter.getQueue(queueName);
* if(queue == null){
* throw new MqException("[VirtualHost] 对应的队列不存在! queueName=" +
* queueName);
* }
* //4,先删除硬盘里的绑定数据:
* if(exchange.isDurable() && queue.isDurable()){
* diskDataCenter.deleteBinding(binding);
* }
*/
System.out.println("[VirtualHost] 删除绑定成功!!! exchangeName="
+ exchangeName + ", queueName=" + queueName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 删除绑定失败!!! exchangeName="
+ exchangeName + ", queueName=" + queueName);
e.printStackTrace();
return false;
}
}
发布消息
- 发布消息其实是把消息发送给指定的 Exchange, 再根据 Exchange 和 Queue 的 Binding 关系, 转发到对应队列中.
- 发送消息需要指定 routingKey, 这个值的作⽤和 ExchangeType 是相关的.
- Direct: routingKey 就是对应队列的名字. 此时不需要 binding 关系, 也不需要 bindingKey, 就可以直接转发消息.
- Fanout: routingKey 不起作⽤, bindingKey 也不起作⽤. 此时消息会转发给绑定到该交换机上的所有队列中.
- Topic: routingKey 是⼀个特定的字符串, 会和 bindingKey 进⾏匹配. 如果匹配成功, 则发到对应的队列中. 具体规则后续介绍.
- BasicProperties 是消息的元信息. body 是消息本体.
- 此时就需要我们的router类里的checkRoutingKeyValid等方法了
//发布消息:
public boolean basicPublish(String exchangeName,String routingKey,BasicProperties basicProperties,byte[] body){
try{
//1,转换消息的名字;
exchangeName = virtualhostName + exchangeName;
//2,检查routingKey是否合法;
if(!router.checkRoutingKeyValid(routingKey)){
throw new MqException("[VirtualHost] routingKey不合法!!! routingKey:"+routingKey);
}
//3,找到交换机对象:
Exchange exchange = memoryDataCenter.getExchange(exchangeName);
if(exchange == null){
throw new MqException("[VirtualHost] 对应的交换机不存在!!! exchangName:"+exchangeName);
}
//分类转发:直接交换机,扇出交换机,话题交换机:
//直接交换机归为一类,其他交换机归为一类。
if(exchange.getType() == ExchangeType.DIRECT){
//如果是直接交换机,由于routingKey就是队列的名字,此时直接转发,无关乎binding(无论这个binding是否存在,都直接转发给相应队列)
//1,先拼接队列名字:
String queueName = virtualhostName+routingKey;
//2,依据queueName找到相应对象:
MESGQueue queue = memoryDataCenter.getQueue(queueName);
if(queue == null){
throw new MqException("[VirtualHost] 对应的队列不存在!!! queueName:"+queueName);
}
//3,创建消息对象:
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
//4,直接转发消息;
//想到之前写的代码,都是需要判断一下该消息是否要被持久化,所所以我们将发送消息这一步单独封装成一个独立的方法:之后在补充
sendMessage(queue,message);
}else{
//如果是扇出交换机或话题交换机:
//1,先找到该交换机对应的所有binding对象:
ConcurrentHashMap<String,Binding> bindings = memoryDataCenter.getBindings(exchangeName);
//2,遍历所有binding,进入消息转发逻辑:
for (Map.Entry<String,Binding> entry:bindings.entrySet()) {
//1,判定队列是否存在;
Binding binding = entry.getValue();
MESGQueue queue = memoryDataCenter.getQueue(binding.getQueueName());
if(queue == null){
throw new MqException("[VirtualHost] 队列不存在!!! queueName:"+queue.getName());
}
//2,创建消息:
Message message = Message.createMessageWithId(routingKey,basicProperties,body);
//3,判断是否能转发;这一步就是区别扇出交换机和话题交换机的重要一步。
if(!router.route(exchange.getType(),binding,message)){
continue;
}
//4,真正转发消息:
sendMessage(queue,message);
}
}
System.out.println("[VirtualHost] 消息发布成功!!!");
return true;
}catch(Exception e){
System.out.println("[VirtualHost] 发布消息失败!!!");
e.printStackTrace();
return false;
}
}
//这个方法单独拎出来写,
private void sendMessage(MESGQueue queue,Message message) throws IOException, MqException, InterruptedException {
int deliveryMod = message.getDeliverMode();
//1:不持久化 2:持久化;
//1,写入硬盘:
if(deliveryMod == 2){
diskDataCenter.sendMessage(queue,message);
}
//2,写入内存:
memoryDataCenter.sendMessage(queue,message);
// 这里其实还有一个操作:通知消费者去取消息, 这里先不写,等到写到消费者代码的时候再补充:
consumerManager.notifyConsume(queue.getName());
}
创建Router类
创建 mqserver.core.Router
public class Router {
//这个类先创建在这儿,之后再完善:
}
我们约定:
- ⽀持 * 和 # 两种通配符. (* # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,
- ⽐如 a.*.b 是合法的, a.*a.b 是不合法的).
- 其中 * 可以匹配任意⼀个单词.
- 其中 # 可以匹配任意零个或者多个单词
- bindingKey 为 a.*.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b
- bindingKey 为 a.#.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b和 a.aa.bb.b 和 a.b
- ⽐如 a.*.b 是合法的, a.*a.b 是不合法的).
- 更具体的规则描述都在代码里
实现checkBindingKeyValid方法
//检验bindingKey的合法性:
public boolean checkBindingKeyValid(String bindingKey){
/**
* ⽀持 * 和 # 两种通配符. (* # 只能作为 . 切分出来的独⽴部分, 不能和其他数字字⺟混⽤,
* ⽐如 a.*.b 是合法的, a.*a.b 是不合法的).
*
* 其中 * 可以匹配任意⼀个单词.
* 其中 # 可以匹配任意零个或者多个单词
* bindingKey 为 a.*.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b
* bindingKey 为 a.#.b, 可以匹配 routingKey 为 a.a.b 和 a.b.b 和 a.aaa.b 和 a.aa.bb.b 和 a.b
*/
// 1. 允许是空字符串
// 2. 数字字⺟下划线构成
// 3. 可以包含通配符
if(bindingKey.length() == 0) return true;
// 合法的routingKey 由数字字⺟下划线构成
for(int i =0;i<bindingKey.length();i++){
if(bindingKey.charAt(i)>='0'&& bindingKey.charAt(i)<='9'){
continue;
}
if(bindingKey.charAt(i)>='a'&& bindingKey.charAt(i)<='z'){
continue;
}
if(bindingKey.charAt(i)>='A'&& bindingKey.charAt(i)<='Z'){
continue;
}
if(bindingKey.charAt(i) == '_' || bindingKey.charAt(i) == '.'){
continue;
}
if(bindingKey.charAt(i) == '*' || bindingKey.charAt(i) == '#'){
continue;
}
return false;
}
// 再判定每个词的情况
// ⽐如 aaa.a*a 这种应该视为⾮法. (注意正则表达式:)
//非法:aaa.#*.ccc
String[] bindingWords = bindingKey.split("\\.");
for (String word:bindingWords) {
if(word.length() >1 &&(word.contains("*")||word.contains("#"))){
return false;
}
}
// 再判定相邻词的情况
// 4. # 不能连续出现.
// 5. # 和 * 不能相邻
for(int i =0;i<bindingWords.length -1;i++){
if(bindingWords[i].equals("#")&&bindingWords[i+1].equals("#")){
return false;
}
if(bindingWords[i].equals("#")&&bindingWords[i+1].equals("*")){
return false;
}
if(bindingWords[i].equals("*")&&bindingWords[i+1].equals("#")){
return false;
}
}
return true;
}
实现checkRoutingKeyValid方法
//检验routingKey的合法性:
public boolean checkRoutingKeyValid(String routingKey){
if(routingKey.length() == 0){
return true;
}
// 合法的routingKey 由数字字⺟下划线构成
for(int i =0;i<routingKey.length();i++){
if(routingKey.charAt(i)>='0'&&routingKey.charAt(i)<='9'){
continue;
}
if(routingKey.charAt(i)>='a'&&routingKey.charAt(i)<='z'){
continue;
}
if(routingKey.charAt(i)>='A'&&routingKey.charAt(i)<='Z'){
continue;
}
if(routingKey.charAt(i) == '_' || routingKey.charAt(i) == '.'){
continue;
}
return false;
}
return true;
}
实现routeTopic方法
//校验bindingKey和routingKey是否匹配:
//要想完善这个方法就需要先将checkRoutingKeyValid和checkBindingKeyValid检验合法性给写了。
//
private boolean routeTopic(Binding binding, Message message) throws MqException {
//注意正则表达式:
String[] bindingKeyWords = binding.getBindingKey().split("\\.");
String[] routingKeyWords = message.getRoutingKey().split("\\.");
// 使⽤双指针的⽅式来实现匹配
int bindingIndex =0;
int routingIndex =0;
while(bindingIndex<bindingKeyWords.length && routingIndex< routingKeyWords.length){
// 1. 如果是 * , 直接进⼊下⼀轮
if(bindingKeyWords[bindingIndex].equals("*")){
bindingIndex++;
routingIndex++;
}
// 2. 如果 # 没有下⼀个位置, 则直接返回 true
else if(bindingKeyWords[bindingIndex].equals("#") && bindingIndex +1== bindingKeyWords.length){
return true;
}
// 3. 如果遇到 # , 则找到 # 下⼀个位置的 token 在 routingKey 中的位置.
else if(bindingKeyWords[bindingIndex].equals("#")){
bindingIndex++;
int tmp = findIndex(bindingKeyWords[bindingIndex],routingIndex,routingKeyWords);
if(tmp != -1){
bindingIndex++;
routingIndex = tmp +1;
}else{
return false;
}
}
// 4. 如果是普通字符, 直接匹配内容是否相等, 不相等则返回 false, 相等直接进⼊下⼀轮
else if(bindingKeyWords[bindingIndex].equals(routingKeyWords[routingIndex])){
bindingIndex++;
routingIndex++;
}else{
return false;
}
// 5. 如果能找到对应的位置了, 就可以继续匹配. 如果找不到, 就返回 false
}
// 6. 循环结束后, 检查看两个下标是否同时到达末尾. 是则匹配成功, 否则匹配失败.
if(bindingIndex == bindingKeyWords.length && routingIndex == routingKeyWords.length){
return true;
}
return false;
}
//寻找经过#配对以后 在routingKey里的下标
private int findIndex(String bindingKeyWord, int routingIndex, String[] routingKeyWords) {
for(int i =routingIndex;i<routingKeyWords.length;i++){
String ch = routingKeyWords[i];
if(bindingKeyWord.equals(ch)){
return i;
}
}
return -1;
}
实现route方法
//
public boolean route(ExchangeType exchangeType,Binding binding,Message message) throws MqException {
if(exchangeType == ExchangeType.FANOUT){
//如果是fanout类型,直接转发;
return true;
} else if (exchangeType == ExchangeType.TOPIC) {
return routeTopic(binding,message);
}else{
throw new MqException("[Router] 未知的交换机类型!!! exchangeType:"+exchangeType);
}
}
测试 Router
@SpringBootTest
public class RouterTests {
private Router router = new Router();
private Binding binding = null;
private Message message = null;
@BeforeEach
public void setUp(){
binding = new Binding();
message = new Message();
}
@AfterEach
public void tearDown(){
binding = null;
message = null;
}
/**
* // [测试⽤例]
* // binding key routing key result
* // aaa aaa true
* // aaa.bbb aaa.bbb true
* // aaa.bbb aaa.bbb.ccc false
* // aaa.bbb aaa.ccc false
* // aaa.bbb.ccc aaa.bbb.ccc true
* // aaa.* aaa.bbb true
* // aaa.*.bbb aaa.bbb.ccc false
* // *.aaa.bbb aaa.bbb false
* // # aaa.bbb.ccc true
* // aaa.# aaa.bbb true
* // aaa.# aaa.bbb.ccc true
* // aaa.#.ccc aaa.ccc true
* // aaa.#.ccc aaa.bbb.ccc true
* // aaa.#.ccc aaa.aaa.bbb.ccc true
* // #.ccc ccc true
* // #.ccc aaa.bbb.ccc true
*/
@Test
public void testRouter() throws MqException {
//先测试一下,合法性:
Assertions.assertTrue(router.checkBindingKeyValid("aaa.bbb.#.fefe_.fef"));
Assertions.assertFalse(router.checkBindingKeyValid("aaa.ddfdfe.*#.fef"));
Assertions.assertTrue(router.checkRoutingKeyValid("aaa.bbb._jeiie.oeiofaa.eeffe"));
Assertions.assertFalse(router.checkRoutingKeyValid("ejfoe..fef.!!"));
//再测试route方法:
binding.setBindingKey("aaa");
message.setRoutingKey("aaa");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.bbb");
message.setRoutingKey("aaa.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.bbb.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.*");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.*.bbb");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("*.aaa.bbb");
message.setRoutingKey("aaa.bbb");
Assertions.assertFalse(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.#");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("aaa.#.ccc");
message.setRoutingKey("aaa.aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("#.ccc");
message.setRoutingKey("ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
binding.setBindingKey("#.ccc");
message.setRoutingKey("aaa.bbb.ccc");
Assertions.assertTrue(router.route(ExchangeType.TOPIC, binding, message));
System.out.println("[RouterTests] Router测试成功!!!");
}
}
订阅消息
-
在basicConsume里用到了Consumer ,表示一个消费者的回调。回调的意思简单来说就是回调函数。就像A买了B一样东西,然后A把钱给B。这件事。回调就相当于A把钱给B。就是A拿了东西后不能直接走人,此时B需要告诉A“你需要付钱”,然后A将钱给B。这是一个简单的比方。
-
此方法又用到了consumerManager,之后会创建
//订阅消息;
//添加一个订阅者:
public boolean basicConsume(String consumerTag, String queueName, boolean autoAck, Consumer consumer){
queueName = virtualhostName+queueName;
try {
consumerManager.addConsumer(consumerTag,queueName,autoAck,consumer);
System.out.println("[VirtualHost] basicConsume 成功! queueName:" +
queueName);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] basicConsume失败!!! queueName:"+queueName);
e.printStackTrace();
return false;
}
}
创建Consumer
Consumer 相当于⼀个回调函数.在 common.Consumer 中.
这里的异常肯定不是一开始就有的,是将后面的代码完善了,自然就会提醒你加上该加的异常,所以写代码啊真是要一步一步来不要急功近利。
@FunctionalInterface
public interface Consumer {
/**
*
* @param consumerTag // consumerTag 消费者标识, 后⾯使⽤ channelId 填充
* @param basicProperties
* @param body
*/
void handleDelivery(String consumerTag, BasicProperties basicProperties,byte[] body) throws IOException, MqException;
}
创建ConsumerManager类
消费者管理类
创建mqserver.core.ConsumerManager
- parent ⽤来记录虚拟主机.
- 使⽤⼀个阻塞队列⽤来触发消息消费. 称为令牌队列. 每次有消息过来了, 都往队列中放⼀个令牌(也就是队列名), 然后消费者再去消费对应队列的消息.
- 使⽤⼀个线程池⽤来执⾏消息回调。这样令牌队列的设定避免搞出来太多线程. 否则就需要给每个队列都安排⼀个单独的线程了, 如果队列很多则开销就⽐较⼤了
- 其实RabbiteMq这样设计很巧妙的,使用单独一个线程去扫描阻塞队列,如果有消息来了,就将消息加入对应队列同时将该队列的名字加入阻塞队列,由于阻塞队列一直在扫描着,所以一看到有令牌(队列的名字)来了,就调用consumeMessage让这个队列记录的消费者将消息消费掉。如果是消费者来了,就调用addConsumer将该消费者添加到对应队列的记录消费者的链表里。太nb了。
/**
* 创建消费者 管理管理类
*/
public class ConsumerManager {
private VirtualHost parent;
//存放令牌的队列,通过令牌来触发消费线程的消费操作(使用的是阻塞队列BlockingQueue)
//token :令牌
//BlockingQueue :java内置的阻塞队列,主要用于:如果有多个消费者同时来消费消息,请问你是先将
private BlockingQueue<String> tokenQueue = new LinkedBlockingQueue<>();
//线程池:将线程个数初始化成4个,这个如果不合适可以以后再调。
private ExecutorService workerPool = Executors.newFixedThreadPool(4);
}
添加令牌接⼝
//添加令牌接口:
public void notifyConsume(String queueName) throws InterruptedException {
tokenQueue.put(queueName);
}
创建ConsumerEnv类(这个类表示⼀个订阅者的执⾏环境)
在common.ConsumerEnv
/**
* 这个类表⽰⼀个消费者的执⾏环境.
*/
public class ConsumerEnv {
private String consumerTag;
private String queueName;
private boolean autoAck;
private Consumer consumer;
public ConsumerEnv(String consumerTag, String queueName, boolean autoAck, Consumer consumer) {
this.consumerTag = consumerTag;
this.queueName = queueName;
this.autoAck = autoAck;
this.consumer = consumer;
}
public String getConsumerTag() {
return consumerTag;
}
public void setConsumerTag(String consumerTag) {
this.consumerTag = consumerTag;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public boolean isAutoAck() {
return autoAck;
}
public void setAutoAck(boolean autoAck) {
this.autoAck = autoAck;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
}
给 MsgQueue 添加⼀个订阅者列表.
//新增一个消费者链表,用来记录该队列有几个消费者访问:
private List<ConsumerEnv> consumerEnvList = new ArrayList<>();
//轮询序号:考虑线程安全问题使用AtomicInteger
private AtomicInteger atomicInteger = new AtomicInteger(0);
public void addConsumerEnv(ConsumerEnv consumerEnv){
consumerEnvList.add(consumerEnv);
}
public ConsumerEnv chooseConsumer(){
if(consumerEnvList.size() ==0){
return null;
}
int n = atomicInteger.get() % consumerEnvList.size();
//自增1
atomicInteger.getAndIncrement();
return consumerEnvList.get(n);
}
实现添加订阅者
- 新来订阅者的时候, 需要先消费掉之前积压的消息.
- consumeMessage 真正的消息消费操作, ⼀会再实现.
public void addConsumer(String consumerTag, String queueName, boolean autoAck, Consumer consumer) throws MqException {
//先消费已经积压的消息:
MESGQueue msgQueue = parent.getMemoryDataCenter().getQueue(queueName);
if(msgQueue == null){
throw new MqException("[ConsumerManager] 队列不存在! queueName:" +
queueName);
}
ConsumerEnv consumerEnv = new ConsumerEnv(consumerTag,queueName,autoAck,consumer);
synchronized (msgQueue){
msgQueue.addConsumerEnv(consumerEnv);
// 把已经积压的 n 个数据都先消费掉
int n = parent.getMemoryDataCenter().countMessages(queueName);
for(int i =0;i<n;i++){
consumeMessage(msgQueue);
}
}
}
实现扫描线程
public ConsumerManager(VirtualHost virtualHost){
parent = virtualHost;
// 启动扫描线程
Thread scanThread = new Thread(()->{
while (true){
try {
//1,拿到令牌:
//关于阻塞队列里的take方法:
// 如果队列中有元素,take() 方法会取出并移除队列头部的元素,并返回该元素。
// 如果队列中没有元素,take() 方法会阻塞当前线程,直到队列中有元素可用为止。它不会返回任何值,也不会抛出异常,而是等待直到有元素被添加到队列中。
String queueName = tokenQueue.take();
//2,找到队列;
MESGQueue queue = virtualHost.getMemoryDataCenter().getQueue(queueName);
if(queue == null){
throw new MqException("[ConsumerManager] 队列不存在!!!queueName:" + queueName);
}
//消费一个消息:
synchronized (queue){
consumeMessage(queue);
}
}catch (Exception e){
e.printStackTrace();
}
}
});
//把线程设置为后台线程:
//前台线程和后台线程的区别:前台线程不会随着进程的结束而结束,相反,进程会一直等待前台线程
//后台线程会随着进程的结束,直接结束,不会影响进程。
scanThread.setDaemon(true);
scanThread.start();
}
实现消费消息
所谓的消费消息, 其实就是调⽤消息的回调. 并把消息删除掉.
如果不是很理解,可以往后看,看到网络通信里也会有关于回调的,看到那里就理解了。
private void consumeMessage(MESGQueue msgQueue) throws MqException {
//按照轮询方式先找一个消费者出来
ConsumerEnv luckDog = msgQueue.chooseConsumer();
if(luckDog == null){
//如果还没有消费者,就先不消费:
return ;
}
//如果有消费者,则从指定队列中取出一个元素:
Message message = parent.getMemoryDataCenter().pollMessage(msgQueue.getName());
if(message == null) {
return ;
}
System.out.println("[ConsumerManager] 消息被成功消费! queueName=" +
msgQueue.getName() + ", messageId=" + message.getMessageId());
// 3. 丢到线程池中⼲活. 回调执⾏时间可能⽐较⻓. 不适合让扫描线程去调⽤.
workerPool.submit(() ->{
try{
// 1. 先把消息放到待确认队列中
parent.getMemoryDataCenter().addMessageWaitAck(msgQueue.getName(),message);
// 2. 调⽤消费者的回调
luckDog.getConsumer().handleDelivery(luckDog.getConsumerTag(), message.getBasicProperties(),
message.getBody());
// 3. 如果消息是⾃动确认, 则可以直接把消息彻底删除了.
// (这个逻辑必须放到执⾏回调后⾯. 万⼀执⾏回调⼀半服务器崩溃, 这个消息仍然存在于硬盘上,
// 下次启动还可以被继续消费到)
if(luckDog.isAutoAck()){
if(message.getDeliverMode() == 2){
parent.getDiskDataCenter().deleteMessage(msgQueue,message);
}
parent.getMemoryDataCenter().deleteMessage(message.getMessageId());
parent.getMemoryDataCenter().deleteMessageWaitAck(msgQueue.getName(), message.getMessageId());
}
//如果不是自动确认,在消费者本体,会继承Consumer,重写方法,然后在内部应答
}catch (Exception e){
e.printStackTrace();
}
});
}
消息确认
//下列⽅法只是⼿动应答的时候才会使⽤.
public boolean basicAck(String queueName,String messageId){
queueName = virtualhostName+queueName;
try {
// 删除硬盘上的数据
MESGQueue queue = memoryDataCenter.getQueue(queueName);
Message message = memoryDataCenter.getMessage(messageId);
if(message.getDeliverMode() == 2){
diskDataCenter.deleteMessage(queue,message);
}
// 删除内存中的数据
memoryDataCenter.deleteMessage(messageId);
// 删除待 ack 队列中的数据
memoryDataCenter.deleteMessageWaitAck(queueName,messageId);
System.out.println("[VirtualHost] 手动Ack 成功! queueName:" +
queueName + ", messageId:" + messageId);
return true;
}catch (Exception e){
System.out.println("[VirtualHost] 手动Ack 失败! queueName:" +
queueName + ", messageId:" + messageId);
e.printStackTrace();
return false;
}
}
测试 VirtualHost
- 操作数据库, 需要先启动 Spring 服务.
- 同时, 需要先关闭 Spring 服务, 才能删除数据库⽂件
- 使⽤ FileUtils.deleteDirector 递归的删除⽬录中的内容. 这个是 Spring ⾃带的类org.apache.tomcat.util.http.fileupload.FileUtils
创建VirtualHostTests类
@SpringBootTest
public class VirtualHostTests {
private VirtualHost virtualHost;
@BeforeEach
public void setUp(){
//如果用到了关于数据库的操作,就会使用Mybatis,此时就需要启动sping服务:
MqApplication.context = SpringApplication.run(MqApplication.class);
virtualHost = new VirtualHost("default_");
}
@AfterEach
public void tearDown() throws IOException {
MqApplication.context.close();
//将文件都删了:
File file = new File("./data/meta.db");
file.delete();
File file1 = new File("./data");
//使用spring提供的类FileUtils递归的删除data目录
FileUtils.deleteDirectory(file1);
virtualHost = null;
}
//测试exchangeDeclare
@Test
public void testExchangeDeclare(){
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
System.out.println("testExchangeDeclare 测试成功!!!");
}
//测试testExchangeDelete
@Test
public void testExchangeDelete(){
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
boolean ok1 = virtualHost.exchangeDelete("testExchange");
Assertions.assertTrue(ok1);
System.out.println("testExchangeDelete 测试成功!!!");
}
@Test
public void testQueueDeclare(){
boolean ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
System.out.println("testQueueDeclare 测试成功!!!");
}
@Test
public void testQueueDelete(){
boolean ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDelete("testQueue");
Assertions.assertTrue(ok);
System.out.println("testQueueDelete 测试成功!!!");
}
@Test
public void testQueueBind(){
//先创建交换机和队列以便使用:
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue","hello");
Assertions.assertTrue(ok);
System.out.println("testQueueBind 测试成功!!!");
}
@Test
public void testQueueUnbind(){
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue","hello");
Assertions.assertTrue(ok);
ok = virtualHost.queueUnbind("testExchange","testQueue");
Assertions.assertTrue(ok);
System.out.println("testQueueUnbind 测试成功!!!");
}
//测试直接交换机
/**
* 对于直接交换机,他的routingKey就是队列名字,此时不需要任何binding就能直接将消息发布给对应的队列
* 这也是我们此处不用创建binding的原因。
* @throws MqException
*/
@Test
public void testBasicPublishDirect() throws MqException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
//如果是直接交换机,这个routingKey就是消息队列的名字:
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
Assertions.assertTrue(ok);
Message message = virtualHost.getMemoryDataCenter().pollMessage(virtualHost.getVirtualhostName()+"testQueue");
Assertions.assertArrayEquals("hello".getBytes(),message.getBody());
System.out.println("testBasicPublish 测试成功!!!");
}
//测试fanout交换机:
@Test
public void testBasicPublishFanout() throws MqException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2",false,false,false,null);
Assertions.assertTrue(ok);
//创建绑定:由于是fanout交换机,所以和bindingKey无关
ok = virtualHost.queueBind("testExchange","testQueue","123");
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue2","123");
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","1111",null,"hello".getBytes());
Assertions.assertTrue(ok);
Message message = virtualHost.getMemoryDataCenter().pollMessage(virtualHost.getVirtualhostName()+"testQueue");
Assertions.assertArrayEquals("hello".getBytes(),message.getBody());
Message message2 = virtualHost.getMemoryDataCenter().pollMessage(virtualHost.getVirtualhostName()+"testQueue2");
Assertions.assertArrayEquals("hello".getBytes(),message2.getBody());
System.out.println("testBasicPublishFanout 测试成功!!!");
}
//测试Topic交换机:
@Test
public void testBasicPublishTopic() throws MqException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
//创建绑定:由于是topic交换机,所以就要注意bindingKey和routingKey
ok = virtualHost.queueBind("testExchange","testQueue","123.*.www");
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","123.22.www",null,"hello".getBytes());
Assertions.assertTrue(ok);
Message message = virtualHost.getMemoryDataCenter().pollMessage(virtualHost.getVirtualhostName()+"testQueue");
Assertions.assertArrayEquals("hello".getBytes(),message.getBody());
System.out.println("MessageRoutingKey:"+message.getRoutingKey());
System.out.println("testBasicPublishTopic 测试成功!!!");
}
//测试basicConsume
//先发送消息,后订阅:topic交换机
@Test
public void testBasicConsumeTopic() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",true,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue","123.*.www");
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","123.22.www",null,"hello".getBytes());
Assertions.assertTrue(ok);
//这里我们先使用匿名内部类:
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) {
System.out.println("messageId:"+basicProperties.getMessageId());
Assertions.assertEquals("123.22.www",basicProperties.getRoutingKey());
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
}
});
Assertions.assertTrue(ok);
//因为我们前面写过tearDown结束程序,如果时间不够这个消费应答还没有执行完就直接结束了的话会出错的
//所以我们多等待一些时间让消费者执行完。
Thread.sleep(500);
System.out.println("testBasicPublishTopic 测试成功!!!");
}
//先订阅再发消息:topic交换机
@Test
public void testBasicConsumeTopic2() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.TOPIC,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",true,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue","123.*.www");
Assertions.assertTrue(ok);
//这里我们使用一下lamda表达式,其实lamda表达式的本质也就是匿名内部类(lamda表达式()里的参数可以带类型也可以不带)
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true,
(consumerTag,basicProperties,body) ->{
System.out.println("messageId:"+basicProperties.getMessageId());
Assertions.assertEquals("123.22.www",basicProperties.getRoutingKey());
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
});
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","123.22.www",null,"hello".getBytes());
Assertions.assertTrue(ok);
Thread.sleep(500);
System.out.println("testBasicConsumeTopic2 测试成功!!!");
}
//先发消息再订阅,direct,由于先订阅再发消息就是将代码的位置换一换,这里就不多余测试了,就测试一个先发消息再订阅
@Test
public void testBasicConsumeDirect() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",true,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
Assertions.assertTrue(ok);
//这里我们使用一下lamda表达式,其实lamda表达式的本质也就是匿名内部类(lamda表达式()里的参数可以带类型也可以不带)
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true,
(String consumerTag, BasicProperties basicProperties, byte[] body) ->{
System.out.println("messageId:"+basicProperties.getMessageId());
Assertions.assertEquals("testQueue",basicProperties.getRoutingKey());
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
});
Assertions.assertTrue(ok);
Thread.sleep(500);
System.out.println("testBasicConsumeDirect 测试成功!!!");
}
//先发消息再订阅:fanout交换机
@Test
public void testBasicConsumeFanout() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.FANOUT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue2",false,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue","123");
Assertions.assertTrue(ok);
ok = virtualHost.queueBind("testExchange","testQueue2","123");
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","1111",null,"hello".getBytes());
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true,
(consumerTag,basicProperties,body) ->{
System.out.println("1111messageId:"+basicProperties.getMessageId());
Assertions.assertEquals("1111",basicProperties.getRoutingKey());
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
});
Assertions.assertTrue(ok);
ok = virtualHost.basicConsume("testConsumerTag", "testQueue2", true,
(consumerTag,basicProperties,body) ->{
System.out.println("messageId:"+basicProperties.getMessageId());
// if (!"111".equals(basicProperties.getRoutingKey())) {
// throw new RuntimeException("Routing key assertion failed");
// }
//这里我故意将routingKey写成 111
//由于这是多线程执行assertions可能会执行出错,但是由于打印的信息多,可能就把错误信息给
//掩盖了,但是我很好奇,为啥抛出了异常,那个出现异常的线程还能执行下去??我又catch了一下
//确实有报错啊,真是摸不着头脑
//原因:还记得这是一个多线程环境吗,况且这是一个回调函数内部,对于回调函数我们是怎么处理的
//在ConsumerManager里,先将消费者加入对应队列的消费者管理链表里,经过一个单独线程的扫描
//会发现这个消费者,并且让他消费消息,在消费消息这个函数里,会有一个线程池专门用来执行这个
//回调函数,因此如果某个回调函数报错了,也只是那个子线程停止了,并不会影响其他线程。
//这里的情况就相当于:在那个子线程里执行了这个报错断言,导致子线程停止,打印的报错数据也被覆盖了
//因此,这里的代码不会停止,会继续执行到结束。
try {
Assertions.assertEquals("111",basicProperties.getRoutingKey());
}catch (AssertionError e){
e.printStackTrace();
}
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
});
Assertions.assertTrue(ok);
Thread.sleep(500);
System.out.println("testBasicConsumeFanout 测试成功!!!");
}
@Test
public void testBasicAck() throws InterruptedException {
boolean ok = virtualHost.exchangeDeclare("testExchange", ExchangeType.DIRECT,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.queueDeclare("testQueue",true,false,false,null);
Assertions.assertTrue(ok);
ok = virtualHost.basicPublish("testExchange","testQueue",null,"hello".getBytes());
Assertions.assertTrue(ok);
//在这里我们将autoAck设置成false,手动应答:
ok = virtualHost.basicConsume("testConsumerTag", "testQueue", true,
(String consumerTag, BasicProperties basicProperties, byte[] body) ->{
System.out.println("messageId:"+basicProperties.getMessageId());
Assertions.assertEquals("testQueue",basicProperties.getRoutingKey());
Assertions.assertEquals(1,basicProperties.getDeliverMode());
Assertions.assertArrayEquals("hello".getBytes(),body);
// ⼿动调⽤ ack
boolean ok2 = virtualHost.basicAck("testQueue",basicProperties.getMessageId());
Assertions.assertTrue(ok2);
});
Assertions.assertTrue(ok);
Thread.sleep(500);
System.out.println("testBasicAck 测试成功!!!");
}
}