1 分布式异步的问题
对于一个业务线的处理,如果是一个完整的处理,应该是消息正 常进入队列,同时消息正常被消费掉。
问题来了:
生产者发送消息,在传输过程中,消息丢失了,咋办?
消息发到RabbitMq队列,RabbitMq宕机了,咋办?
消费者在消费消息的时候,消费异常了,咋办?
方案思路
1、要保证消息一定能够正常的发到队列中去。
2、要保证入队的消息,一定不能丢失。
3、要保证一定是正常消费的消息,才会从队列中删除。
2 消息入队-确认-Confirm模式
Confirm方式有三种模式: 1、普通Confirm模式 2、批量Confirm模式 3、异步Confirm模式
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
#region 声明路由和队列
channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true,
exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(type: ExchangeType.Fanout,
exchange: "EnteringQueueTransExChange", durable: true,
autoDelete: false, arguments: null);
channel.QueueBind(queue: "EnteringQueueTrans",
exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
#endregion
//开启Confirm模式
channel.ConfirmSelect();
try
{
for (int i = 1; i <= 10; i++)
{
string msg = $"Confirm 模式===消息入队确认=={i}";
byte[] bytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
}
//WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
bool isOk = channel.WaitForConfirms();
if (isOk)//判断 channel.WaitForConfirms() 是否已经发送到队列中去---返回bool值
{
Console.WriteLine("消息已发送~~");
}
//执行这句话---如果成功了, 就成功了,如果失败,就抛出异常
channel.WaitForConfirmsOrDie();
}
catch (Exception)
{
//如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
// 配置重试下
throw;
}
finally
{
//关闭通道
channel.Close();
connection.Close();
}
}
}
3 消息入队-确认-事务支持
channel.txSelect(): 将当前信道设置成事务模式
channel.txCommit(): 用于提交事务
channel.txRollback(): 用于回滚事务
通过事务实现机制,只有消息成功被rabbitmq服务器接收, 事务才能提交成功,否则便可在捕获异常之后进行回滚,然后进行消息重发。
但是事务非常影响rabbitmq的性能。还有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息。
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
#region 声明路由和队列
channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true,
exclusive: false, autoDelete: false, arguments: null);
channel.ExchangeDeclare(type: ExchangeType.Fanout,
exchange: "EnteringQueueTransExChange", durable: true,
autoDelete: false, arguments: null);
channel.QueueBind(queue: "EnteringQueueTrans",
exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
#endregion
//将信道设置为事务模式
channel.TxSelect();
try
{
for (int i = 1; i <= 10; i++)
{
string msg = $"事务模式===消息入队确认=={i}";
byte[] bytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
}
//事务提交 前面写入的消息,一定是在这句话成功执行难后,才会写入到队列中去
channel.TxCommit();
}
catch (Exception)
{
//如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
channel.TxRollback();
throw;
}
finally
{
//关闭通道
channel.Close();
connection.Close();
}
}
}
4 消息入队后—消息持久化
在前边已经介绍了exchange和queue的持久化,把exchange和 queue的durable属性设置为true.
exchange和queue也会恢复。我们需要注意的是:如果queue设置 durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内 的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法 很简单:将exchange和queue都设置durable=true,然后在消息发 布的时候设置persistent=true即可。
服务器宕机,出现异常的时候, 消息固化到硬盘上----硬盘存储是 可以断电的;
ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码
factory.Port = 5672;
using (var connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
#region 声明路由和队列
//支持持久化队列:durable: true
channel.QueueDeclare(queue: "PersistenceQueue", durable: true,
exclusive: false, autoDelete: false, arguments: null);
//支持持久化交换机durable: true
channel.ExchangeDeclare(type: ExchangeType.Fanout,
exchange: "PersistenceQueueExChange", durable: true,
autoDelete: false, arguments: null);
channel.QueueBind(queue: "PersistenceQueue",
exchange: "PersistenceQueueExChange", routingKey: string.Empty);
#endregion
//表达发送的是持久化消息
var props = channel.CreateBasicProperties();
props.Persistent = true; //配置的这句话就是--消息是可以支持持久化的
for (int i = 1; i <= 1000; i++)
{
string msg = $"持久化消息--持久化队列===消息入队确认=={i}";
byte[] bytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish("PersistenceQueueExChange", string.Empty, props, bytes);
}
}
}