【分布式事务】二、NET8分布式事务实践: DotNetCore.CAP 框架 、 消息队列(RabbitMQ)、 数据库(MySql、MongoDB)
介绍
[CAP]是一个用来解决微服务或者分布式系统中分布式事务问题的一个开源项目解决方案,
同样可以用来作为 EventBus 使用
- github地址:https://github.com/dotnetcore/CAP
- 官网地址: https://cap.dotnetcore.xyz/
- 官网文档:https://cap.dotnetcore.xyz/userguide/zh/cap/idempotence/
eShopOnContainer选择
环境准备
- 安装 MySql 参考: 七、阿里云 Linux CentOs7安装MySql
- 安装Docker 参考:六、Docker安装
- Docker 安装配置 MongDB 集群 参考:十、Linux Docker 安装配置 MongoDB集群
- Docker 安装 RabbitMQ 参考: 九、Linux Docker 安装 RabbitMQ
NET8 集成 DotNetCore.CAP
1、新建分布式项目
Nuget引用
DotNetCore.CAP
DotNetCore.CAP.Dashboard #consul监听
CAP提供了Kafka、RabbitMQ消息队列
DotNetCore.CAP.Kafka
DotNetCore.CAP.RabbitMQ
CAP提供了SqlServer、MySql、PostgreSql、MongoDB 的扩展作为数据库存储
DotNetCore.CAP.SqlServer
DotNetCore.CAP.MySql
DotNetCore.CAP.PostgreSql
DotNetCore.CAP.MongoDB
我是在以前的 Abp.Vnext 项目上做的,其它框架其实也差不多,项目结构如下图:
2、用户服务(LAbpVnext.WebApi)
连接 127.0.0.1
userdb 数据库
(1)、UsersController
代码
using DotNetCore.CAP;
using LAbpVnext.Application;
using Microsoft.AspNetCore.Mvc;
namespace LAbpVnext.WebApi.Controllers
{
[ApiController]
[Route("[controller]")]
public class UsersController : ControllerBase
{
private static string _publishName = "RabbitMQ.MySql.OrderService";
private readonly ICapPublisher _iCapPublisher;
private readonly ILogger<UsersController> _logger;
private readonly IUserAppService _userAppService;
public UsersController(ICapPublisher capPublisher, ILogger<UsersController> logger, IUserAppService userAppService)
{
_iCapPublisher = capPublisher;
_logger = logger;
_userAppService = userAppService;
}
[HttpGet("Login")]
public ActionResult Login()
{
return Ok();
}
/// <summary>
/// 获取所有用户
/// </summary>
/// <returns></returns>
[HttpGet("GetAll")]
public async Task<List<UserDto>> GetAll()
{
var users = await _userAppService.GetAll();
return users;
}
/// <summary>
/// 事务测试
/// </summary>
/// <returns></returns>
[HttpGet("TestTransaction")]
public async Task<UserDto> TestTransaction()
{
var user = await _userAppService.GetById(1);
Console.WriteLine($"【用户】发布任务成功!{
DateTime.Now.ToString()}");
await _iCapPublisher.PublishAsync(_publishName, user);
return user;
}
}
}
(2)、LAbpVnextUserApiModule
代码
using DotNetCore.CAP.Messages;
using LAbpVnext.Application;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.OpenApi.Models;
using Volo.Abp;
using Volo.Abp.AspNetCore.Mvc;
using Volo.Abp.Autofac;
using Volo.Abp.Modularity;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
namespace LAbpVnext.WebApi
{
[DependsOn(
typeof(AbpAspNetCoreMvcModule),
typeof(AbpAutofacModule),
typeof(LAbpVnextApplicationModule)
)]
public class LAbpVnextUserApiModule : AbpModule
{
/// <summary>
/// 依赖注入容器
/// </summary>
/// <param name="context"></param>
public override void ConfigureServices(ServiceConfigurationContext context)
{
var basePath = AppContext.BaseDirectory;
var configuration = context.Services.GetConfiguration();
//引入配置文件
var _config = new ConfigurationBuilder()
.SetBasePath(basePath)
.AddJsonFile("appsettings.json", optional: true, reloadOnChange: true)
.Build();
// Add services to the container.
context.Services.AddControllers();
#region 添加 CAP
context.Services.AddCap(x =>
{
x.UseMySql(_config.GetConnectionString("Default"));//指定CAP的发布数据库地址
x.UseRabbitMQ(c => {
c.HostName = _config["RabbitMQ:Host"];
c.UserName = _config["RabbitMQ:User"];
c.Password = _config["RabbitMQ:Pwd"];
});//
x.FailedRetryCount = 10;//
x.FailedRetryInterval = 60;//
x.FailedThresholdCallback = failed => {
Console.WriteLine($"MessageType {
failed.MessageType} 失败了, 重试了 {
x.FailedRetryCount} 次, 消息名称: {
failed.Message.GetName()}");
};
#region 注册Consul可视化
//旧:DiscoveryOptions;新:ConsulDiscoveryOptions
//DiscoveryOptions discoveryOptions = new DiscoveryOptions();
//this.Configuration.Bind(discoveryOptions);
//x.UseDashboard();
//ConsulDiscoveryOptions discoveryOptions = new ConsulDiscoveryOptions();
//configuration.Bind(discoveryOptions);
//x.UseConsulDiscovery(d =>
//{
// d.DiscoveryServerHostName = discoveryOptions.DiscoveryServerHostName;
// d.DiscoveryServerPort = discoveryOptions.DiscoveryServerPort;
// d.CurrentNodeHostName = discoveryOptions.CurrentNodeHostName;
// d.CurrentNodePort = discoveryOptions.CurrentNodePort;
// d.NodeId = discoveryOptions.NodeId;
// d.NodeName = discoveryOptions.NodeName;
// d.MatchPath = discoveryOptions.MatchPath;
//});
#endregion
});
#endregion
#region 添加swagger注释
context.Services.AddSwaggerGen(c =>
{
c.SwaggerDoc("v1", new OpenApiInfo
{
Version = "v1",
Title = "LAbpVnext.UserApi"
});
var xmlPath = Path.Combine(basePath, "LAbpVnext.UserApi.xml");
c.IncludeXmlComments(xmlPath, true);
//var xmlDomainPath = Path.Combine(basePath, "HuaWeiServer.Domain.xml");
//c.IncludeXmlComments(xmlDomainPath, true);
c.AddSecurityDefinition("Bearer", new OpenApiSecurityScheme
{
Description = "Value: Bearer {token}",
Name = "Authorization",
In = ParameterLocation.Header,
Type = SecuritySchemeType.ApiKey,
Scheme = "Bearer"
});
c.AddSecurityRequirement(new OpenApiSecurityRequirement()
{
{
new OpenApiSecurityScheme
{
Reference = new OpenApiReference
{
Type = ReferenceType.SecurityScheme,
Id = "Bearer"
},Scheme = "oauth2",Name = "Bearer",In = ParameterLocation.Header,
},new List<string>()
}<