当前位置: 首页 > article >正文

消息队列高手课总结笔记——基础篇速通

第一章,整体认识

一,解决的问题(使用场景)

主要:
1,异步处理
2,流量控制
3,服务解耦
补充:
1,作为发布/订阅系统实现一个微服务级系统间的观察者模式;
2,连接流计算任务和数据;
3,用于将消息广播给大量接收者。 

消息队列的问题和局限性:
1,延迟问题
2,增加了系统复杂度
3,可能产生数据不一致
所以说,没有最好的架构,只有最适合的架构

二,有哪些常用的消息队列

1,如果说,消息队列并不是你将要构建系统的主角之一,你对消息队列功能和性能都没有很高的要求,只需要一个开箱即用易于维护的产品,我建议你使用RabbitMQ。
2,如果你的系统使用消息队列主要场景是处理在线业务,比如在交易系统中用消息队列传递订单,那RocketMQ的低延迟和金融级的稳定性是你需要的。
3,如果你需要处理海量的消息,像收集日志、监控信息或是前端的埋点这类数据,或是你的应用场景大量使用了大数据、流计算相关的开源产品,那Kafka是最适合你的消息队列。
不过很多大厂都会自己根据业务特点改良,自研内部的消息队列,比如美团的makfa,京东的JMQ,蚂蚁的SOFAMQ、MsgBroker。


三,消息模型

每种消息队列都有自己的一套消息模型,像队列(Queue)、主题(Topic)或是分区(Partition)这些名词概念,在每个消息队列模型中都会涉及一些,含义还不太一样。


早期的消息队列是按照队列的数据结构设计的:

但这不能支持多个生产者,如果为每个消费者都创建一个队列,让生产者发送多份,既浪费资源,又违背了解耦的初衷,于是演化出了发布-订阅模型:

现代的消息队列产品大多是这个消息模型。

①RabbitMQ的消息模型

它是少数的依然使用队列模型的产品之一,通过Exchange上配置的策略来决定将消息投递到哪些队列中,或者发送到多个队列。

②RocketMQ的消息模型

