当前位置: 首页 > article >正文

用 BlockingQueue 打造轻量级消息队列服务:从原理到实现

在日常开发中,消息队列是解耦系统、提升并发性能的重要工具。然而,对于一些简单场景,引入外部消息队列(如 RabbitMQ、Kafka)可能过于复杂。这时,Java 提供的 BlockingQueue 是一个轻量级但功能强大的选择,可以帮助快速实现内存级别的消息队列服务。

本文将介绍如何使用 BlockingQueue 搭建一个简单的消息队列服务,并实现生产者、消费者和监控功能。


一、为什么选择 BlockingQueue

BlockingQueue 是 Java 并发包(java.util.concurrent)中的一个接口,专为生产者-消费者模型设计,具备以下特点:

  1. 线程安全:内部使用锁和条件变量,无需额外同步机制。
  2. 阻塞能力:生产者在队列满时阻塞,消费者在队列空时阻塞,天然适配消息队列的使用场景。
  3. 多种实现ArrayBlockingQueueLinkedBlockingQueuePriorityBlockingQueue 等满足不同需求。

适合在以下场景中使用:

  • 内存中处理小规模任务队列。
  • 简单的异步任务调度。
  • 无需持久化的高效消息传递。

二、消息队列服务的架构设计

我们通过以下几个模块搭建消息队列服务:

  1. 生产者:负责向队列发送消息。
  2. 消费者:从队列中取出消息并处理。
  3. 队列服务:统一管理队列及其操作。
  4. 监控模块:提供队列状态(如大小)及运行情况。

三、核心实现

1. 定义消息队列服务

创建一个消息队列服务类,使用 BlockingQueue 管理消息队列:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MessageQueueService<T> {

    private final BlockingQueue<T> queue;

    public MessageQueueService(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity); // 有界队列
    }

    // 生产消息
    public void produce(T message) throws InterruptedException {
        queue.put(message); // 阻塞式放入队列
        System.out.println("Produced: " + message);
    }

    // 消费消息
    public T consume() throws InterruptedException {
        T message = queue.take(); // 阻塞式取出消息
        System.out.println("Consumed: " + message);
        return message;
    }

    // 获取队列大小
    public int getQueueSize() {
        return queue.size();
    }
}

2. 编写生产者任务

生产者负责不断向队列中放入消息:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ProducerTask implements Runnable {

    private final MessageQueueService<String> queueService;

    public ProducerTask(MessageQueueService<String> queueService) {
        this.queueService = queueService;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 100; i++) { // 模拟 100 条消息
                queueService.produce("Message-" + i);
                Thread.sleep(100); // 模拟生产间隔
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

3. 编写消费者任务

消费者从队列中取出消息并处理:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerTask implements Runnable {

    private final MessageQueueService<String> queueService;

    public ConsumerTask(MessageQueueService<String> queueService) {
        this.queueService = queueService;
    }

    @Override
    public void run() {
        try {
            while (true) { // 持续消费消息
                String message = queueService.consume();
                processMessage(message);
                Thread.sleep(200); // 模拟处理时间
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void processMessage(String message) {
        System.out.println("Processed: " + message);
    }
}

4. 监控模块

通过定时任务输出队列状态,便于观察运行情况:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class QueueMonitor {

    private final MessageQueueService<?> queueService;

    public QueueMonitor(MessageQueueService<?> queueService) {
        this.queueService = queueService;
    }

    public void startMonitoring() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        executor.scheduleAtFixedRate(() -> {
            System.out.println("Queue size: " + queueService.getQueueSize());
        }, 0, 1, TimeUnit.SECONDS); // 每秒输出一次队列大小
    }
}

5. 主程序整合

启动生产者、消费者和监控服务:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingQueueDemo {

    public static void main(String[] args) {
        // 初始化消息队列服务
        MessageQueueService<String> queueService = new MessageQueueService<>(10);

        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // 启动生产者
        executor.execute(new ProducerTask(queueService));

        // 启动消费者
        executor.execute(new ConsumerTask(queueService));

        // 启动监控服务
        new QueueMonitor(queueService).startMonitoring();

        // 添加关闭钩子,优雅关闭线程池
        Runtime.getRuntime().addShutdownHook(new Thread(executor::shutdown));
    }
}

四、运行效果

运行程序后,您将看到以下日志输出:

Produced: Message-0
Queue size: 1
Consumed: Message-0
Processed: Message-0
Queue size: 0
Produced: Message-1
Queue size: 1
...

五、优化与扩展

1. 增加并发支持

通过配置多个生产者或消费者线程提高处理能力:

executor.execute(new ProducerTask(queueService)); // 多个生产者
executor.execute(new ConsumerTask(queueService)); // 多个消费者

2. 超时处理

可使用 offerpoll 方法实现带超时的生产和消费,避免长时间阻塞。

3. 优雅关闭

添加标志位控制消费者结束循环,实现优雅关闭。

4. 适配其他消息模型

根据需求切换到其他 BlockingQueue 实现,如:

  • PriorityBlockingQueue:按优先级消费消息。
  • DelayQueue:支持延迟消息。

六、总结

通过 BlockingQueue,我们可以轻松实现一个内存级别的轻量消息队列。它在简单场景中具有良好的性能和易用性,是一个无需引入外部依赖的高效解决方案。


http://www.kler.cn/a/408542.html

相关文章:

  • repmgr安装及常用运维指令
  • CSS3新特性——字体图标、2D、3D变换、过渡、动画、多列布局
  • 第三十九篇 ShuffleNet V1、V2模型解析
  • (免费送源码)计算机毕业设计原创定制:Java+JSP+HTML+JQUERY+AJAX+MySQL springboot计算机类专业考研学习网站管理系统
  • 利用Python爬虫获取商品评论:技术与实践
  • 软件工程导论 选填题知识点总结
  • [Docker-显示所有容器IP] 显示docker-compose.yml中所有容器IP的方法
  • 本地推流,服务器拉流全流程
  • SCP文件传输命令解析
  • C++:用红黑树封装map与set-1
  • 前端:JavaScript (学习笔记)【2】
  • 每日计划-1124
  • 嵌入式Linux——文件类型
  • 【jvm】java对象的访问定位
  • MATLAB的语音信号采集与处理分析
  • Puppeteer 和 Cheerio 在 Node.js 中的应用
  • React学习06- API扩展
  • 如何理解tensor中张量的维度
  • Web 毕设篇-适合小白、初级入门练手的 Spring Boot Web 毕业设计项目:电影院后台管理系统(前后端源码 + 数据库 sql 脚本)
  • 亚太杯数学建模C题思路与算法(2024)
  • 手机文件可以打印出来吗
  • linux 运维常用命令
  • 使用NAS开启无纸化办公,Docker部署开源文档管理系统『Paperless-ngx』
  • kali中信息收集的一些常用工具
  • hugo文章支持数学公式
  • 第三十章 TCP 客户端 服务器通信 - 作业服务器资源