当前位置: 首页 > article >正文

java八股文-消息队列

一、MQ基础篇

1. 什么是消息队列?

消息队列(MQ)是分布式系统中实现异步通信的中间件,解耦生产者和消费者。 

2. 使用场景有哪些?

  • 异步处理(如注册后发送邮件)
  • 系统解耦(不同服务通过MQ通信)
  • 流量削峰(应对突发流量)
  • 日志收集(如Kafka处理日志流)

3.消息队列的优缺点?

优点:解耦、异步、削峰、扩展性。
缺点:系统复杂度增加、消息一致性问题、运维成本高。 

4. BrokerProducerConsumerTopicPartition 分别是什么?

  •  Broker:消息队列服务器,负责接收、存储和转发消息。
  • Producer:消息生产者,负责创建并发送消息到Broker。
  • Consumer:消息消费者,从Broker订阅并拉取消息进行处理。
  • Topic:消息的逻辑分类单元(类似“频道”),Producer发送到指定Topic,Consumer订阅该Topic。
  • Partition:Topic的物理分片,每个Partition独立存储和有序,实现并行处理与扩展。

 二、主流MQ对比

1.Kafka vs RocketMQ vs RabbitMQ 区别?

特性KafkaRocketMQRabbitMQ
设计目标高吞吐、日志处理金融级可靠性企业级消息路由
吞吐量最高(百万级/秒)高(十万级/秒)中等(万级/秒)
消息顺序分区有序全局有序队列有序
事务消息不支持支持不支持
延迟消息不支持支持插件支持

2. 选型总结

  • Kafka 适用于高吞吐量、流式数据处理的场景。
  • RabbitMQ 适用于任务调度、企业级消息通信和复杂的消息路由。
  • RocketMQ 适用于电商、金融等高可用、高吞吐量的分布式系统。

3.Kafka 为什么不适合金融级业务?

1. 消息顺序性与事务一致性

  • 事务一致性:Kafka 本身并不直接支持分布式事务。在金融级业务中,通常需要确保多个消息处理的原子性和一致性(例如支付系统中的事务处理)。虽然 Kafka 支持消息的 "Exactly Once" 语义,但这通常需要额外的设计与配置,且依赖于外部的事务管理机制。

  • 消息顺序性:Kafka 在保证分区内的顺序消费方面表现较好,但如果需要跨多个分区处理消息的顺序性,Kafka 就不具备天然的支持。在一些金融系统中,特别是在需要严格的消息顺序保证的情况下,Kafka 可能无法完全满足需求。

2. 消息丢失与确认机制

  • 消息丢失风险:Kafka 的设计目标是高吞吐量,并通过日志存储来确保消息的持久化。然而,在极端情况下,如果系统出现故障,消息丢失的风险仍然存在。虽然 Kafka 提供了副本机制和持久化保证,但对于金融业务来说,尤其是在涉及资金流转等关键业务时,任何消息丢失都可能带来严重的后果。

  • 确认机制:Kafka 在消费端的确认机制(consumer ack)相对简单,依赖于 offset 管理。如果消费者没有正确地处理消息(如没有及时提交 offset),可能会导致重复消费或丢失消息,这在金融系统中是不可接受的。

3. 强一致性保证

  • 最终一致性:Kafka 的设计更多关注“最终一致性”而非强一致性。虽然 Kafka 可以保证高可用性,但它的设计并不适合要求强一致性的场景,尤其是在金融系统中,往往需要保证多个操作(如转账、支付)在同一时间点的一致性,而 Kafka 的最终一致性模型可能无法满足这类需求。

  • 数据延迟问题:在一些高频交易或实时结算的金融应用中,Kafka 的消息传递和处理延迟可能不符合严格的实时性要求,尤其是在高负载情况下。