讲完了RabbitMQ的消息模型,我们再来看看RocketMQ。RocketMQ使用的消息模型是标准的发布-订阅模型,在RocketMQ的术语表中,生产者、消费者和主题与我在上面讲的发布-订阅模型中的概念是完全一样的。
但是,在RocketMQ也有队列(Queue)这个概念,并且队列在RocketMQ中是一个非常重要的概念,那队列在RocketMQ中的作用是什么呢?这就要从消息队列的消费机制说起。
几乎所有的消息队列产品都使用一种非常朴素的“请求-确认”机制,确保消息不会在传递过程中由于网络或服务器故障丢失。具体的做法也非常简单。在生产端,生产者先将消息发送给服务端,也就是Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。
如果生产者没有收到服务端的确认或者收到失败的响应,则会重新发送消息;在消费端,消费者在收到消息并完成自己的消费业务逻辑(比如,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,否则它会给消费者重新发送这条消息,直到收到对应的消费成功确认。
这个确认机制很好地保证了消息传递过程中的可靠性,但是,引入这个机制在消费端带来了一个不小的问题。什么问题呢?为了确保消息的有序性,在某一条消息被成功消费之前,下一条消息是不能被消费的,否则就会出现消息空洞,违背了有序性这个原则。
也就是说,每个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就没法通过水平扩展消费者的数量来提升消费端总体的消费性能。为了解决这个问题,RocketMQ在主题下面增加了队列的概念。
每个主题包含多个队列,通过多个队列来实现多实例并行生产和消费。需要注意的是,RocketMQ只在队列上保证消息的有序性,主题层面是无法保证消息的严格顺序的。
RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被Consumer Group1消费过,也会再给Consumer Group2消费。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。如果一条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。
在Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。这个消费位置是非常重要的概念,我们在使用消息队列的时候,丢消息的原因大多是由于消费位置处理不当导致的。
RocketMQ的消息模型中,比较关键的概念就是这些了。为了便于你理解,我画了下面这张图:

③Kafka的消息模型

我们再来看看另一种常见的消息队列Kafka,Kafka的消息模型和RocketMQ是完全一样的,我刚刚讲的所有RocketMQ中对应的概念,和生产消费过程中的确认机制,都完全适用于Kafka。唯一的区别是,在Kafka中,队列这个概念的名称不一样,Kafka中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

第二章,常见问题

一,如何实现分布式事务

在实际应用中,比较常见的分布式事务实现有2PC(Two-phase Commit,也叫二阶段提交)、TCC(Try-Confirm-Cancel)和事务消息。每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。
事务消息适用的场景主要是那些需要异步更新数据,并且对数据实时性要求不太高的场景。比如我们在开始时提到的那个例子,在创建订单后,如果出现短暂的几秒,购物车里的商品没有被及时清空,也不是完全不可接受的,只要最终购物车的数据和订单数据保持一致就可以了。

消息队列是如何实现分布式事务的?

事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能。
回到订单和购物车这个例子,我们一起来看下如何用消息队列来实现分布式事务。

首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。
如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka和RocketMQ给出了2种不同的解决方案。
Kafka的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ则给出了另外一种解决方案。

RocketMQ中的分布式事务实现

在RocketMQ中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。
在我们这个例子中,反查本地事务的逻辑也很简单,我们只要根据消息中的订单ID,在订单库中查询这个订单是否存在即可,如果订单存在则返回成功,否则返回失败。RocketMQ会自动根据事务反查的结果提交或者回滚事务消息。
这个反查本地事务的实现,并不依赖消息的发送方,也就是订单服务的某个实例节点上的任何数据。这种情况下,即使是发送事务消息的那个订单服务节点宕机了,RocketMQ依然可以通过其他订单服务的节点来执行反查,确保事务的完整性。
综合上面讲的通用事务消息的实现和RocketMQ的事务反查机制,使用RocketMQ事务消息功能实现分布式事务的流程如下图:

二,如何确保消息不会丢失

检测消息丢失的方法

我们说,用消息队列最尴尬的情况不是丢消息,而是消息丢了还不知道。一般而言,一个新的系统刚刚上线,各方面都不太稳定,需要一个磨合期,这个时候,特别需要监控到你的系统中是否有消息丢失的情况。
如果是IT基础设施比较完善的公司,一般都有分布式链路追踪系统,使用类似的追踪系统可以很方便地追踪每一条消息。如果没有这样的追踪系统,这里我提供一个比较简单的方法,来检查是否有消息丢失的情况。
我们可以利用消息队列的有序性来验证是否有消息丢失。原理非常简单,在Producer端,我们给每个发出的消息附加一个连续递增的序号,然后在Consumer端来检查这个序号的连续性。
如果没有消息丢失,Consumer收到消息的序号必然是连续递增的,或者说收到的消息,其中的序号必然是上一条消息的序号+1。如果检测到序号不连续,那就是丢消息了。还可以通过缺失的序号来确定丢失的是哪条消息,方便进一步排查原因。
大多数消息队列的客户端都支持拦截器机制,你可以利用这个拦截器机制,在Producer发送消息之前的拦截器中将序号注入到消息中,在Consumer收到消息的拦截器中检测序号的连续性,这样实现的好处是消息检测的代码不会侵入到你的业务代码中,待你的系统稳定后,也方便将这部分检测的逻辑关闭或者删除。
如果是在一个分布式系统中实现这个检测方法,有几个问题需要你注意。
首先,像Kafka和RocketMQ这样的消息队列,它是不保证在Topic上的严格顺序的,只能保证分区上的消息是有序的,所以我们在发消息的时候必须要指定分区,并且,在每个分区单独检测消息序号的连续性。
如果你的系统中Producer是多实例的,由于并不好协调多个Producer之间的发送顺序,所以也需要每个Producer分别生成各自的消息序号,并且需要附加上Producer的标识,在Consumer端按照每个Producer分别来检测序号的连续性。
Consumer实例的数量最好和分区数量一致,做到Consumer和分区一一对应,这样会比较方便地在Consumer内检测消息序号的连续性。

确保消息可靠传递

讲完了检测消息丢失的方法,接下来我们一起来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。
你可以看下这个图,一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。

1. 生产阶段

在这个阶段,从消息在Producer创建出来,经过网络传输发送到Broker端。 •存储阶段: 在这个阶段,消息在Broker端存储,如果是集群,消息会在这个阶段被复制到其他的副本上。 •消费阶段: 在这个阶段,Consumer从Broker上拉取消息,经过网络传输发送到Consumer上。 1. 生产阶段
在生产阶段,消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。
只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
你在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以Kafka为例,我们看一下如何可靠地发送消息:
同步发送时,只要注意捕获异常即可。

try {
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("消息发送成功。");
} catch (Throwable e) {
    System.out.println("消息发送失败!");
    System.out.println(e);
}

异步发送时,则需要在回调方法里进行检查。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。

producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
        System.out.println("消息发送成功。");
    } else {
        System.out.println("消息发送失败!");
        System.out.println(exception);
    }
});
2. 存储阶段

