当前位置: 首页 > article >正文

【JAVA】Java项目实战—分布式微服务项目:分布式消息队列

在软件开发中,分布式系统和微服务架构已经成为主流。随着应用程序的复杂性增加,单体应用的局限性日益显现,分布式微服务架构能够将大型应用拆分为多个小型、独立的服务,从而提高系统的可维护性、可扩展性和容错性。

在微服务架构中,各个服务之间的通信变得至关重要。为了实现服务之间的解耦、异步处理和负载均衡,消息队列应运而生。消息队列可以有效地解决以下问题:

  1. 解耦:服务之间不直接调用,而是通过消息进行通信,降低了服务间的依赖性。

  2. 异步处理:消息发送后不需要等待响应,可以提高系统的响应速度。

  3. 负载均衡:消息队列可以将请求分发到多个消费者,实现负载均衡。

  4. 可靠性:通过持久化消息,确保即使服务宕机也不会丢失重要数据。

可以将消息队列比作一个快递公司,当你下单后,快递公司会将你的包裹收集起来,然后分发给不同的快递员。你不需要等待快递员到达你的门口再进行下一步的操作,而是可以继续做其他事情。同样,在微服务架构中,服务A可以将消息发送到消息队列,而不需要等待服务B的处理完成。

理论知识

1. 消息队列的基本概念

消息队列是一种通信机制,允许应用程序通过发送和接收消息来进行交互。它通常由以下几个组件组成:

  • 生产者(Producer):负责发送消息的应用程序或服务。

  • 消费者(Consumer):负责接收和处理消息的应用程序或服务。

  • 消息代理(Message Broker):负责接收、存储和转发消息的中间件。

2. 消息的生命周期

消息在消息队列中的生命周期通常包括以下几个阶段:

  1. 发送:生产者将消息发送到消息队列。

  2. 存储:消息代理将消息存储在队列中,等待消费者处理。

  3. 接收:消费者从消息队列中获取消息。

  4. 处理:消费者处理接收到的消息。

  5. 确认:消费者处理完成后,向消息代理确认消息已被成功处理。

3. 消息队列的实现

常见的消息队列实现有:

  • RabbitMQ:基于AMQP协议的开源消息代理,支持多种消息传递模式。

  • Kafka:分布式流处理平台,特别适合大规模数据处理。

  • ActiveMQ:开源消息中间件,支持多种协议。

实战示例:使用RabbitMQ实现消息队列

环境准备

在本示例中,我们将使用RabbitMQ作为消息队列。首先,你需要安装RabbitMQ并启动服务。可以通过Docker快速启动RabbitMQ:

docker run -d --hostname rabbit --name rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management

Maven依赖

在你的Java项目中,添加RabbitMQ的依赖。在pom.xml中添加以下内容:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.15.0</version>
</dependency>

代码实现

1. 创建生产者

生产者负责发送消息到RabbitMQ队列。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址
        try (Connection connection = factory.newConnection(); // 创建连接
             Channel channel = connection.createChannel()) { // 创建信道
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String message = "Hello World!"; // 要发送的消息
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); // 发送消息
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

代码解释

  • ConnectionFactory:用于创建连接到RabbitMQ的连接。

  • Connection:代表与RabbitMQ的连接。

  • Channel:用于发送和接收消息的信道。

  • queueDeclare:声明一个队列,如果队列不存在则创建。

  • basicPublish:发送消息到指定的队列。

2. 创建消费者

消费者负责接收并处理消息。

import com.rabbitmq.client.*;

public class Consumer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // 设置RabbitMQ服务器地址
        try (Connection connection = factory.newConnection(); // 创建连接
             Channel channel = connection.createChannel()) { // 创建信道
            // 声明队列
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            // 创建回调函数处理消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8"); // 获取消息
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); // 开始消费
        }
    }
}

代码解释

  • DeliverCallback:回调函数,用于处理接收到的消息。

  • basicConsume:开始消费消息,第一个参数是队列名,第二个参数是是否自动确认消息。

运行示例

  1. 启动RabbitMQ服务。

  2. 先运行消费者类Consumer,等待接收消息。

  3. 然后运行生产者类Producer,发送消息。

你会在消费者的控制台中看到接收到的消息。

总结

通过以上示例,我们实现了一个简单的分布式消息队列。生产者将消息发送到RabbitMQ,消费者从队列中接收并处理消息。例如订单处理、用户通知、日志收集等。通过引入消息队列,系统的可扩展性和可靠性得到了显著提升。


http://www.kler.cn/a/444473.html

相关文章:

  • SSM 与 Vue 共筑电脑测评系统:精准洞察电脑世界
  • 【Rust自学】4.2. 所有权规则、内存与分配
  • jdk和cglib动态代理区别
  • 深入详解线性代数基础知识:理解矩阵与向量运算、特征值与特征向量,以及矩阵分解方法(如奇异值分解SVD和主成分分析PCA)在人工智能中的应用
  • Moretl安全日志采集工具
  • 8_HTML5 SVG (4) --[HTML5 API 学习之旅]
  • Scala项目(一)
  • 3D和AR技术在电商行业的应用有哪些?
  • Flask框架入门与实战
  • meta-llama/Llama-3.2-1B 微调记录
  • 数据库设计范式:全面解析与实践指南
  • 【大模型】GraphRAG技术原理
  • Springboot 整合 Java DL4J 打造自然语言处理之智能写作助手
  • 防止私接小路由器
  • git remote -v(--verbose)显示你的 Git 仓库配置的远程仓库的详细信息
  • multiprocessing包详解【Python】
  • websocket服务端开发模式-应用开发-页面端修改
  • Google guava 最佳实践 学习指南之08 `BiMap`(双向映射)
  • CSS系列(23)-- 可访问性实践详解
  • Type-C厂家的环保测试:保障绿色科技的未来
  • LeetCode 1925 统计平方和三元组的数目
  • 启动springboot项目时报错Web server failed to start. Port 8080 was already in use.
  • Pytorch | 对比Pytorch中的十种优化器:基于CIFAR10上的ResNet分类器
  • 美创科技完成新一轮融资!
  • Linux-Profile工具
  • java全栈day19--Web后端实战(java操作数据库3)