当前位置: 首页 > article >正文

微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

微服务设计模式 - 发布订阅模式(Publisher Subscriber Pattern)

Publisher-Subscriber-Pattern3.

定义

发布-订阅模式(Publisher-Subscriber Pattern)是一种常见的设计模式,被广泛用于云计算和分布式系统中,以实现松散耦合的组件间通信。发布-订阅模式本质上是一种消息传递模式,其中消息发布者(Publisher)不会将消息直接发送给特定的接收者(Subscriber),而是将消息发布到一个中介(如消息通道或事件总线),订阅者通过订阅中介来接收感兴趣的消息。通过这种方式,发布者和订阅者之间实现了松耦合。

结构

发布-订阅模式主要包括以下几个组件:

  1. 发布者:生产并发布消息的源头。
  2. 中介:用于发布和传递消息的消息通道或事件总线。
  3. 订阅者:接收并处理消息的终点。
+-----------+             +---------+             +---------+
|  发布者   |  ---发布--> |  中介   | ---广播-->  |  订阅者  |
+-----------+             +---------+             +---------+
                                ^                     |
                                |                     |
+-----------+                   |        订阅         |
|  订阅者   | ------------------+
+-----------+

工作原理

Publisher Subscriber Pattern Sequence

发布-订阅模式的工作流程如下:

  1. 消息发布:发布者生成消息并将其发布到中介(如消息队列或事件总线)。
  2. 消息存储与管理:中介接收并存储消息,同时管理订阅者信息。
  3. 消息广播:中介将消息广播给所有订阅了该消息的订阅者。
  4. 消息接收与处理:订阅者接收消息并进行处理。

优势与应用场景

发布-订阅模式(Publisher-Subscriber Pattern)在分布式系统和微服务架构中被广泛应用,主要优势如下:

1. 松耦合

定义和实现独立:发布者和订阅者相互独立,发布者不需要知道订阅者的具体信息,反之亦然。各个服务之间互不依赖,使得系统模块可以独立开发、测试和部署。

  • 应用情景:一个电商平台上,订单服务和库存服务可以独立存在。订单创建后,库存减少操作不需要紧耦合在一起,可以通过消息队列来解耦。

2. 扩展性

服务扩展变得简单:可以方便地添加新的订阅者,而无需修改已有的发布者或者其他订阅者。新订阅者只需要订阅相应的消息即可。

  • 应用情景:在已有用户服务和通知服务的基础上,可以快速添加一个新的分析服务来订阅用户注册消息以进行数据分析。

3. 灵活性和适应性

动态消息路由:通过路由键和主题交换机,可以实现复杂的消息路由和选择,使得系统更具灵活性和适应性。

  • 应用情景:广告系统通过特定兴趣标签路由消息到不同的消费者,实现个性化推荐。

4. 提高性能和可靠性

异步处理提升性能:发布者可以立即发布消息而不必等待消息被处理,从而提高系统的响应速度。订阅者可以按自己的节奏处理消息,减少系统的耦合和脆弱性。

  • 应用情景:支付系统可以在用户付款后立即返回结果,后续的账单生成和通知可以异步处理。

RabbitMQ

RabbitMQ 作为一个高性能、高可靠性的消息队列系统,与发布-订阅模式非常契合。它提供了丰富的功能,使得在实际开发中实现发布-订阅模式变得相对简单和高效。

RabbitMQ 如何支持发布-订阅模式

  1. 交换机(Exchange):RabbitMQ 使用交换机来接收并路由消息。发布者将消息发送到交换机,交换机再根据路由键将消息转发到绑定的队列。不同类型的交换机(如 fanoutdirecttopicheaders)可以实现各种不同的路由策略。
  2. 队列(Queue):队列是存储消息的地方。消费者从队列中提取消息进行处理。通过将队列绑定到交换机,可以实现消息的广播和分发。
  3. 绑定(Binding):绑定定义了交换机如何将消息路由到队列。绑定一般会指定一个路由键或者模式,从而实现消息过滤和选择。