4. 复杂的消息路由与支持

  • 路由机制的复杂性:Kafka 的消息发布/订阅模型相对简单,没有像 RabbitMQ 那样复杂的交换机、路由键和队列绑定机制。金融业务中,可能需要更细粒度的消息路由和控制,而 Kafka 在这方面的灵活性不如 RabbitMQ。

5. 缺乏完整的事务性支持

  • Kafka 本身并不内置对“事务性”的完整支持。在一些金融系统中,可能会有跨多个消息队列或不同业务组件的事务处理需求,确保所有操作要么全成功,要么全失败。虽然 Kafka 最近版本支持了事务性消息,但是它并没有像传统的数据库管理系统那样强大的事务支持,因此在一些严格的金融业务中可能无法完全满足需求。

总结:

Kafka 的高吞吐量和分布式特性使其在大数据流处理和实时日志分析等场景中非常出色,但它在处理金融级业务时可能存在一些问题,特别是与消息的强一致性、事务管理、可靠性和精确的顺序消费等方面的要求不完全匹配。在金融级别的应用中,通常更偏向于使用如 RabbitMQRocketMQ 这样的系统,它们提供更高的可靠性保证、更灵活的事务支持和更强的消息保障机

4. RocketMQ 的 消息存储结构?(CommitLog+ConsumeQueue)

1. CommitLog

  • 作用:CommitLog 是 RocketMQ 存储所有消息的主要日志文件,类似于 Kafka 的日志存储。所有消息都首先被写入 CommitLog,然后再根据需要将其消费到 ConsumeQueueIndex(可选)中。CommitLog 是消息持久化的核心组件,保证了消息的顺序和高吞吐量。

  • 特点

    • 顺序存储:CommitLog 中的消息是按顺序存储的,采用 append-only 的方式。这意味着新消息总是附加到文件的末尾,而不是插入到文件的中间,这样可以最大化磁盘的写入效率。
    • 文件分段:CommitLog 文件以固定大小(如 1GB 或 2GB)的日志文件进行切分。当文件达到指定大小时,会创建一个新的文件来存储后续的消息。
    • 消息格式:每条消息在 CommitLog 中都有一个固定的格式,包括消息的物理偏移量、消息体、消息属性等。

2. ConsumeQueue

  • 作用:ConsumeQueue 是用于高效消费消息的辅助结构,它将 CommitLog 中的消息按照消费者的消费需求进行组织。ConsumeQueue 是一种索引结构,存储每条消息在 CommitLog 中的位置(即偏移量),并为每个消费者队列提供消息消费的入口。

  • 特点

    • 索引结构:ConsumeQueue 中存储的是 CommitLog 中消息的物理偏移量和消息的相关信息(如消息大小、消息标识等)。消费者可以通过 ConsumeQueue 查找消息的位置,而不需要扫描整个 CommitLog 文件。
    • 高效消费:ConsumeQueue 帮助消费者快速定位消息,避免了对 CommitLog 的全局扫描,提高了消费效率。每个 Consumer Group 都有独立的 ConsumeQueue,用于追踪该 Consumer Group 对应的消息消费进度。
    • 固定长度:每条消息在 ConsumeQueue 中占用固定的空间(如 20 字节),存储 CommitLog 偏移量和其他元数据。这个结构使得 ConsumeQueue 可以非常快速地处理大量的消息。

 

 

 

三、消息队列设计原理 

1. 消息如何存储?(以Kafka为例)

  • 消息按Topic分区存储,每个分区是一个有序的日志文件(Segment)。
  • 通过顺序写入磁盘 + 零拷贝技术提升性能。

2. 推(Push)模式 vs 拉(Pull)模式? 

推模式(如RabbitMQ):服务端主动推消息,实时性高,但可能造成消费者压力。
拉模式(如Kafka):消费者主动拉取,可控性强,但可能有延迟。

四、消息可靠性

1. 如何保证消息不丢失?

  • 生产者:开启确认机制(如Kafka的ACK=all,RocketMQ的事务消息)。
  • Broker:消息持久化(磁盘存储)+ 多副本同步。
  • 消费者:手动提交Offset(避免自动提交导致丢失)。

