
Java Project: A RabbitMQ-Style Message Queue, In-Memory Data Management

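Contents

I. Introduction

II. MemoryDataCenter

1. Designing the data structures

2. Wrapping the Exchange methods

3. Wrapping the MsgQueue methods

4. Wrapping the Binding methods

5. Wrapping the Message methods

6. Managing unacknowledged messages

7. Recovering data from disk into memory

III. Testing MemoryDataCenter

1. Preparation

2. Testing exchanges

3. Testing queues

4. Testing bindings

5. Testing messages

6. Testing message delivery

7. Testing unacknowledged messages

8. Testing recovery from disk into memory

IV. Summary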


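I. Introduction

  The previous article covered the unified layer for disk operations. This article gives a brief walkthrough of how the same data is managed in memory.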

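II. MemoryDataCenter

1. Designing the data structures

// For thread safety, every table here is a ConcurrentHashMap.
    // key: exchangeName, value: Exchange
    private ConcurrentHashMap<String,Exchange> exchangeMap = new ConcurrentHashMap<>();

    // key: queueName, value: MsgQueue
    private ConcurrentHashMap<String,MsgQueue> queueMap = new ConcurrentHashMap<>();

    // Bindings: a nested map. The outer key is exchangeName; the value is another map (key: queueName, value: Binding).
    // Named bindingsMap to match how the methods below refer to it.
    private ConcurrentHashMap<String,ConcurrentHashMap<String, Binding>> bindingsMap = new ConcurrentHashMap<>();

    // key: messageId, value: Message
    private ConcurrentHashMap<String, Message> messageMap = new ConcurrentHashMap<>();

    // key: queueName, value: the messages in that queue. LinkedList is not thread-safe, so callers lock the list itself.
    private ConcurrentHashMap<String, LinkedList<Message>> queueMessageMap = new ConcurrentHashMap<>();

    // Unacknowledged messages: a nested map. The outer key is queueName; the value is a map (key: messageId, value: Message).
    // Records which messages of a queue have been taken by a consumer but not yet acknowledged.
    private ConcurrentHashMap<String,ConcurrentHashMap<String,Message>> queueMessageWaitAckMap = new ConcurrentHashMap<>();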
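
To make the two-level layout of the binding table more concrete, here is a minimal stand-alone sketch. It is not the project's code: the class name NestedMapSketch, its methods, and the use of plain String values in place of Binding objects are all assumptions made for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: String values stand in for the project's Binding objects.
class NestedMapSketch {
    // Outer key: exchangeName. Inner key: queueName.
    static final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> bindings =
            new ConcurrentHashMap<>();

    static void bind(String exchangeName, String queueName, String bindingKey) {
        // computeIfAbsent creates the inner map atomically the first time an exchange is seen.
        bindings.computeIfAbsent(exchangeName, k -> new ConcurrentHashMap<>())
                .put(queueName, bindingKey);
    }

    static String lookup(String exchangeName, String queueName) {
        ConcurrentHashMap<String, String> inner = bindings.get(exchangeName);
        return inner == null ? null : inner.get(queueName);
    }

    public static void main(String[] args) {
        bind("orders", "q1", "key.a");
        bind("orders", "q2", "key.b");
        System.out.println(lookup("orders", "q1"));        // key.a
        System.out.println(lookup("orders", "missing"));   // null
        System.out.println(bindings.get("orders").size()); // 2
    }
}
```

Keying the outer map by exchange name means that fetching every binding of one exchange is a single lookup, which is what a router needs when it fans a message out to queues.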

2. Wrapping the Exchange methods

/*
    Exchange operations
     */
    public void insertExchange(Exchange exchange){
        exchangeMap.put(exchange.getName(),exchange);
        System.out.println("[MemoryDataCenter] Exchange added! exchangeName="+exchange.getName());
    }

    public Exchange getExchange(String exchangeName){
        return exchangeMap.get(exchangeName);
    }

    public void deleteExchange(String exchangeName){
        exchangeMap.remove(exchangeName);
        System.out.println("[MemoryDataCenter] Exchange deleted! exchangeName="+exchangeName);
    }

3. Wrapping the MsgQueue methods

/*
    Queue operations
     */
    public void insertQueue(MsgQueue queue){
        queueMap.put(queue.getName(),queue);
        System.out.println("[MemoryDataCenter] Queue added! queueName="+queue.getName());
    }

    public MsgQueue getQueue(String queueName){
        return queueMap.get(queueName);
    }

    public void deleteQueue(String queueName){
        queueMap.remove(queueName);
        System.out.println("[MemoryDataCenter] Queue deleted! queueName="+queueName);
    }

4. Wrapping the Binding methods

/*
    Binding operations
     */
    public void insertBinding(Binding binding) throws MqException {
        // Look up the inner map for this exchange, creating it on first use.
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName(),
                                                        k -> new ConcurrentHashMap<>());
        // The existence check and the insert must happen as one step, so lock the inner map.
        synchronized (bindingMap){
            if(bindingMap.get(binding.getQueueName())!=null){
                throw new MqException("[MemoryDataCenter] Binding already exists! exchangeName="+binding.getExchangeName()
                        +",queueName="+binding.getQueueName());
            }
            bindingMap.put(binding.getQueueName(),binding);
        }
        System.out.println("[MemoryDataCenter] Binding added! exchangeName="+binding.getExchangeName()
                +",queueName="+ binding.getQueueName());
    }

    // Returns the single binding between the given exchange and queue, or null if there is none.
    public Binding getBinding(String exchangeName,String queueName){
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(exchangeName);
        if(bindingMap==null){
            return null;
        }
        return bindingMap.get(queueName);
    }

    // Returns every binding of the given exchange, or null if the exchange has none.
    public ConcurrentHashMap<String,Binding> getBindings(String exchangeName){
        return bindingsMap.get(exchangeName);
    }

    public void deleteBinding(Binding binding) throws MqException {
        ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.get(binding.getExchangeName());
        if(bindingMap==null){
            throw new MqException("[MemoryDataCenter] Binding does not exist! exchangeName="+binding.getExchangeName()
                    +",queueName="+binding.getQueueName());
        }
        bindingMap.remove(binding.getQueueName());
        System.out.println("[MemoryDataCenter] Binding deleted! exchangeName="+binding.getExchangeName()
                +",queueName="+binding.getQueueName());
    }
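
insertBinding guards its check-then-put with a synchronized block. Because the inner map is already a ConcurrentHashMap, the same "insert only if absent" rule could probably also be expressed with putIfAbsent, which performs the check and the insert atomically. The sketch below is only an illustration of that alternative, not the article's implementation: PutIfAbsentSketch is a hypothetical class, it stores String values instead of Binding objects, and it reports a duplicate with a boolean instead of throwing MqException.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of an alternative to the synchronized check-then-put.
class PutIfAbsentSketch {
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> bindingsMap =
            new ConcurrentHashMap<>();

    // Returns true if the binding was inserted, false if one already existed.
    boolean insertBinding(String exchangeName, String queueName, String bindingKey) {
        ConcurrentHashMap<String, String> inner =
                bindingsMap.computeIfAbsent(exchangeName, k -> new ConcurrentHashMap<>());
        // putIfAbsent returns the previous value, or null when the key was not present.
        return inner.putIfAbsent(queueName, bindingKey) == null;
    }

    String getBinding(String exchangeName, String queueName) {
        ConcurrentHashMap<String, String> inner = bindingsMap.get(exchangeName);
        return inner == null ? null : inner.get(queueName);
    }

    public static void main(String[] args) {
        PutIfAbsentSketch sketch = new PutIfAbsentSketch();
        System.out.println(sketch.insertBinding("ex", "q", "first"));  // true
        System.out.println(sketch.insertBinding("ex", "q", "second")); // false
        System.out.println(sketch.getBinding("ex", "q"));              // first
    }
}
```

The synchronized version in the article has the advantage that it can throw a descriptive exception from inside the guarded region; the putIfAbsent version trades that for not needing an explicit lock.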

5. Wrapping the Message methods

/*
    Message operations
     */
    public void addMessage(Message message){
        messageMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] New message added! messageId="+message.getMessageId());
    }

    public Message getMessage(String messageId){
        return messageMap.get(messageId);
    }

    public void removeMessage(String messageId){
        messageMap.remove(messageId);
        System.out.println("[MemoryDataCenter] Message removed! messageId="+messageId);
    }

    public void sendMessage(MsgQueue queue,Message message){
        // Look up the queue's message list, creating it on first use.
        LinkedList<Message> messages = queueMessageMap.computeIfAbsent(queue.getName(),k->new LinkedList<>());
        // LinkedList is not thread-safe, so lock the list while appending.
        synchronized (messages){
            messages.add(message);
        }
        // Also register the message in the global message table.
        addMessage(message);
        System.out.println("[MemoryDataCenter] Message delivered to queue! messageId="+message.getMessageId());
    }

    // Removes and returns the message at the head of the queue, or null if the queue is empty or unknown.
    public Message pollMessage(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null){
            return null;
        }
        synchronized (messages){
            if(messages.isEmpty()){
                return null;
            }
            Message currentMessage = messages.remove(0);
            System.out.println("[MemoryDataCenter] Message taken from queue! messageId="+currentMessage.getMessageId());
            return currentMessage;
        }
    }

    public int getMessageCount(String queueName){
        LinkedList<Message> messages = queueMessageMap.get(queueName);
        if(messages==null){
            return 0;
        }
        synchronized (messages){
            return messages.size();
        }
    }
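
The per-queue LinkedList is the one structure here that is not thread-safe on its own, which is why sendMessage, pollMessage, and getMessageCount all lock the list. As a rough check of that pattern, the following sketch has several producer threads append to one queue and then counts the result. QueueLockSketch and its method names are hypothetical, and Strings stand in for Message objects.

```java
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: the same lock-the-list pattern, with Strings instead of Message objects.
class QueueLockSketch {
    private final ConcurrentHashMap<String, LinkedList<String>> queueMessageMap =
            new ConcurrentHashMap<>();

    void send(String queueName, String message) {
        LinkedList<String> messages =
                queueMessageMap.computeIfAbsent(queueName, k -> new LinkedList<>());
        synchronized (messages) {
            messages.add(message);
        }
    }

    String poll(String queueName) {
        LinkedList<String> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            return null;
        }
        synchronized (messages) {
            return messages.poll(); // poll() returns null when the list is empty
        }
    }

    int count(String queueName) {
        LinkedList<String> messages = queueMessageMap.get(queueName);
        if (messages == null) {
            return 0;
        }
        synchronized (messages) {
            return messages.size();
        }
    }

    // Starts the given number of producer threads and returns the final queue length.
    static int runProducers(int threads, int perThread) {
        QueueLockSketch sketch = new QueueLockSketch();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    sketch.send("q", id + "-" + i);
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for producers", e);
        }
        return sketch.count("q");
    }

    public static void main(String[] args) {
        System.out.println(runProducers(4, 1000)); // 4000
    }
}
```

Without the synchronized blocks, concurrent add calls on a LinkedList can lose elements or corrupt the list, so the final count would not be reliable.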

6. Managing unacknowledged messages

/*
    Management of messages that are waiting for an acknowledgement
     */
    public void addMessageWaitAck(String queueName,Message message){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.computeIfAbsent(queueName
                                    ,k->new ConcurrentHashMap<>());
        messageHashMap.put(message.getMessageId(),message);
        System.out.println("[MemoryDataCenter] Message added to the wait-ack table! queueName="+queueName);
    }

    public void removeMessageWaitAck(String queueName,String messageId){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap==null){
            return;
        }
        messageHashMap.remove(messageId);
        System.out.println("[MemoryDataCenter] Message removed from the wait-ack table! messageId="+messageId);
    }

    public Message getMessageWaitAck(String queueName,String messageId){
        ConcurrentHashMap<String,Message> messageHashMap = queueMessageWaitAckMap.get(queueName);
        if(messageHashMap==null){
            return null;
        }
        return messageHashMap.get(messageId);
    }
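
The article does not show here how these three methods are combined with the message methods from the previous section. One plausible lifecycle, sketched below under that assumption, is: a message is polled from the queue, parked in the wait-ack table while the consumer processes it, and dropped from both the wait-ack table and the message table once the consumer acknowledges it. WaitAckSketch and all of its method names are hypothetical, and it models a single queue with String bodies.

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a deliver/ack lifecycle for a single queue.
class WaitAckSketch {
    private final Map<String, String> messageMap = new ConcurrentHashMap<>(); // messageId -> body
    private final LinkedList<String> queue = new LinkedList<>();              // ids ready for delivery
    private final Map<String, String> waitAck = new ConcurrentHashMap<>();    // delivered, not yet acked

    void publish(String messageId, String body) {
        messageMap.put(messageId, body);
        synchronized (queue) {
            queue.add(messageId);
        }
    }

    // Hands the next message to a consumer and parks it until it is acknowledged.
    String deliver() {
        String messageId;
        synchronized (queue) {
            messageId = queue.poll();
        }
        if (messageId == null) {
            return null;
        }
        waitAck.put(messageId, messageMap.get(messageId));
        return messageId;
    }

    // Returns true if the message was waiting for an ack and has now been fully removed.
    boolean ack(String messageId) {
        if (waitAck.remove(messageId) == null) {
            return false;
        }
        messageMap.remove(messageId);
        return true;
    }

    int unackedCount() {
        return waitAck.size();
    }

    boolean isStored(String messageId) {
        return messageMap.containsKey(messageId);
    }

    public static void main(String[] args) {
        WaitAckSketch sketch = new WaitAckSketch();
        sketch.publish("m1", "hello");
        System.out.println(sketch.deliver());      // m1
        System.out.println(sketch.unackedCount()); // 1
        System.out.println(sketch.ack("m1"));      // true
        System.out.println(sketch.isStored("m1")); // false
        System.out.println(sketch.ack("m1"));      // false
    }
}
```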

7. Recovering data from disk into memory

public void recovery(DiskDataCenter diskDataCenter) throws IOException, MqException, ClassNotFoundException {
        // 1. Clear the tables that are rebuilt from disk (the wait-ack table is not restored here)
        exchangeMap.clear();
        queueMap.clear();
        bindingsMap.clear();
        messageMap.clear();
        queueMessageMap.clear();
        // 2. Restore all exchanges
        List<Exchange> exchanges = diskDataCenter.selectAllExchanges();
        for(Exchange exchange:exchanges){
            exchangeMap.put(exchange.getName(),exchange);
        }
        // 3. Restore all queues
        List<MsgQueue> queues = diskDataCenter.selectAllQueues();
        for(MsgQueue queue:queues){
            queueMap.put(queue.getName(),queue);
        }
        // 4. Restore all bindings
        List<Binding> bindings = diskDataCenter.selectAllBindings();
        for(Binding binding:bindings){
            ConcurrentHashMap<String,Binding> bindingMap = bindingsMap.computeIfAbsent(binding.getExchangeName()
                                                            ,k-> new ConcurrentHashMap<>());
            bindingMap.put(binding.getQueueName(),binding);
        }
        // 5. Restore all messages, queue by queue
        for(MsgQueue queue:queues){
            LinkedList<Message> messages = diskDataCenter.loadAllMessageFromQueue(queue.getName());
            queueMessageMap.put(queue.getName(),messages);
            for(Message message:messages){
                messageMap.put(message.getMessageId(),message);
            }
        }
    }
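
The message part of recovery rebuilds two tables from one source: the per-queue lists and the global id index. The sketch below isolates that step so it can be run without a database or message files. It is an illustration only: RecoverySketch and its nested Msg class are hypothetical, and a plain Map (queueName to ordered messages) stands in for DiskDataCenter.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: a plain Map plays the role of DiskDataCenter.
class RecoverySketch {
    static final class Msg {
        final String id;
        final String body;

        Msg(String id, String body) {
            this.id = id;
            this.body = body;
        }
    }

    final ConcurrentHashMap<String, Msg> messageMap = new ConcurrentHashMap<>();
    final ConcurrentHashMap<String, LinkedList<Msg>> queueMessageMap = new ConcurrentHashMap<>();

    // Rebuilds both tables from the "disk" snapshot (queueName -> messages in order).
    void recover(Map<String, List<Msg>> disk) {
        // Clear first so that recovering twice does not leave stale entries behind.
        messageMap.clear();
        queueMessageMap.clear();
        for (Map.Entry<String, List<Msg>> entry : disk.entrySet()) {
            LinkedList<Msg> messages = new LinkedList<>(entry.getValue());
            queueMessageMap.put(entry.getKey(), messages);
            for (Msg message : messages) {
                messageMap.put(message.id, message);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, List<Msg>> disk = new HashMap<>();
        disk.put("q1", Arrays.asList(new Msg("m1", "a"), new Msg("m2", "b")));
        disk.put("q2", Arrays.asList(new Msg("m3", "c")));

        RecoverySketch memory = new RecoverySketch();
        memory.recover(disk);
        memory.recover(disk); // a second run yields the same state
        System.out.println(memory.messageMap.size());                       // 3
        System.out.println(memory.queueMessageMap.get("q1").size());        // 2
        System.out.println(memory.queueMessageMap.get("q1").getFirst().id); // m1
    }
}
```

Clearing before loading is what makes the operation safe to repeat: the in-memory state after recovery depends only on what is on disk, not on what was in memory before.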

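III. Testing MemoryDataCenter

1. Preparation

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
// Project classes (MemoryDataCenter, Exchange, MsgQueue, Binding, Message, ...) are imported from the project's own packages.

@SpringBootTest
public class MemoryDataCenterTests {
    private MemoryDataCenter memoryDataCenter = null;

    // Every test starts from a fresh, empty MemoryDataCenter.
    @BeforeEach
    public void setUp(){
        memoryDataCenter = new MemoryDataCenter();
    }

    @AfterEach
    public void tearDown(){
        memoryDataCenter = null;
    }

    private Exchange createTestExchange(String exchangeName){
        Exchange exchange = new Exchange();
        exchange.setName(exchangeName);
        exchange.setType(ExchangeType.DIRECT);
        exchange.setDurable(true);
        exchange.setAutoDelete(false);
        return exchange;
    }

    private MsgQueue createTestQueue(String queueName){
        MsgQueue queue = new MsgQueue();
        queue.setName(queueName);
        queue.setDurable(true);
        queue.setExclusive(false);
        queue.setAutoDelete(false);
        return queue;
    }

    private Message createTestMessage(String content){
        Message message = Message.createMessageWithId("testRoutingKey",null,content.getBytes());
        return message;
    }

    // The test methods from the following sections go here, inside this class.
}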

2. Testing exchanges

@Test
    public void testExchange(){
        Exchange expectedExchange = createTestExchange("testExchange");
        memoryDataCenter.insertExchange(expectedExchange);

        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange,actualExchange);
        memoryDataCenter.deleteExchange("testExchange");
        actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertNull(actualExchange);
    }

3. Testing queues

@Test
    public void testQueue(){
        MsgQueue expectedQueue = createTestQueue("testQueue");
        memoryDataCenter.insertQueue(expectedQueue);

        MsgQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue,actualQueue);
        memoryDataCenter.deleteQueue("testQueue");
        actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertNull(actualQueue);
    }

4. Testing bindings

@Test
    public void testBinding() throws MqException {
        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        memoryDataCenter.insertBinding(expectedBinding);

        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding,actualBinding);

        ConcurrentHashMap<String,Binding> bindingMap = memoryDataCenter.getBindings("testExchange");
        Assertions.assertEquals(1,bindingMap.size());
        Assertions.assertEquals(expectedBinding,bindingMap.get("testQueue"));

        memoryDataCenter.deleteBinding(expectedBinding);
        actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertNull(actualBinding);
    }

5. Testing messages

@Test
    public void testMessage(){
        Message expectedMessage = createTestMessage("testMessage");
        memoryDataCenter.addMessage(expectedMessage);
        Message actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage,actualMessage);
        memoryDataCenter.removeMessage(expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessage(expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

6. Testing message delivery

@Test
    public void testSendMessage(){
        MsgQueue queue = createTestQueue("testQueue");
        List<Message> expectedMessages = new ArrayList<>();
        for(int i=0;i<10;i++){
            Message message = createTestMessage("testMessage"+i);
            memoryDataCenter.sendMessage(queue,message);
            expectedMessages.add(message);
        }
        List<Message> actualMessage = new ArrayList<>();
        while(true){
            Message message = memoryDataCenter.pollMessage("testQueue");
            if(message==null){
                break;
            }
            actualMessage.add(message);
        }
        Assertions.assertEquals(expectedMessages.size(),actualMessage.size());
        for(int i=0;i<actualMessage.size();i++){
            Assertions.assertEquals(expectedMessages.get(i),actualMessage.get(i));
        }
    }

7. Testing unacknowledged messages

@Test
    public void testMessageWaitAck(){
        Message expectedMessage = createTestMessage("expectedMessage");
        memoryDataCenter.addMessageWaitAck("testQueue",expectedMessage);
        Message actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());
        Assertions.assertEquals(expectedMessage,actualMessage);
        memoryDataCenter.removeMessageWaitAck("testQueue",expectedMessage.getMessageId());
        actualMessage = memoryDataCenter.getMessageWaitAck("testQueue",expectedMessage.getMessageId());
        Assertions.assertNull(actualMessage);
    }

8. Testing recovery from disk into memory

@Test
    public void testRecovery() throws IOException, MqException, ClassNotFoundException {
        SpringDemoMqApplication.context = SpringApplication.run(SpringDemoMqApplication.class);
        DiskDataCenter diskDataCenter = new DiskDataCenter();
        diskDataCenter.init();

        Exchange expectedExchange = createTestExchange("testExchange");
        diskDataCenter.insertExchange(expectedExchange);

        MsgQueue expectedQueue = createTestQueue("testQueue");
        diskDataCenter.insertQueue(expectedQueue);

        Binding expectedBinding = new Binding();
        expectedBinding.setExchangeName("testExchange");
        expectedBinding.setQueueName("testQueue");
        expectedBinding.setBindingKey("testBindingKey");
        diskDataCenter.insertBinding(expectedBinding);

        Message expectedMessage = createTestMessage("testContent");
        diskDataCenter.sendMessage(expectedQueue,expectedMessage);

        memoryDataCenter.recovery(diskDataCenter);

        Exchange actualExchange = memoryDataCenter.getExchange("testExchange");
        Assertions.assertEquals(expectedExchange.getName(),actualExchange.getName());
        Assertions.assertEquals(expectedExchange.getType(),actualExchange.getType());

        MsgQueue actualQueue = memoryDataCenter.getQueue("testQueue");
        Assertions.assertEquals(expectedQueue.getName(),actualQueue.getName());
        Assertions.assertEquals(expectedQueue.isExclusive(),actualQueue.isExclusive());

        Binding actualBinding = memoryDataCenter.getBinding("testExchange","testQueue");
        Assertions.assertEquals(expectedBinding.getExchangeName(),actualBinding.getExchangeName());
        Assertions.assertEquals(expectedBinding.getQueueName(),actualBinding.getQueueName());

        Message actualMessage = memoryDataCenter.pollMessage("testQueue");
        Assertions.assertEquals(expectedMessage.getMessageId(),actualMessage.getMessageId());
        Assertions.assertEquals(expectedMessage.getRoutingKey(),actualMessage.getRoutingKey());
        Assertions.assertEquals(expectedMessage.getDeliverMode(),actualMessage.getDeliverMode());
        Assertions.assertArrayEquals(expectedMessage.getBody(),actualMessage.getBody());

        SpringDemoMqApplication.context.close();
        File dataDir = new File("./data");
        FileUtils.deleteDirectory(dataDir);
    }

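IV. Summary

  This article showed how to manage the broker's data in memory: the code wraps the core concepts (exchanges, queues, bindings, and messages) in simple operations and then tests them. The next article covers the design of the virtual host. Thanks for reading!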