交换机类型

  • Direct Exchange:直接交换机将消息路由到绑定键匹配的队列上。

  • Fanout Exchange:扇出交换机将消息广播到所有绑定的队列上,不考虑路由键。

  • Topic Exchange:主题交换机通过路由键模式(如 user.*order.#)路由消息,允许更灵活的路由策略。

  • Headers Exchange:头部交换机通过消息的头部属性来路由消息。

示例代码

在微服务架构中,发布-订阅模式常用于事件驱动的通信。以下是一个利用RabbitMQ实现发布-订阅模式的示例。

+-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     用户队列  | 
|            |                 |               | (user.*)        |              |
+-----------+                  +---------------+                  +--------------+
                                                                |              |
                                                                |  订阅者一    | <---处理消息--
                                                                +--------------+

+-----------+                  +---------------+                  +--------------+
|            |                 |               |                 |              |
|  发布者    | ---发布消息---> |     主题交换机 | --路由键--->  |     订单队列  | 
|            |                 |               | (order.*)       |              |
+-----------+                  +---------------+                  +--------------+
                                                                |              |
                                                                |  订阅者二    | <---处理消息--
                                                                +--------------+

项目说明

为了让消费者仅消费特定的事件,我们可以通过多种方式实现。常见的方法包括:

  1. 消息过滤:在发送消息时添加特定的标签或属性,消费者根据这些标签或属性进行过滤。
  2. 不同的队列或主题:将不同类型的事件发布到不同的消息队列或主题上,消费者订阅他们感兴趣的队列或主题。

在Spring Boot中,使用RabbitMQ时,我们可以利用路由键(Routing Key)和基于主题的交换机(Topic Exchange)实现这种功能,具体实现步骤:

  1. 配置RabbitMQ的交换机、队列和绑定关系。

  2. 发布者在发布消息时指定路由键。

  3. 消费者通过 @RabbitListener 注解订阅特定的路由键。

项目结构

event-driven-system/
│
├── src/main/java/com/example/event/
│   ├── EventApplication.java
│   ├── config/
│   │   └── RabbitConfig.java
│   ├── publisher/
│   │   └── MessagePublisher.java
│   ├── subscriber/
│   │   ├── UserSubscriber.java
│   │   └── OrderSubscriber.java
└── src/main/resources/
    ├── application.yml

代码流程

  • 发布者调用 publishUserEvent 方法,向 eventExchange 发布用户创建消息,消息被根据路由键 user.create 发送到 userQueue 中,然后 User Subscriber 消费者从 userQueue 队列中接收消息并处理。
  • 发布者调用 publishOrderEvent 方法,向 eventExchange 发布订单创建消息,消息被根据路由键 order.create 发送到 orderQueue 中,然后 Order Subscriber 消费者从 orderQueue 队列中接收消息并处理。

Publisher Subscriber Pattern Flow

这个设计确保了不同类型的消费者(用户订阅者和订单订阅者)能够接收和处理他们各自关注的消息,从而实现了消费者只消费特定事件的需求。

相关源代码

EventApplication.java

主程序启动类。

package com.example.event;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class EventApplication {

    public static void main(String[] args) {
        SpringApplication.run(EventApplication.class, args);
    }
}
RabbitConfig.java

在配置类中配置基于主题的交换机、队列及其绑定关系。我们可以在配置类中配置 RabbitListenerContainerFactory,并在使用 @RabbitListener 注解时自动使用这个工厂。这能让我们自定义一些与监听器相关的配置,比如并发消费者的数量、消息的确认模式等。

package com.example.event.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.listener.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    @Bean
    public Queue userQueue() {
        return new Queue("userQueue");
    }

    @Bean
    public Queue orderQueue() {
        return new Queue("orderQueue");
    }

    @Bean
    public TopicExchange eventExchange() {
        return new TopicExchange("eventExchange");
    }

    @Bean
    public Binding userBinding(Queue userQueue, TopicExchange eventExchange) {
        return BindingBuilder.bind(userQueue).to(eventExchange).with("user.*");
    }

    @Bean
    public Binding orderBinding(Queue orderQueue, TopicExchange eventExchange) {
        return BindingBuilder.bind(orderQueue).to(eventExchange).with("order.*");
    }

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        // 设置消费者的并发数量,等配置
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        return factory;
    }
}
MessagePublisher.java

