【分布式事务】二、NET8分布式事务实践: DotNetCore.CAP 框架 、 消息队列(RabbitMQ)、 多类型数据库(MySql、MongoDB)
介绍
DotNetCore.CAP简称CAP, [CAP]是一个用来解决微服务或者分布式系统中分布式事务问题的一个开源项目解决方案,
同样可以用来作为 EventBus 使用,CAP 拥有自己的特色,它不要求使用者发送消息或者处理消息的时候实现或者继承任何接口,拥有非常高的灵活性。我们一直坚信约定大于配置,所以CAP使用起来非常简单,对于新手非常友好,并且拥有轻量级。
CAP 采用模块化设计,具有高度的可扩展性。你有许多选项可以选择,包括消息队列,存储,序列化方式等,系统的许多元素内容可以替换为自定义实现。
- github地址:https://github.com/dotnetcore/CAP
- 官网地址: https://cap.dotnetcore.xyz/
- 官网文档:https://cap.dotnetcore.xyz/user-guide/zh/getting-started/quick-start/
eShopOnContainer选择
环境准备
- 安装 MySql 参考: 七、阿里云 Linux CentOs7安装MySql
- 安装Docker 参考:六、Docker安装
- Docker 安装配置 MongDB 集群 参考:十、Linux Docker 安装配置 MongoDB集群
- Docker 安装 RabbitMQ 参考: 九、Linux Docker 安装 RabbitMQ
本项目源码地址:分布式事务架构 DotNetCore.CAP
一、NET8 集成 DotNetCore.CAP
1、新建分布式项目
Nuget引用
DotNetCore.CAP
DotNetCore.CAP.Dashboard #consul监听
CAP提供了Kafka、RabbitMQ消息队列
DotNetCore.CAP.Kafka
DotNetCore.CAP.RabbitMQ
CAP提供了SqlServer、MySql、PostgreSql、MongoDB 的扩展作为数据库存储
DotNetCore.CAP.SqlServer
DotNetCore.CAP.MySql
DotNetCore.CAP.PostgreSql
DotNetCore.CAP.MongoDB
我是在以前的 Abp.Vnext 项目上做的,其它框架其实也差不多,项目结构如下图:
2、用户服务(LAbpVnext.WebApi)
连接 127.0.0.1
userdb MySql数据库
(1)、UsersController.cs
代码
using DotNetCore.CAP;
using LAbpVnext.Application;
using Microsoft.AspNetCore.Mvc;
namespace LAbpVnext.WebApi.Controllers
{
[ApiController]
[Route("[controller]")]
public class UsersController : ControllerBase
{
private static string _publishName = "RabbitMQ.MySql.OrderService";
private readonly ICapPublisher _iCapPublisher;
private readonly ILogger<UsersController> _logger;
private readonly IUserAppService _userAppService;
public UsersController(ICapPublisher capPublisher, ILogger<UsersController> logger, IUserAppService userAppService)
{
_iCapPublisher = capPublisher;
_logger = logger;
_userAppService = userAppService;
}
[HttpGet("Login")]
public ActionResult Login()
{
return Ok();
}
/// <summary>
/// 获取所有用户
/// </summary>
/// <returns></returns>
[HttpGet("GetAll")]
public async Task<List<UserDto>> GetAll()
{
var users = await _userAppService.GetAll();
return users;
}
/// <summary>
/// 事务测试
/// </summary>
/// <returns></returns>
[HttpGet("TestTransaction")]
public async Task<UserDto> TestTransaction()
{
var user = await _userAppService.GetById(1);
Console.WriteLine($"【用户】发布任务成功!{
DateTime.Now.ToString()}");
await _iCapPublisher.PublishAsync(_publishName, user);
return user;
}
}
}
(2)、LAbpVnextUserApiModule.cs
代码
using DotNetCore.CAP.Messages;
using LAbpVnext.Application;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.OpenApi.Models;
using Volo.Abp;
using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.Autofac;
using Volo.Abp.Modularity;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
namespace LAbpVnext.WebApi
{
[DependsOn(
typeof(AbpAspNetCoreMvcModule),
typeof(AbpAutofacModule),
typeof(LAbpVnextApplicationModule)
)]
public class LAbpVnextUserApiModule : AbpModule
{
/// <summary>
/// 依赖注入容器
/// </summary>
/// <param name="context"></param>
public override void ConfigureServices(ServiceConfigurationContext context)
{
var basePath = AppContext.BaseDirectory;
var configuration = context.Services.GetConfiguration();
//引入配置文件
var _config = new ConfigurationBuilder()
.SetBasePath(basePath)
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
// Add services to the container.
context.Services.AddControllers();
#region 添加 CAP
context.Services.AddCap(x =>
{
x.UseMySql(_config.GetConnectionString("Default"));//指定CAP的发布数据库地址
x.UseRabbitMQ(c => {
c.HostName = _config["RabbitMQ:Host"];
c.UserName = _config["RabbitMQ:User"];
c.Password = _config["RabbitMQ:Pwd"];
});//
x.FailedRetryCount = 10;//
x.FailedRetryInterval = 60;//
x.FailedThresholdCallback = failed => {
Console.WriteLine($"MessageType {
failed.MessageType} 失败了, 重试了 {
x.FailedRetryCount} 次, 消息名称: {
failed.Message.GetName()}");
};
#region 注册Consul可视化
//旧:DiscoveryOptions;新:ConsulDiscoveryOptions
//DiscoveryOptions discoveryOptions = new DiscoveryOptions();
//this.Configuration.Bind(discoveryOptions);
//x.UseDashboard();
//ConsulDiscoveryOptions discoveryOptions = new ConsulDiscoveryOptions();
//configuration.Bind(discoveryOptions);
//x.UseConsulDiscovery(d =>
//{
// d.DiscoveryServerHostName = discoveryOptions.DiscoveryServerHostName;
// d.DiscoveryServerPort = discoveryOptions.DiscoveryServerPort;
// d.CurrentNodeHostName = discoveryOptions.CurrentNodeHostName;
// d.CurrentNodePort = discoveryOptions.CurrentNodePort;
// d.NodeId = discoveryOptions.NodeId;
// d.NodeName = discoveryOptions.NodeName;
// d.MatchPath = discoveryOptions.MatchPath;
//});
#endregion
});
#endregion
#region 添加swagger注释
context.Services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Version = "v1",
Title = "LAbpVnext.UserApi"
});
var xmlPath = Path.Combine(basePath, "LAbpVnext.UserApi.xml");
c.IncludeXmlComments(xmlPath, true);
//var xmlDomainPath = Path.Combine(basePath, "HuaWeiServer.Domain.xml");
//c.IncludeXmlComments(xmlDomainPath, true);
c.AddSecurityDefinition("Bearer", new OpenApiSecurityScheme
{
Description = "Value: Bearer {token}",
Name = "Authorization",
In = ParameterLocation.Header,
Type = SecuritySchemeType.ApiKey,
Scheme = "Bearer"
});
c.AddSecurityRequirement(new OpenApiSecurityRequirement()
{
{
new OpenApiSecurityScheme
{
Reference = new OpenApiReference
{
Type = ReferenceType.SecurityScheme,
Id = "Bearer"
},Scheme = "oauth2",Name = "Bearer",In = ParameterLocation.Header,
},