在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
如果对消息的可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。
对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。例如,在RocketMQ中,需要将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘。
如果是Broker是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2个以上的节点,再给客户端回复发送确认响应。这样当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失。后面我会专门安排一节课,来讲解在集群模式下,消息队列是如何通过消息复制来确保消息的可靠性的。

3. 消费阶段

消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
你在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。

小结
这个过程可以分为分三个阶段,每个阶段都需要正确的编写代码并且设置正确的配置项,才能配合消息队列的可靠性机制,确保消息不会丢失。

•在生产阶段,你需要捕获消息发送的错误,并重发消息。

•在存储阶段,你可以通过配置刷盘和复制相关的参数,让消息写入到多个副本的磁盘上,来确保消息不会因为某个Broker宕机或者磁盘损坏而丢失。

•在消费阶段,你需要在处理完全部消费业务逻辑之后,再发送消费确认。 你在理解了这几个阶段的原理后,如果再出现丢消息的情况,应该可以通过在代码中加一些日志的方式,很快定位到是哪个阶段出了问题,然后再进一步深入分析,快速找到问题原因。

三,如何处理消费过程中的重复消息?


主要思路是用幂等性解决重复消息问题。


1. 利用数据库的唯一约束实现幂等

这个限制实现的方法非常多,最简单的是我们在数据库中建一张转账流水表,这个表有三个字段:转账单ID、账户ID和变更金额,然后给转账单ID和账户ID这两个字段联合起来创建一个唯一约束,这样对于相同的转账单ID和账户ID,表里至多只能存在一条记录。


2. 为更新的数据设置前置条件

比如,刚刚我们说过,“将账户X的余额增加100元”这个操作并不满足幂等性,我们可以把这个操作加上一个前置条件,变为:“如果账户X当前的余额为500元,将余额加100元”,这个操作就具备了幂等性。对应到消息队列中的使用时,可以在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变更操作。
但是,如果我们要更新的数据不是数值,或者我们要做一个比较复杂的更新操作怎么办?用什么作为前置判断条件呢?更加通用的方法是,给你的数据增加一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,如果不一致就拒绝更新数据,更新数据的同时将版本号+1,一样可以实现幂等更新。


3. 记录并检查操作

如果上面提到的两种实现幂等方法都不能适用于你的场景,我们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操作,也称为“Token机制或者GUID(全局唯一ID)机制”,实现的思路特别简单:在执行数据更新操作之前,先检查一下是否执行过这个更新操作。
具体的实现方法是,在发送消息时,给每条消息指定一个全局唯一的ID,消费时,先根据这个ID检查这条消息是否有被消费过,如果没有消费过,才更新数据,然后将消费状态置为已消费。
原理和实现是不是很简单?其实一点儿都不简单,在分布式系统中,这个方法其实是非常难实现的。首先,给每个消息指定一个全局唯一的ID就是一件不那么简单的事儿,方法有很多,但都不太好同时满足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,然后更新数据并且设置消费状态”中,三个操作必须作为一组操作保证原子性,才能真正实现幂等,否则就会出现Bug。
比如说,对于同一条消息:“全局ID为8,操作为:给ID为666账户增加100元”,有可能出现这样的情况:

•t0时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“账户增加100元”;

•t1时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,因为这个时刻,Consumer A还未来得及更新消息执行状态。 这样就会导致账户被错误地增加了两次100元,这是一个在分布式系统中非常容易犯的错误,一定要引以为戒。