2. 如何保证消息幂等性?

  • 生产者幂等:唯一ID(如RocketMQ的Message ID)。
  • 消费者幂等:数据库唯一键、Redis去重、乐观锁。

3. 消息发送失败后的 重试机制 如何设计?

 

在消息队列系统中,设计一个可靠的消息发送失败后的重试机制是确保消息最终能够被正确消费的关键。对于 RocketMQ 等消息队列系统,重试机制的设计涉及多个层面,包括如何检测消息发送失败、如何管理重试次数、如何防止消息的无限重试以及如何保证消息的幂等性。以下是一个设计消息发送失败后的重试机制的思路:

1. 失败检测机制

  • 网络问题:发送消息时,如果由于网络问题、服务不可用等原因导致消息无法被成功发送,首先需要进行重试。
  • 消息发送返回的状态码:在发送消息时,如果返回的是发送失败的状态码(例如 RocketMQ 的 SEND_FAILED 状态),需要执行重试操作。
  • 消息发送超时:如果消息发送过程中发生超时(如超出 Broker 规定的时间范围),也应该视为发送失败,并触发重试。

2. 重试策略

为了避免消息发送失败后无限重试,可以采用以下几种常见的重试策略:

a. 固定间隔重试

在这种策略下,消息发送失败后,系统会按照固定的时间间隔进行重试,直到达到最大重试次数。比如,可以设置每次重试的间隔为 1 秒、5 秒等。

  • 优点:实现简单,容易控制重试次数和间隔。
  • 缺点:固定间隔可能无法适应某些场景,例如网络拥堵或者服务负载过高时,固定间隔可能无法有效减轻压力。

b. 指数退避重试(Exponential Backoff)

指数退避是常见的重试策略。它在每次失败后会逐渐增加重试的时间间隔。例如,第一次重试等待 1 秒,第二次重试等待 2 秒,第三次重试等待 4 秒,依此类推。

  • 优点:通过渐增的时间间隔避免短时间内的过度重试,可以有效减轻系统负担,避免突发流量导致系统崩溃。
  • 缺点:需要设置合理的最大等待时间,避免重试等待时间过长。

c. 抖动(Jitter)

在重试过程中加入抖动,防止多个失败的客户端在同一时刻发起重试,造成集中请求。这通常与指数退避结合使用,使得每次重试的间隔随机化。

  • 优点:有效避免多客户端重试冲突,减轻负载。
  • 缺点:可能稍微增加系统的复杂性。

3. 最大重试次数

为了防止消息进入死循环,应该设置最大重试次数。超过最大重试次数后,消息应当被认为是“不可恢复”的,系统应当采取其他措施,如:

  • 死信队列:当消息经过最大重试次数后,可以将其转移到一个死信队列(Dead Letter Queue, DLQ),供人工检查或后续处理。
  • 报警机制:当消息达到最大重试次数并进入死信队列时,系统应当触发报警,通知相关人员进行问题排查。

4. 幂等性保障

在重试过程中,确保消息的幂等性非常重要。如果消息已经处理过,但由于网络故障等原因导致消息发送失败并被重试,重试时会导致重复消费。因此,在设计时要确保消息处理的幂等性。

  • 幂等操作:在消费端设计幂等操作,确保多次消费相同消息不会产生不同的结果。
  • 去重机制:可以在消息中加入唯一标识(如消息 ID、事务 ID 等),并在消费端维护去重记录,确保重复消息不会被再次消费。

5. 延迟队列机制

除了直接进行重试外,延迟队列也可以作为一种有效的补充机制。

  • 失败的消息可以被放入一个延迟队列中,经过一定的时间后再进行重新尝试。这种方法能够有效避免大量并发重试。
  • 通过设置延迟时间,可以让重试的消息“错开”其他消息的重试时机,从而避免集中重试的压力。