发布者在发布消息时指定路由键,以便消费者能按需求过滤消息。

package com.example.event.publisher;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessagePublisher {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void publishUserEvent(String message) {
        rabbitTemplate.convertAndSend("eventExchange", "user.create", message);
        System.out.printf("Published user event: %s%n", message);
    }

    public void publishOrderEvent(String message) {
        rabbitTemplate.convertAndSend("eventExchange", "order.create", message);
        System.out.printf("Published order event: %s%n", message);
    }
}
UserSubscriber.java

用户订阅者只会接收指定的“user”事件,通过 @RabbitListener 注解明确指定使用 rabbitListenerContainerFactory,从而确保我们自定义的容器配置能够被应用。

package com.example.event.subscriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class UserSubscriber {
    @RabbitListener(queues = "userQueue", containerFactory = "rabbitListenerContainerFactory")
    public void handleUserMessage(String message) {
        System.out.printf("Received user event message: %s%n", message);
    }
}
OrderSubscriber.java

订单订阅者只会接收指定的“order”事件。

package com.example.event.subscriber;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class OrderSubscriber {
    @RabbitListener(queues = "orderQueue", containerFactory = "rabbitListenerContainerFactory")
    public void handleOrderMessage(String message) {
        System.out.printf("Received order event message: %s%n", message);
    }
}
application.yml

rabbitmq配置。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest

总结

Robert-C-Martin-Quote-All-race-conditions-deadlock-conditions-and

发布-订阅模式是实现松耦合系统的强大工具,在云计算和分布式系统中应用广泛。通过使用如RabbitMQ的消息中间件,我们可以很容易地在Spring Boot项目中实现这一模式。通过本文的示例,我们展示了如何利用 RabbitListenerContainerFactory 配置消费者行为,并通过Spring Boot注解 @RabbitListener 进行订阅,从而实现发布-订阅模式的精细化控制和一个完整的发布-订阅模式。这种模式不仅提高了系统的扩展性和灵活性,还大大简化了开发过程中的依赖管理。


http://www.kler.cn/a/379352.html

相关文章:

  • 如果 MySQL 主库出现了问题,从库该何去何从呢?
  • C#的Event事件示例小白级剖析
  • ubuntu20.04 加固方案-设置SSH是否使用业界认可的加密算法
  • Jetson Orin NX平台自研载板 IMX477相机掉线问题调试记录
  • 算法【Java】—— 动态规划之斐波那契数列模型
  • YOLOv8改进,YOLOv8引入ResCBAM注意力机制,二次创新C2f结构
  • [java][高级]FilterListenerAjax
  • 同舟化工:实现LTC全流程数字化管控,赋能销售,提升运营效率
  • 基于springboot的Java学习论坛平台
  • 计算机系统架构
  • 【Python单元测试】pytest框架单元测试 配置 命令行操作 测试报告 覆盖率
  • Java项目管理与SSM框架介绍
  • 基于Multisim汽车尾灯电路左转右转刹车检查功能电路(含仿真和报告)
  • 一张自增表里面总共有 7 条数据,删除了最后 2 条数据,重启 mysql 数据库,又插入了一条数据,此时 id 是几?
  • 15分钟学 Go 第 33 天:项目结构
  • 【Git】如何在 Git 中高效合并分支:完整指南
  • 算法笔记()
  • 有效的数独(C语言解法)
  • Kubernetes中的cm存储
  • Docker入门系列——网络
  • Python 中不能正确输出两个浮点数乘积的解决方法
  • 回溯2:深入探讨C语言中的操作符 —— 从基础到进阶
  • Spring中lazy-init属性
  • 大模型日报|10 篇必读的大模型论文
  • 【eNSP】企业网络架构实验
  • 监听el-table中 自定义封装的某个组件的值发现改变调用函数