当前位置: 首页 > article >正文

RabbitMQ-消息入队

1 分布式异步的问题

对于一个业务线的处理,如果是一个完整的处理,应该是消息正 常进入队列,同时消息正常被消费掉。

问题来了:

生产者发送消息,在传输过程中,消息丢失了,咋办?

消息发到RabbitMq队列,RabbitMq宕机了,咋办?

消费者在消费消息的时候,消费异常了,咋办?

方案思路

1、要保证消息一定能够正常的发到队列中去。

2、要保证入队的消息,一定不能丢失。

3、要保证一定是正常消费的消息,才会从队列中删除。

2 消息入队-确认-Confirm模式

Confirm方式有三种模式: 1、普通Confirm模式 2、批量Confirm模式 3、异步Confirm模式

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 
        channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true, 
            exclusive: false, autoDelete: false, arguments: null);
        channel.ExchangeDeclare(type: ExchangeType.Fanout, 
            exchange: "EnteringQueueTransExChange", durable: true, 
            autoDelete: false, arguments: null);
        channel.QueueBind(queue: "EnteringQueueTrans",
            exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
        #endregion

        //开启Confirm模式
        channel.ConfirmSelect();
        try
        {
            for (int i = 1; i <= 10; i++)
            {
                string msg = $"Confirm 模式===消息入队确认=={i}";
                byte[] bytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
            }

            //WaitForConfirms确认消息(可以同时确认多条消息)是否发送成功
            bool isOk = channel.WaitForConfirms();
            if (isOk)//判断 channel.WaitForConfirms() 是否已经发送到队列中去---返回bool值
            {
                Console.WriteLine("消息已发送~~");
            }
            //执行这句话---如果成功了, 就成功了,如果失败,就抛出异常 
            channel.WaitForConfirmsOrDie();
        }
        catch (Exception)
        {
            //如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
            // 配置重试下
            throw;
        }
        finally
        {
            //关闭通道
            channel.Close();
            connection.Close();
        }

    }
}

3 消息入队-确认-事务支持

channel.txSelect(): 将当前信道设置成事务模式

channel.txCommit(): 用于提交事务

channel.txRollback(): 用于回滚事务

通过事务实现机制,只有消息成功被rabbitmq服务器接收, 事务才能提交成功,否则便可在捕获异常之后进行回滚,然后进行消息重发。

但是事务非常影响rabbitmq的性能。还有就是事务机制是阻塞的过程,只有等待服务器回应之后才会处理下一条消息。

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 
        channel.QueueDeclare(queue: "EnteringQueueTrans", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        channel.ExchangeDeclare(type: ExchangeType.Fanout, 
            exchange: "EnteringQueueTransExChange", durable: true,
            autoDelete: false, arguments: null);

        channel.QueueBind(queue: "EnteringQueueTrans",
            exchange: "EnteringQueueTransExChange",routingKey: string.Empty);
        #endregion

        //将信道设置为事务模式
        channel.TxSelect();
        try
        {
            for (int i = 1; i <= 10; i++)
            {
                string msg = $"事务模式===消息入队确认=={i}";
                byte[] bytes = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish("EnteringQueueTransExChange", string.Empty, null, bytes);
            }
            //事务提交 前面写入的消息,一定是在这句话成功执行难后,才会写入到队列中去
            channel.TxCommit();  

        }
        catch (Exception)
        {
            //如果消息发送给交换机的过程出现异常,则捕捉并进行回滚
            channel.TxRollback();
            throw;
        }
        finally
        {
            //关闭通道
            channel.Close();
            connection.Close();
        }

    }
}

4 消息入队后—消息持久化

在前边已经介绍了exchange和queue的持久化,把exchange和 queue的durable属性设置为true.

exchange和queue也会恢复。我们需要注意的是:如果queue设置 durable=true,rabbitmq服务重启后队列虽然会存在,但是队列内 的消息会丢全部丢失。那么怎么实现消息的持久化呢?实现的方法 很简单:将exchange和queue都设置durable=true,然后在消息发 布的时候设置persistent=true即可。

服务器宕机,出现异常的时候, 消息固化到硬盘上----硬盘存储是 可以断电的;

ConnectionFactory factory = new ConnectionFactory();
factory.HostName = UrlConfig.Rabbitmq_Url;//RabbitMQ服务在本地运行
factory.UserName = UrlConfig.User;//用户名
factory.Password = UrlConfig.Password;//密码 
factory.Port = 5672;

using (var connection = factory.CreateConnection())
{
    using (IModel channel = connection.CreateModel())
    {
        #region 声明路由和队列 

        //支持持久化队列:durable: true
        channel.QueueDeclare(queue: "PersistenceQueue", durable: true,
            exclusive: false, autoDelete: false, arguments: null);

        //支持持久化交换机durable: true
        channel.ExchangeDeclare(type: ExchangeType.Fanout,
            exchange: "PersistenceQueueExChange", durable: true,
            autoDelete: false, arguments: null);

        channel.QueueBind(queue: "PersistenceQueue",
            exchange: "PersistenceQueueExChange", routingKey: string.Empty);
        #endregion

        //表达发送的是持久化消息
        var props = channel.CreateBasicProperties();
        props.Persistent = true;  //配置的这句话就是--消息是可以支持持久化的

        for (int i = 1; i <= 1000; i++)
        {
            string msg = $"持久化消息--持久化队列===消息入队确认=={i}";
            byte[] bytes = Encoding.UTF8.GetBytes(msg);
            channel.BasicPublish("PersistenceQueueExChange", string.Empty, props, bytes);
        }
    }
}

http://www.kler.cn/a/504205.html

相关文章:

  • Elasticsearch技术标准解析与实践案例
  • ip属地是根据手机号还是位置
  • 穷举vs暴搜vs深搜vs回溯vs剪枝系列一>优美的排列
  • 图解Git——分支开发工作流《Pro Git》
  • 分多个AndroidManifest.xml来控制项目编译
  • 《JavaWeb开发-javascript基础》
  • HarmonyOS NEXT应用开发边学边玩系列:从零实现一影视APP (二、首页轮播图懒加载的实现)
  • SQL刷题快速入门(二)
  • ClickHouse-CPU、内存参数设置
  • 在Linux系统中无网络安装Nginx并配置负载均衡
  • 41_Lua函数
  • uniapp小程序开发,配置开启小程序右上角三点的分享功能
  • 【搭建JavaEE】(1)maven仓库安装配置
  • Vue.js前端框架教程16:Element UI的el-dialog组件
  • WordEmbeddingPositionEmbedding
  • uni-app的学习
  • MySQL:内置函数
  • SQL Server 查看数据库表使用空间 系统表
  • 刀客doc:快手的商业化架构为什么又调了?
  • 6.1 MySQL数字函数和条件函数
  • 开源项目stable-diffusion-webui部署及生成照片
  • electron打包不成功,放置安装包到C盘缓存
  • 论文阅读:EasySplat: View-Adaptive Learning makes 3D Gaussian Splatting Easy
  • 了解如何学习自然语言处理技术
  • 企业级信息系统开发讲课笔记4.12 Spring Boot默认缓存管理
  • CHAIN OF RESPONSIBILITY(职责链)—对象行为型模式