6. 多重备份机制

如果使用的是分布式消息队列(如 RocketMQ),可以采用消息备份机制。在某个 Broker 宕机或消息发送失败时,系统可以将消息备份到其他节点进行重新发送,从而增加消息的可靠性。

7. 全局重试管理

对于大规模分布式系统,通常会有一个全局的重试管理系统来控制所有消息的重试策略。这个系统会监控所有发送失败的消息,自动进行调度和重试,防止消息处理时出现瓶颈。

8. 日志与监控

  • 日志记录:每次消息发送失败、重试成功或者失败,系统都应该记录相关日志。通过日志,开发人员可以了解消息重试的情况,并进一步调优系统。
  • 监控与告警:系统需要对重试情况进行监控,设定阈值,当重试次数过多时触发报警,便于及时响应。

9. RocketMQ 中的重试机制

RocketMQ 本身也提供了一些内建的重试机制:

  • Producer 端的重试:在消息发送失败时,Producer 会根据配置的重试次数进行重试。可通过 retryTimesWhenSendFailed 配置来设置最大重试次数。
  • Consumer 端的重试:如果消费失败,RocketMQ 会将消息放回到消息队列,直到消息成功消费或者达到最大重试次数。

总结

设计一个消息发送失败后的重试机制时,关键是要确保以下几点:

  • 避免无限重试:通过最大重试次数或死信队列来避免。
  • 幂等性保障:保证即使消息被多次发送,系统的状态也不会受到不良影响。
  • 优化重试策略:采用指数退避、抖动等策略来减少系统的压力。
  • 监控与报警:对失败重试进行实时监控,及时处理异常情况。

4. 消费者宕机时,如何保证消息不丢失?(手动ACK+消费状态持久化)

  • 确保消息在队列中持久化,直到被消费者成功消费。
  • 使用重试机制、延迟队列和死信队列处理消费者宕机后的消息恢复。
  • 设计幂等性消费逻辑,避免重复消费导致的问题。
  • 通过消费者的高可用性和偏移量机制,确保消费者能够从断点恢复消费。

 

 五、消息顺序性

1. 如何保证消息顺序消费?

  • 全局有序:单分区/队列(牺牲并发)。
  • 分区有序:同一业务Key哈希到同一分区(如订单ID),消费者单线程处理该分区。

六、消息积压 & 延迟 

1. 消息积压如何处理?

  • 临时扩容:增加消费者实例或分区数。
  • 异步处理:降级非核心业务,批量消费。
  • 修复消费者:排查消费端性能问题。

2. 如何实现延迟消息?

  • RabbitMQ:通过死信队列(DLX)+ TTL实现。
  • RocketMQ:内置延迟级别(18种预设时间)。
  • Kafka:需自行实现时间轮或外部存储。

七、高可用 & 高性能 

1. Kafka的高可用设计?

  • 分区副本机制:每个分区有多个副本,Leader负责读写,Follower同步数据。
  • ISR机制:仅同步副本(In-Sync Replicas)参与选举,保障一致性。

2. 如何提升MQ性能?

  • 批量发送/压缩:减少网络IO。
  • 异步刷盘:Broker异步持久化消息。
  • 分片存储:水平扩展Topic分区数。

八、高频场景题

 

1.如何设计一个消息队列?

  • 核心组件:Producer、Broker(存储+路由)、Consumer。
  • 关键问题:网络协议、存储设计、高可用、消息可靠性。

2. MQ如何实现分布式事务?

  • 本地消息表:业务DB记录消息状态,定时任务补偿。
  • RocketMQ事务消息:两阶段提交(Half消息 + 回调检查)。

3. 订单超时未支付如何用延迟消息实现?

3.1. 设计需求分析

  • 场景:用户在一定时间内未完成支付,系统需要自动关闭订单。
  • 延迟时间:订单创建时设置一个超时时间(如 30 分钟),当超过该时间后,系统自动处理未支付的订单。
  • 处理操作:超时未支付的订单会被标记为“已超时”或直接关闭。