对于这个问题,当然我们可以用事务来实现,也可以用锁来实现,但是在分布式系统中,无论是分布式事务还是分布式锁都是比较难解决问题。

四,消息积压了该如何处理?

先讲讲优化性能来避免消息积压
在使用消息队列的系统中,对于性能的优化,主要体现在生产者和消费者这一收一发两部分的业务逻辑中。对于消息队列本身的性能,你作为使用者,不需要太关注。为什么这么说呢?
主要原因是,对于绝大多数使用消息队列的业务来说,消息队列本身的处理能力要远大于业务系统的处理能力。主流消息队列的单个节点,消息收发的性能可以达到每秒钟处理几万至几十万条消息的水平,还可以通过水平扩展Broker的实例数成倍地提升处理能力。
而一般的业务系统需要处理的业务逻辑远比消息队列要复杂,单个节点每秒钟可以处理几百到几千次请求,已经可以算是性能非常好的了。所以,对于消息队列的性能优化,我们更关注的是,在消息的收发两端,我们的业务代码怎么和消息队列配合,达到一个最佳的性能。

1. 发送端性能优化

发送端业务代码的处理性能,实际上和消息队列的关系不大,因为一般发送端都是先执行自己的业务逻辑,最后再发送消息。如果说,你的代码发送消息的性能上不去,你需要优先检查一下,是不是发消息之前的业务逻辑耗时太多导致的。
对于发送消息的业务逻辑,只需要注意设置合适的并发和批量大小,就可以达到很好的发送性能。为什么这么说呢?
我们之前的课程中讲过Producer发送消息的过程,Producer发消息给Broker,Broker收到消息后返回确认响应,这是一次完整的交互。假设这一次交互的平均时延是1ms,我们把这1ms的时间分解开,它包括了下面这些步骤的耗时:

•发送端准备数据、序列化消息、构造请求等逻辑的时间,也就是发送端在发送网络请求之前的耗时;

•发送消息和返回响应在网络传输中的耗时; •Broker处理消息的时延。 如果是单线程发送,每次只发送1条消息,那么每秒只能发送 1000ms / 1ms * 1条/ms = 1000条 消息,这种情况下并不能发挥出消息队列的全部实力。
无论是增加每次发送消息的批量大小,还是增加并发,都能成倍地提升发送性能。至于到底是选择批量发送还是增加并发,主要取决于发送端程序的业务性质。简单来说,只要能够满足你的性能要求,怎么实现方便就怎么实现。
比如说,你的消息发送端是一个微服务,主要接受RPC请求处理在线业务。很自然的,微服务在处理每次请求的时候,就在当前线程直接发送消息就可以了,因为所有RPC框架都是多线程支持多并发的,自然也就实现了并行发送消息。并且在线业务比较在意的是请求响应时延,选择批量发送必然会影响RPC服务的时延。这种情况,比较明智的方式就是通过并发来提升发送性能。
如果你的系统是一个离线分析系统,离线系统在性能上的需求是什么呢?它不关心时延,更注重整个系统的吞吐量。发送端的数据都是来自于数据库,这种情况就更适合批量发送,你可以批量从数据库读取数据,然后批量来发送消息,同样用少量的并发就可以获得非常高的吞吐量。


2. 消费端性能优化

使用消息队列的时候,大部分的性能问题都出现在消费端,如果消费的速度跟不上发送端生产消息的速度,就会造成消息积压。如果这种性能倒挂的问题只是暂时的,那问题不大,只要消费端的性能恢复之后,超过发送端的性能,那积压的消息是可以逐渐被消化掉的。
要是消费速度一直比生产速度慢,时间长了,整个系统就会出现问题,要么,消息队列的存储被填满无法提供服务,要么消息丢失,这对于整个系统来说都是严重故障。
所以,我们在设计系统的时候,一定要保证消费端的消费性能要高于生产端的发送性能,这样的系统才能健康的持续运行。
消费端的性能优化除了优化消费业务逻辑以外,也可以通过水平扩容,增加消费端的并发数来提升总体的消费性能。特别需要注意的一点是,在扩容Consumer的实例数量的同时,必须同步扩容主题中的分区(也叫队列)数量,确保Consumer的实例数和分区数量是相等的。如果Consumer的实例数量超过分区数量,这样的扩容实际上是没有效果的。原因我们之前讲过,因为对于消费者来说,在每个分区上实际上只能支持单线程消费。
我见到过很多消费程序,他们是这样来解决消费慢的问题的:

它收消息处理的业务逻辑可能比较慢,也很难再优化了,为了避免消息积压,在收到消息的OnMessage方法中,不处理任何业务逻辑,把这个消息放到一个内存队列里面就返回了。然后它可以启动很多的业务线程,这些业务线程里面是真正处理消息的业务逻辑,这些线程从内存队列里取消息处理,这样它就解决了单个Consumer不能并行消费的问题。
这个方法是不是很完美地实现了并发消费?请注意,这是一个非常常见的错误方法! 为什么错误?因为会丢消息。如果收消息的节点发生宕机,在内存队列中还没来及处理的这些消息就会丢失。关于“消息丢失”问题,你可以回顾一下《如何确保消息不会丢失》。


还有一种消息积压的情况是,日常系统正常运转的时候,没有积压或者只有少量积压很快就消费掉了,但是某一个时刻,突然就开始积压消息并且积压持续上涨。这种情况下需要你在短时间内找到消息积压的原因,迅速解决问题才不至于影响业务。
要么是发送变快了,要么是消费变慢了。
大部分消息队列都内置了监控的功能,只要通过监控数据,很容易确定是哪种原因。如果是单位时间发送的消息增多,比如说是赶上大促或者抢购,短时间内不太可能优化消费端的代码来提升消费性能,唯一的方法是通过扩容消费端的实例数来提升总体的消费能力。
如果短时间内没有足够的服务器资源进行扩容,没办法的办法是,将系统降级,通过关闭一些不重要的业务,减少发送方发送的数据量,最低限度让系统还能正常运转,服务一些重要业务。

答疑解惑

详解RocketMQ和Kafka的消息模型

假设有一个主题MyTopic,我们为主题创建5个队列,分布到2个Broker中。

先说消息生产这一端,假设我们有3个生产者实例:Produer0,Produer1和Producer2。
这3个生产者是如何对应到2个Broker的,又是如何对应到5个队列的呢?这个很简单,不用对应,随便发。每个生产者可以在5个队列中轮询发送,也可以随机选一个队列发送,或者只往某个队列发送,这些都可以。比如Producer0要发5条消息,可以都发到队列Q0里面,也可以5个队列每个队列发一条。
然后说消费端,很多同学没有搞清楚消费组、消费者和队列这几个概念的对应关系。
每个消费组就是一份订阅,它要消费主题MyTopic下,所有队列的全部消息。注意,队列里的消息并不是消费掉就没有了,这里的“消费”,只是去队列里面读了消息,并没有删除,消费完这条消息还是在队列里面。
多个消费组在消费同一个主题时,消费组之间是互不影响的。比如我们有2个消费组:G0和G1。G0消费了哪些消息,G1是不知道的,也不用知道。G0消费过的消息,G1还可以消费。即使G0积压了很多消息,对G1来说也没有任何影响。
然后我们再说消费组的内部,一个消费组中可以包含多个消费者的实例。比如说消费组G1,包含了2个消费者C0和C1,那这2个消费者又是怎么和主题MyTopic的5个队列对应的呢?
由于消费确认机制的限制,这里面有一个原则是,在同一个消费组里面,每个队列只能被一个消费者实例占用。至于如何分配,这里面有很多策略,我就不展开说了。总之保证每个队列分配一个消费者就行了。比如,我们可以让消费者C0消费Q0,Q1和Q2,C1消费Q3和Q4,如果C0宕机了,会触发重新分配,这时候C1同时消费全部5个队列。
再强调一下,队列占用只是针对消费组内部来说的,对于其他的消费组来说是没有影响的。比如队列Q2被消费组G1的消费者C1占用了,对于消费组G2来说,是完全没有影响的,G2也可以分配它的消费者来占用和消费队列Q2。
最后说一下消费位置,每个消费组内部维护自己的一组消费位置,每个队列对应一个消费位置。消费位置在服务端保存,并且,消费位置和消费者是没有关系的。每个消费位置一般就是一个整数,记录这个消费组中,这个队列消费到哪个位置了,这个位置之前的消息都成功消费了,之后的消息都没有消费或者正在消费。
我把咱们这个例子的消费位置整理成下面的表格,便于你理解。

