ABP - 事件总线之分布式事件总线
ABP - 事件总线之分布式事件总线
- 1. 分布式事件总线的集成
- 1.2 基于 RabbitMQ 的分布式事件总线
- 2. 分布式事件总线的使用
- 2.1 发布
- 2.2 订阅
- 2.3 事务和异常处理
- 3. 自己扩展的分布式事件总线实现
事件总线可以实现代码逻辑的解耦,使代码模块之间功能职责更清晰。而分布式事件总线的功能不止这些,它允许我们通过消息队列进行中转,发布和订阅跨应用/服务边界传输的事件,经常被用作微服务或不同应用程序之间异步发送和接收消息的手段,属于分布式应用通讯的方式之一。
分布式事件总线依赖于 消息队列 中间件,ABP 框架中提供了4种开箱即用的提供程序,我们也可以基于抽象的接口自行实现分布式事件总线提供程序,已有的四种分别适用于 RabbitMQ、Kafka、Rebus 等消息队列,还有一种是默认实现:进程内的分布式事件总线,它允许我们在没有接入消息队列时,也能够编写与分布式体系结构兼容的代码,方便日后可能的微服务拆分,这时它的工作方式与本地事件总线一样,整体的设计思想就和微软的分布式缓存一样。
1. 分布式事件总线的集成
以下的演示还是基于控制台程序,分布式事件总线不会默认集成在 ABP 启动模板之中,需要我们自行集成,Web 应用的集成方式也是一样的。
通过以下命令创建一个控制台程序启动模板:
abp new AbpDistributeEventBusSample -t console
之后再打开解决方案,由于分布式事件总线是在不同应用程序之间进行通讯的,所以还需要再创建一个控制台项目进行演示,将解决方案中的项目复制一份即可。
本地分布式事件总线
首先讲一下进程内的事件总线的集成,这个还是有些必要的,如果有考虑后续进行微服务拆分的情况下,前期对于事件总线的使用可以基于这个进行开发。当然并不是说本地事件总线就不推荐使用,从我自己的日常工作经验中,很多时候还是分布式事件总线和本地事件总线搭配使用的。
首先,分布式事件总线的核心依赖包为 Volo.Abp.EventBus,和本地事件总线一样。我们在需要集成的项目的根目录下,通过以下命令进行集成:
abp add-package Volo.Abp.EventBus
由于是本地分布式事件总线,所以这种方式下是没办法跨进程通讯的,使用方式和上一篇讲的本地事件总线类似,不过使用的时候不再通过 ILocalEventBus接口,而是通过 IDistributedEventBus 接口,主要是用于业务逻辑的解耦,同时也为后续可能的分布式拆分做好准备。具体的使用方式下面细讲。
1.2 基于 RabbitMQ 的分布式事件总线
ABP 框架提供了三种开箱即用的分布式事件总线提供程序,分别对应 RabbitMQ、Kafka、Rebus,通过结合第三方消息队列实现真正基于消息跨进程通讯的分布式事件总线,这里主要讲一下基于 RabbitMQ 的方式,其他方式用法类似。
首先,基于 RabbitMQ 的分布式事件总线需要安装 Volo.Abp.EventBus.RabbitMQ 驱动程序包。可通过一下方式安装:
abp add-package Volo.Abp.EventBus.RabbitMQ
上面创建的两个工程都要安装,因为我们要演示两个进程间的通讯。
之后,需要部署 RabbitMQ 消息队列,这里我通过 docker 快速启动一个带有管理平台的 RabbitMQ,命令如下:
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq rabbitmq:management
RabbitMQ 默认用户密码为:guest / guest,这里只是用于测试就直接使用了。生产环境中大家最好将默认用户禁用,另行创建自己的用户。RabbitMQ 相关的更详细的使用和配置这里就不细讲了,详细内容可见 [[2.1 RabbitMQ基本概念]] 系列文章。
然后,添加分布式事件总线相应的配置,我们可以在 appsettings.json 文件中添加以下配置节点:
"RabbitMQ": {
"Connections": {
// 这里的配置支持 RabbitMQ 官方 sdk 中的 ConnectionFactory 的任意属性的配置
"Default": {
"HostName": "localhost", // rabbitmq 地址,集群环境下多个ip用逗号分隔
"Port": "5672", // rabbbitmq 端口,默认5672
"UserName": "guest",
"Password": "guest"
}
// 允许配置多个 rabbitmq 连接,但只能有一个用于事件总线
//"Second": {
// "HostName": "xxx.xxx.xxx.xxx", // rabbitmq 地址,集群环境下多个ip用逗号分隔
// "Port": "5672" // rabbbitmq 端口,默认5672
//}
},
"EventBus": {
"ClientName": "MyClientName", // 用于事件总线的队列名
"ExchangeName": "MyExchangeName", // 用于事件总线的交换机名称
// "ConnectionName": "Default" // 配置多个连接时,指定用于事件总线的 RabbitMQ 连接,默认是 Default
}
}
以上配置,最后都会被转换为 AbpRabbitMqOptions 和 AbpRabbitMqEventBusOptions,所以我们也可以直接在代码中对这两个选项进行配置:
Configure<AbpRabbitMqOptions>(options =>
{
options.Connections.Default.UserName = "guest";
options.Connections.Default.Password = "guest";
options.Connections.Default.HostName = "localhost";
options.Connections.Default.Port = 5672;
});
Configure<AbpRabbitMqEventBusOptions>(options =>
{
options.ClientName = "TestApp1";
options.ExchangeName = "TestMessages";
});
两种方式选择一种即可,如果两种方式同时使用,代码配置优先于配置文件。解决方案的两个项目都需要进行配置,进行通讯的两个项目需要连接到同一个队列。
完成上面的配置之后,启动应用,即可看到 RabbitMQ 中创建了我们配置的交换机和队列:
2. 分布式事件总线的使用
2.1 发布
事件发布需要一个事件对象,官方将之称为 Eto(事件传输对象),这是一个普通类,用于保存和事件相关的数据,一般以 Eto 作为后缀。就算一个事件不需要传输任何数据,也必须创建一个空类,这和上一章的本地事件总线是一样的,由于在分布式事件触发之后,事件对象会被序列化传输到消息队列中,所以事件对象应避免循环引用、多态、私有setter,并提供默认(空)构造函数,如果你有其他的构造函数(虽然某些序列化器可能会正常工作)。 下面是一个用于测试的 Eto 对象的定义。
[EventName("helloEvent")]
public class HelloEto
{
public string Who { get; set; }
public DateTime When { get; set; }
public string ToWho { get; set; }
}
默认情况下,事件名将事件名称将是事件类的全名,我们可以通过 EventNameattribute 特性指定事件名称。
分布式事件的发布通过 IDistributedEventBus 接口,只需将其注入到相应的类中使用即可,使用方式和本地事件总线一样。
public class HelloWorldService : ITransientDependency
{
private readonly IDistributedEventBus _distributedEventBus;
public HelloWorldService(IDistributedEventBus distributedEventBus)
{
_distributedEventBus = distributedEventBus;
}
public async Task SayHelloAsync()
{
await _distributedEventBus.PublishAsync(new HelloEto
{
Who = "Jerry",
When = DateTime.Now,
ToWho = "Jack"
});
}
}
以上代码写在 AbpDistributeSample 项目中,这里是事件发布的进程。
2.2 订阅
事件的订阅也和本地事件总线类似,这里通过实现了 IDistributedEventHandler<TEvent>
接口的处理器来处理事件,当前像上一章本地事件总线中讲到的,我们也可以通过 IDistributedEventBus 来自己订阅事件。
public class HelloDistribuedEventHandler : IDistributedEventHandler<HelloEto>, ITransientDependency
{
public Task HandleEventAsync(HelloEto eventData)
{
Console.WriteLine($"{eventData.Who} Say Hello To {eventData.ToWho} at { eventData.When.ToString("yyyy-MM-dd HH:mm:ss") }");
return Task.CompletedTask;
}
}
一个事件处理程序可以同时处理多种事件,只需要实现多个针对不同 ETO 的 IDistributedEventHandler 泛型接口即可。
之后通过 vs 设置多项目启动:
应用启动之后,可以看到控制台的输出如下,一个简单的基于消息队列的跨进程通讯已经完成:
同时在 RabbitMQ 上可以看到已经连接上来了2个消费者,每个进程既作为生产者,也作为消费者:
通过源码可以看到,分布式事件总线中会进行初始化,其实就是根据配置连接了 RabbitMQ 队列,并创建了一个消费者自动订阅消息队列上的事件,当接受到消息之后会从消息中获取消息的类型和具体的数据,触发消息执行处理程序,如果事件处理程序成功执行(没有抛出任何异常),它将向消息代理发送确认(ACK)。
这里的 EventTypes 其实就是维护消息类型和它的类名的对应关系的一个字典,在 SubscribeHandlers 中初始化消息类型和执行器对应关系的时候维护的,这个就和前一篇的本地事件总线大同小异了。
而消息的发布过程就更简单了,也就是将消息序列化,发送到 RabbitMQ 队列中。
ABP 分布式事件总线从 5.0 版本开始,还加入了 Inbox 、Outbox 机制,我们可以通过 AbpDistributedEventBusOptions 选项进行配置,例如:
Configure<AbpDistributedEventBusOptions>(option =>
{
option.Inboxes.Configure(config => config.UseDbContext<TDbContext>());
option.Outboxes.Configure(config => config.UseDbContext<TDbContext>());
});
从配置方式也可以看出,这里是借助了数据库的,实际上 Inbox 就是从消息队列接收到消息之后,将消息先存入数据库中,没有直接执行。
而在应用启动的时候,消息队列模块会注册一个 InboxProcessManager,实际上这就是一个后台工作者,这里面又用到 IInboxProcessor
在 InboxProcessor 中再通过定时器,每个一段时间从数据库中读取消息真正地去执行,并且将数据库中的记录删除。
OutBox 的机制也是一样的,这样做大概就是起到缓冲的作用,避免短时间内大量的消息对消息队列或者我们的消费者造成冲击导致应用崩溃,而是将这些消息的发送、执行均匀地处理。
2.3 事务和异常处理
分布式事件总线默认实现是 LocalDistributedEventBus
,在没有集成 RabbitMQ 等第三方消息队列中间件,并且没有使用发件箱/收件箱模式的情况,事件发布和事件订阅是在同一个进程的。如果当前事件发布的模块使用了工作单元,事件总线是在和发布事件的同一工作单元范围内执行事件处理程序,如果事件处理程序抛出异常,那么相关的工作单元(数据库事务)将被回滚。这样,我们的应用程序逻辑和事件处理逻辑就具有事务性(原子)。如果想忽略事件处理程序中的错误,则必须在处理程序中使用try-catch
块,并且不应该重新抛出异常。
当我们切换到真正的分布式事件总线提供程序时,事件处理程序将在不同的进程/应用程序中执行。在这种情况下,实现事务性事件发布的唯一方法是使用发件箱/收件箱模式。这篇文章上面的内容简单地提了一下发件箱/收件箱,之后还会有文章详细梳理它的工作模式。
如果在未真正使用分布式事件总线的情况下,想在工作单元中立即发布事件,而不是等到工作单元中的逻辑执行完成之后再发布,可以在使用IDistributedEventBus.PublishAsync
方法时将onUnitOfWorkComplete
设置为false
。如果接入 RabbitMQ 等第三方消息队列实现了分布式事件总线,想要立即发布事件,则还得将 useOutbox
设置为 false。
3. 自己扩展的分布式事件总线实现
要注意的一点是,ABP 框架的分布式事件总线只能配置使用一个队列,这就意味着如果一个进程需要和多个进程进行通讯时,是无法通过不同的队列进行区分的。当多个进程都连接到同一个队列中时,我们无法控制一个进程发送的消息会被哪个进程所消费,无法单独通知某一个消费者,这是需要注意的。
有一种折中的方式,那就是如果一个事件只想发送给某一个进程,那就只在这个进程中实现其处理程序,其他没有实现处理程序的进程接收到消息之后,因为没有处理会抛出异常,消息重新返回消息队列中。这种方式依旧是存在一些问题,只是将工作中的一点经验供大家参考一下。
public interface IWantDistributedEventBus
{
Task PublishAsync(Type eventType, object eventData, string queueName);
}
public class RabbitMqWantDistributedEventBus : IWantDistributedEventBus, ISingletonDependency
{
protected ILocalEventBus LocalEventBus { get; }
protected IConnectionPool ConnectionPool { get; }
protected WantRabbitMqEventBusOptions WantRabbitMqEventBusOptions { get; }
protected AbpLocalEventBusOptions AbpLocalEventBusOptions { get; }
protected IRabbitMqSerializer Serializer { get; }
protected IRabbitMqMessageConsumer Consumer { get; private set; }
protected IRabbitMqMessageConsumerFactory MessageConsumerFactory { get; }
protected ConcurrentDictionary<string, Type> EventTypes { get; }
public RabbitMqWantDistributedEventBus(
IConnectionPool connectionPool,
IOptions<WantRabbitMqEventBusOptions> options,
IOptions<AbpLocalEventBusOptions> localEventBusOptions,
IRabbitMqSerializer serializer,
IRabbitMqMessageConsumerFactory rabbitMqMessageConsumerFactory,
ILocalEventBus localEventBus)
{
ConnectionPool = connectionPool;
WantRabbitMqEventBusOptions = options.Value;
Serializer = serializer;
MessageConsumerFactory = rabbitMqMessageConsumerFactory;
LocalEventBus = localEventBus;
AbpLocalEventBusOptions = localEventBusOptions.Value;
EventTypes = new ConcurrentDictionary<string, Type>();
}
/// <summary>
/// 队列消费者初始化
/// </summary>
public void Initialize()
{
foreach (var queueName in WantRabbitMqEventBusOptions.ConsumeQueueNames)
{
Consumer = MessageConsumerFactory.Create(
new ExchangeDeclareConfiguration(
WantRabbitMqEventBusOptions.ExchangeName,
type: "direct",
durable: true
),
new QueueDeclareConfiguration(
queueName,
durable: true,
exclusive: false,
autoDelete: false
),
WantRabbitMqEventBusOptions.ConnectionName
);
Consumer.BindAsync(queueName);
Consumer.OnMessageReceived(ProcessEventAsync);
}
LoadEventTypes(AbpLocalEventBusOptions.Handlers);
}
/// <summary>
/// 消息消费逻辑
/// </summary>
/// <param name="message">从队列接收到的消息</param>
/// <returns></returns>
private async Task ProcessEventAsync(IModel channel, BasicDeliverEventArgs ea)
{
var msgData = (MessageData)Serializer.Deserialize(ea.Body.ToArray(), typeof(MessageData));
var eventName = msgData.Type;
var eventType = EventTypes.GetOrDefault(eventName);
// eventType为空, 即不存在类型对应的Handler,消息不应该被消费
if (eventType == null)
{
// return;
throw new NotFoundHandlerException($"不存在{eventName}消息Handler!");
}
// 通过消息类型转发LocalHandler进行具体逻辑处理
var eventData = Serializer.Deserialize(Serializer.Serialize(msgData.Data), eventType);
await LocalEventBus.PublishAsync(eventType, eventData);
}
/// <summary>
/// 根据现有注册的Handler构建消息类型集合
/// </summary>
/// <param name="handlers">当前应用中Handler类型集合</param>
/// <returns></returns>
public Task LoadEventTypes(ITypeList<IEventHandler> handlers)
{
foreach (var handler in handlers)
{
var interfaces = handler.GetInterfaces();
foreach (var @interface in interfaces)
{
if (!typeof(IEventHandler).GetTypeInfo().IsAssignableFrom(@interface))
{
continue;
}
var genericArgs = @interface.GetGenericArguments();
if (genericArgs.Length == 1)
{
var eventTypeName = genericArgs[0].FullName;
if (!EventTypes.ContainsKey(eventTypeName))
{
EventTypes[eventTypeName] = genericArgs[0];
}
}
}
}
return Task.CompletedTask;
}
/// <summary>
/// 发布消息
/// </summary>
/// <param name="eventType">消息类型</param>
/// <param name="eventData">消息内容</param>
/// <param name="queueName">队列名称</param>
/// <returns></returns>
public Task PublishAsync(Type eventType, object eventData, string queueName)
{
var msg = new MessageData { Type = eventType.FullName, Data = eventData };
var body = Serializer.Serialize(msg);
using (var channel = ConnectionPool.Get(WantRabbitMqEventBusOptions.ConnectionName).CreateModel())
{
channel.ExchangeDeclare(
WantRabbitMqEventBusOptions.ExchangeName,
"direct",
durable: true
);
var properties = channel.CreateBasicProperties();
properties.DeliveryMode = RabbitMqConsts.DeliveryModes.Persistent;
var queue = channel.QueueDeclare(queueName, durable: true, exclusive: false, autoDelete: false);
channel.QueueBind(queueName, WantRabbitMqEventBusOptions.ExchangeName, queueName);
channel.BasicPublish(
exchange: WantRabbitMqEventBusOptions.ExchangeName,
routingKey: queueName,
mandatory: true,
basicProperties: properties,
body: body
);
}
return Task.CompletedTask;
}
}
public class WantRabbitMqEventBusOptions
{
public string ConnectionName { get; set; }
public string ClientName { get; set; }
public string ExchangeName { get; set; }
public IList<string> ConsumeQueueNames { get; }
public WantRabbitMqEventBusOptions()
{
ConsumeQueueNames = new List<string>();
}
}
public class MessageData
{
public string Type { get; set; }
public object Data { get; set; }
}
[DependsOn(typeof(AbpRabbitMqModule))]
[DependsOn(typeof(AbpEventBusModule))]
public class WantAbpEventBusRabbitMqModule : AbpModule
{
public override void ConfigureServices(ServiceConfigurationContext context)
{
var configuration = context.Services.GetConfiguration();
Configure<WantRabbitMqEventBusOptions(configuration.GetSection("RabbitMQ:EventBus"));
}
public override void OnApplicationInitialization(ApplicationInitializationContext context)
{
context.ServiceProvider
.GetRequiredService<RabbitMqWantDistributedEventBus>()
.Initialize();
}
}
以上就是 ABP 框架下分布式事件总线的基本知识点,其中也提到了 发件箱/收件箱 等新特性,ABP 框架经过长时间的发展,基本与 .NET 版本同步更新,到现在 9.0 版本,各种组件的特性也在逐步迭代更新。分布式事件总线除了这篇文章中提到的基本内容之外,还有一些新特性,后续会有一篇文章专门讲一下这些特性。
参考文档:
ABP 官方文档 - 分布式事件总线