3.2. 延迟消息的基本原理

延迟消息是指将消息发送到消息队列后,系统会延迟一段时间才消费该消息。通常可以通过两种方式实现:

  • 定时消息:通过消息队列的延迟机制(如 Kafka、RocketMQ、RabbitMQ 等支持的定时消息功能)。
  • 延迟队列:通过将消息存放到一个专门的延迟队列中,等待指定的时间后再进行消费。

3.3. 使用延迟消息实现订单超时未支付

3.3.1: 定义延迟消息内容

每个订单都会发送一条延迟消息,这条消息包含:

  • 订单 ID:标识该订单。
  • 订单超时时间:订单创建时间 + 超时时长(例如 30 分钟后)。
  • 处理逻辑:如果超时未支付,进行相关处理(如关闭订单)。

3.3.2: 发送延迟消息

通过消息队列发送一条延迟消息,设置消息的延迟时间为订单超时的时长。

  • RocketMQ: RocketMQ 支持消息延迟,消息生产者可以通过设置 delayTimeLevel 来实现延迟。

    例如,设置消息延迟 30 分钟:

    Message msg = new Message("OrderTopic", "order_timeout_tag", "orderID", orderMessage.getBytes()); msg.setDelayTimeLevel(3); // 3代表30分钟后发送 producer.send(msg);

3.3.3: 消费延迟消息

消费者在延迟时间到达后,会消费这条消息。消费时需要检查该订单是否已经支付。

  • 如果订单支付成功,则消息被忽略,不做任何处理。
  • 如果订单仍未支付,则执行超时处理逻辑,例如关闭订单、发送通知等。

3.3.4: 超时处理

在消费延迟消息时,如果订单没有支付,系统可以执行以下操作:

  • 关闭订单:标记订单为超时关闭。
  • 发送提醒:提醒用户支付失败或订单已经超时。
  • 退回库存:如果订单超时,可能需要将商品库存恢复。
public void handleOrderTimeout(String orderId) { Order order = orderService.getOrder(orderId); if (order != null && !order.isPaid()) { orderService.closeOrder(orderId); // 关闭订单 inventoryService.restoreInventory(orderId); // 恢复库存 notificationService.sendTimeoutNotification(orderId); // 发送通知 } }

3.4. 示例流程图

  1. 用户下单,系统创建订单并记录创建时间。
  2. 系统向消息队列发送一条延迟消息,设置延迟时间为订单的超时时长(例如 30 分钟)。
  3. 延迟时间到达时,消息队列将消息交给消费者。
  4. 消费者检查订单是否已支付:
    • 如果已支付,消息被丢弃,不做处理。
    • 如果未支付,执行超时处理(关闭订单、恢复库存等)。

 

 


http://www.kler.cn/a/557160.html

相关文章:

  • AtCoder Beginner Contest (ABC)394(ABCD)
  • MongoDB学习
  • Python爬虫selenium验证-中文识别点选+图片验证码案例
  • vue中的watch 和 computed 的区别
  • 软件架构设计:软件工程
  • Golang通过 并发计算平方 示例演示并发
  • 【Postgresql】Linux 部署 Postgresql 数据库 (图文教程)
  • 直角三角堰计算公式
  • 开发指南103-jpa的find**/get**全解
  • 数据结构:队列queue和栈stack
  • UE5中按钮圆角,设置边框
  • CSDN如何设置付费专栏
  • [ TypeScript ] “undefined extends xxx“ 总是为 true 的 bug
  • 深入解析C++函数指针与指针函数:从原理到实战
  • bind()的概念和使用案例
  • USC安防平台之视频切片
  • 驱动的三个框架
  • 52类110个主流Java组件和框架
  • IEEE官方期刊缩写查询pdf分享
  • 蓝桥杯每日一题--第一周(包含五题)