你可以看到,这个表格中并没有消费者这一列,也就是说消费者和消费位置是没有关系的。

如何实现单个队列的并行消费?

如果不要求严格顺序,如何实现单个队列的并行消费?关于这个问题,有很多的实现方式,在JMQ(京东自研的消息队列产品)中,它实现的思路是这样的。
比如说,队列中当前有10条消息,对应的编号是0-9,当前的消费位置是5。同时来了三个消费者来拉消息,把编号为5、6、7的消息分别给三个消费者,每人一条。过了一段时间,三个消费成功的响应都回来了,这时候就可以把消费位置更新为8了,这样就实现并行消费。
这是理想的情况。还有可能编号为6、7的消息响应回来了,编号5的消息响应一直回不来,怎么办?这个位置5就是一个消息空洞。为了避免位置5把这个队列卡住,可以先把消费位置5这条消息,复制到一个特殊重试队列中,然后依然把消费位置更新为8,继续消费。再有消费者来拉消息的时候,优先把重试队列中的那条消息给消费者就可以了。
这是并行消费的一种实现方式。需要注意的是,并行消费开销还是很大的,不应该作为一个常规的,提升消费并发的手段,如果消费慢需要增加消费者的并发数,还是需要扩容队列数。

如何保证消息的严格顺序?

很多同学在留言中问,怎么来保证消息的严格顺序?我们多次提到过,主题层面是无法保证严格顺序的,只有在队列上才能保证消息的严格顺序。

如果说,你的业务必须要求全局严格顺序,就只能把消息队列数配置成1,生产者和消费者也只能是一个实例,这样才能保证全局严格顺序。

大部分情况下,我们并不需要全局严格顺序,只要保证局部有序就可以满足要求了。比如,在传递账户流水记录的时候,只要保证每个账户的流水有序就可以了,不同账户之间的流水记录是不需要保证顺序的。

如果需要保证局部严格顺序,可以这样来实现。在发送端,我们使用账户ID作为Key,采用一致性哈希算法计算出队列编号,指定队列来发送消息。一致性哈希算法可以保证,相同Key的消息总是发送到同一个队列上,这样可以保证相同Key的消息是严格有序的。如果不考虑队列扩容,也可以用队列数量取模的简单方法来计算队列编号。


http://www.kler.cn/a/537338.html

相关文章:

  • git代理设置
  • 区块链技术:Facebook 重塑社交媒体信任的新篇章
  • C++版本DES加密/解密
  • AI驱动的智能流程自动化是什么
  • Linux 内核模块 | 加载 / 添加 / 删除 / 优先级
  • 自动化测试、压力测试、持续集成
  • 初始JavaEE篇 —— Spring Web MVC入门(下)
  • 详解SQLAlchemy的函数relationship
  • .net的一些知识点6
  • 【ESP32cam人脸识别开门及服务器端实战源码】
  • 回退 android studio emulator 的版本
  • 51单片机之使用Keil uVision5创建工程以及使用stc-isp进行程序烧录步骤
  • 【测试开发】Python+Django实现接口测试工具
  • docker 网络详解
  • 基于 llama-Factory 动手实践 Llama 全参数 SFT 和 LoRA SFT
  • 【C++】C++对C语言的扩充
  • 台湾精锐APEX减速机在半导体制造设备中的应用案例
  • matlab simulink 三级倒立摆LQR控制
  • 【GoLang】切片的面试知识点
  • 【Python深入浅出】Python3中os模块:开启系统交互的万能钥匙
  • 【Spring Boot】网页五子棋项目中遇到的困难及解决方法
  • 进阶数据结构——链式前向星
  • k8s集群外exporter怎么使用Prometheus监控
  • SQLAlchemy-2.0中模型定义和alembic的数据库迁移工具
  • 大语言模型遇上自动驾驶:AsyncDriver如何巧妙解决推理瓶颈?
  • 某咨询大数据解决方案介绍(32页PPT)