当前位置: 首页 > article >正文

Java学习笔记(二十五)

1 Kafka Raft 简单介绍

Kafka Raft (KRaft) 是 Kafka 引入的一种新的分布式共识协议,用于取代之前依赖的 Apache ZooKeeper 集群管理机制。从 Kafka 2.8 开始,Kafka 开始支持基于 KRaft 的独立模式,计划在未来完全移除 ZooKeeper 的依赖。


1.1 KRaft 的核心概念

  1. Raft 共识协议

    • KRaft 基于 Raft 共识协议,确保在分布式环境中,多个副本能够在网络分区或节点故障的情况下保持数据一致性。
    • Raft 协议的核心是通过选举产生一个 Leader,Leader 负责处理所有写入请求,并将变更复制到其他节点。
  2. Controller 节点

    • 在 KRaft 中,Kafka 的元数据由一个或多个 Controller 管理。
    • Controller 使用 Raft 协议管理元数据日志,确保所有 Broker 的元数据一致性。
  3. 元数据日志

    • KRaft 使用 Raft 日志来存储 Kafka 的元数据(如 Topic 配置、分区分配、ACL 等),并在所有 Controller 节点中保持一致。
    • 元数据日志是一个顺序写日志,具有高效和一致的特性。
  4. 去 ZooKeeper 化

    • KRaft 的主要目标是移除对 ZooKeeper 的依赖,将所有元数据管理整合到 Kafka 自身,从而简化部署和运维。

1.2 KRaft 的优点

  1. 简化架构

    • 去除了对 ZooKeeper 的依赖,Kafka 不再需要维护独立的 ZooKeeper 集群。
  2. 提高可扩展性

    • 元数据分布式管理可以支持更大的集群规模和更多的分区数量。
  3. 一致性增强

    • Raft 协议保证了严格的分布式一致性。
  4. 性能优化

    • KRaft 的 Controller 实现了更高效的元数据操作,降低了延迟。
  5. 部署简单

    • 部署 Kafka 时只需配置 Kafka 节点本身,无需额外部署 ZooKeeper。

1.3 KRaft 的运行机制

  1. Leader 选举

    • Raft 协议在 Controller 节点间选举出一个 Leader,所有元数据的修改都由 Leader 处理。
    • 其他 Controller 节点作为 Follower,接收 Leader 的更新。
  2. 日志复制

    • Leader 将所有元数据更新以日志形式写入本地磁盘,并复制到其他 Follower 节点。
  3. 一致性保证

    • Raft 协议通过日志的提交和确认机制,确保所有 Controller 节点的数据一致。

1.4 KRaft 的部署模式

  1. 单节点模式

    • 适用于开发和测试环境,只有一个 Controller 节点。
  2. 多节点模式

    • 适用于生产环境,多个 Controller 节点组成一个 Raft 集群,通过分布式协议管理元数据。

1.5 当前进展

  • KRaft 从 Kafka 2.8 开始提供实验性支持。
  • 从 Kafka 3.3 开始,KRaft 已经能够在大部分生产环境中替代 ZooKeeper。
  • Kafka 未来计划完全移除对 ZooKeeper 的支持,让 KRaft 成为唯一的元数据管理机制。

1.6 总结

KRaft 是 Kafka 在架构上的一次重要升级,旨在通过 Raft 协议替代 ZooKeeper,简化部署、提升性能和一致性,为 Kafka 在超大规模集群环境中的应用铺平道路。

2 ack-mode 介绍

Spring Kafka 中的 ack-mode 配置用于指定消息确认 (Acknowledgment) 的策略,即消费者在接收到消息后,何时向 Kafka Broker 确认消费完成。这是一个关键设置,可以影响消息的可靠性和系统性能。

以下是 ack-mode 的详细介绍:


2.1 支持的确认模式

ack-mode 提供以下几种确认模式:

模式描述推荐场景
RECORD每处理一条消息后立即发送确认。高可靠性需求,逐条处理消息的场景
BATCH每批拉取的消息处理完成后发送确认。高吞吐量,批量处理消息的场景
TIME根据固定时间间隔发送确认,与消息处理无直接关联。希望定时确认,但消息处理较快的场景
COUNT达到指定的消息数量后发送确认,与消息处理无直接关联。希望固定批量确认的场景
COUNT_TIME基于消息数量和时间间隔的组合条件,任一条件满足即可触发确认。需要兼顾吞吐量和延迟的场景
MANUAL需要开发者手动调用 Acknowledgment.acknowledge() 方法确认消息。手动精确控制确认逻辑的场景
MANUAL_IMMEDIATEMANUAL 类似,但立即将确认发送到 Kafka,而不是批量发送。手动控制确认逻辑,要求低延迟的场景

2.2 默认值

  • 如果未显式设置 ack-mode,默认值为 BATCH
  • 对应的配置路径:
    spring:
      kafka:
        listener:
          ack-mode: BATCH
    

2.3 不同模式的详细说明

(1) RECORD
  • 行为:每处理一条消息后,立即发送确认。
  • 优点
    • 确保每条消息的确认独立,最高的消息处理可靠性。
    • 如果消费者崩溃,只有未处理的消息会重新投递。
  • 缺点
    • 性能较低,因为每条消息都需要发送一次确认。
  • 适用场景:关键任务需要确保消息处理可靠性,例如支付系统或交易系统。

(2) BATCH
  • 行为:处理完拉取的消息批次后,统一发送确认。
  • 优点
    • 提高吞吐量,减少网络开销。
    • 更适合批量处理的场景,例如将多个消息一次性写入数据库。
  • 缺点
    • 如果消费者崩溃,该批次的所有消息都会重新投递。
  • 适用场景:对吞吐量要求高,但允许部分消息重复处理的场景。

(3) TIME
  • 行为:定期发送确认,与消息的处理逻辑无关。
  • 优点
    • 控制确认的时间间隔,适用于消息处理速度波动的场景。
  • 缺点
    • 如果定时时间过长,消费者崩溃会导致更多消息重新投递。
  • 适用场景:希望定时确认,但处理逻辑简单快速。

(4) COUNT
  • 行为:当处理的消息数达到指定数量后,发送确认。
  • 优点
    • 控制确认的消息数量,适合对吞吐量有一定要求的场景。
  • 缺点
    • 如果批量大小过大,崩溃会导致更多消息重新投递。
  • 适用场景:高吞吐量需求,允许一定范围内的重复处理。

(5) COUNT_TIME
  • 行为:同时支持基于数量和时间间隔的确认逻辑,满足任一条件即可发送确认。
  • 优点
    • 综合数量和时间的优点,适合动态吞吐量的场景。
  • 缺点
    • 较复杂,需要合理配置数量和时间的阈值。
  • 适用场景:希望兼顾性能和延迟控制的场景。

(6) MANUAL
  • 行为:由监听器方法中显式调用 Acknowledgment.acknowledge() 方法进行确认。
  • 优点
    • 最大的控制灵活性,可根据自定义逻辑决定是否确认消息。
  • 缺点
    • 更高的开发成本,增加出错风险(如未调用 acknowledge)。
  • 适用场景:需要精确控制消息确认时间点或条件。

(7) MANUAL_IMMEDIATE
  • 行为:与 MANUAL 类似,但确认消息立即发送到 Kafka,不会等到当前批次处理完成。
  • 优点
    • 降低延迟。
  • 缺点
    • 牺牲一定的吞吐量。
  • 适用场景:需要实时性高的确认逻辑,例如低延迟报警系统。

2.4 配置方式

可以通过 application.yml 或代码配置:

2.4.1 YAML 配置
spring:
  kafka:
    listener:
      ack-mode: MANUAL
    consumer:
      enable-auto-commit: false
2.4.2 Java 配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

2.5 如何选择适合的 ack-mode

  • 高可靠性:选择 RECORDMANUAL
  • 高吞吐量:选择 BATCHCOUNT
  • 实时性需求:选择 MANUAL_IMMEDIATE
  • 动态场景:选择 COUNT_TIME

通过配置合适的 ack-mode,可以在性能与可靠性之间找到平衡点。

3 MyBatis 二级缓存的生命周期


3.1 什么是 MyBatis 二级缓存?

MyBatis 提供了两级缓存机制:

  1. 一级缓存(L1 Cache): 作用范围是 单个 SqlSession,默认开启。
  2. 二级缓存(L2 Cache): 作用范围是 SqlSession,基于 Mapper namespace 共享,需要手动配置。

二级缓存的特点:

  • 作用范围:Mapper 映射文件级别(namespace 级别)
  • 生命周期比一级缓存更长,可以在不同的 SqlSession 之间共享数据。
  • 默认情况下是不开启的,需在 mapper.xml 中显式配置。

3.2 二级缓存的生命周期

MyBatis 的二级缓存生命周期主要与 SqlSession 的生命周期、事务提交、缓存失效策略等因素相关。生命周期如下:

  1. 初始化阶段(创建缓存)

    • 当 MyBatis 加载 mapper.xml 映射文件 并解析 <cache> 标签时,二级缓存被创建并注册。
    • 在整个 MyBatis 应用的生命周期内,二级缓存会持续存在,除非显式清除。
  2. 存储数据(数据写入缓存)

    • SqlSession 执行 SQL 查询时,查询结果会先存入 一级缓存(当前 SqlSession 内存储)
    • 只有在 SqlSession.close() 时,才会将一级缓存的数据同步到二级缓存。
    • 如果启用了二级缓存,不同 SqlSession 之间可以共享数据。
  3. 数据读取(从缓存获取数据)

    • 当新的 SqlSession 执行查询时,会首先检查二级缓存,如果命中缓存,则直接返回结果,而不会访问数据库。
    • 如果缓存未命中,才会查询数据库,并将结果存入二级缓存。
  4. 数据清除(失效或更新)

    • 当执行 INSERTUPDATEDELETE 操作后,默认情况下会清空二级缓存(保持数据一致性)。
    • 可以配置 缓存刷新策略,如基于时间自动刷新。
    • 调用 session.commit()session.rollback() 时,会清空相应命名空间下的二级缓存。
    • 手动清空缓存:
      sqlSession.clearCache(); // 清空一级缓存
      sqlSessionFactory.getConfiguration().getCache("namespace").clear(); // 清空二级缓存
      
  5. 销毁阶段(缓存被清理)

    • 当 MyBatis 关闭或应用停止时,二级缓存的所有数据都会被销毁。
    • 映射文件重新加载时,旧的缓存实例会被清空。

3.3 二级缓存的存储时机

二级缓存的存入遵循以下原则:

  1. SqlSession 关闭时

    • SqlSession.close() 被调用时,当前会话的一级缓存数据被存入二级缓存。
    • 例如:
      SqlSession session1 = sqlSessionFactory.openSession();
      User user1 = session1.selectOne("com.example.mapper.UserMapper.selectUser", 1);
      session1.close();  // 关闭时,数据进入二级缓存
      
  2. 多次会话共享

    • 关闭第一个 SqlSession 后,第二个 SqlSession 执行相同查询时,会从二级缓存获取数据。
    • 例如:
      SqlSession session2 = sqlSessionFactory.openSession();
      User user2 = session2.selectOne("com.example.mapper.UserMapper.selectUser", 1); // 直接从缓存取数据
      session2.close();
      

3.4 二级缓存的配置

步骤 1:在 mybatis-config.xml 中开启二级缓存支持

<configuration>
    <settings>
        <setting name="cacheEnabled" value="true"/>
    </settings>
</configuration>

步骤 2:在 mapper.xml 文件中启用二级缓存

<mapper namespace="com.example.mapper.UserMapper">
    <cache 
        eviction="LRU" 
        flushInterval="60000" 
        size="1024" 
        readOnly="true"/>
</mapper>
  • eviction="LRU":使用最近最少使用(LRU)算法清理缓存。
  • flushInterval="60000":每 60 秒清空缓存。
  • size="1024":最多缓存 1024 条数据。
  • readOnly="true":缓存为只读,提高性能,但不允许修改数据。

步骤 3:确保实体类实现 Serializable 接口

public class User implements Serializable {
    private static final long serialVersionUID = 1L;
    private int id;
    private String name;
    // getters and setters
}

3.5 二级缓存的失效场景

二级缓存的数据不会永久有效,以下操作会导致缓存失效:

  1. 数据变更时(默认)

    • 执行 INSERTUPDATEDELETE 语句后,二级缓存会被清除。
    • 可以配置 flushCache="false" 避免清空:
      <select id="selectUser" resultType="User" flushCache="false">
          SELECT * FROM user WHERE id = #{id}
      </select>
      
  2. 事务提交时

    • SqlSession.commit() 被调用时,二级缓存会刷新。
  3. 手动清除缓存

    • 调用 sqlSession.clearCache() 会清空一级缓存。
    • 调用 sqlSessionFactory.getConfiguration().getCache("namespace").clear(); 清空二级缓存。

3.6 二级缓存与一级缓存的对比

特性一级缓存(L1 Cache)二级缓存(L2 Cache)
范围单个 SqlSession多个 SqlSession,namespace 级别共享
生命周期SqlSession 关闭时失效直到手动清除或策略触发
存储位置内存(线程局部变量)内存、磁盘或其他第三方缓存
默认是否开启开启需手动配置
影响范围仅当前事务整个 MyBatis 应用
清空触发条件commit()rollback()close()commit()、手动清理、超时、更新数据

3.7 常见问题及解决方案

问题可能原因解决方案
二级缓存未生效未在 mapper.xml 中启用缓存确保 <cache/> 配置正确
更新数据后缓存仍然生效缓存未正确清理确保 flushCache="true" 正确配置
数据一致性问题多线程环境导致缓存不同步使用 readOnly="true" 或手动清理
Serializable 错误实体类未实现 Serializable 接口确保所有缓存对象实现序列化

3.8总结

  1. 二级缓存的生命周期:

    • 在 MyBatis 启动时初始化。
    • SqlSession.close() 之后数据才存入二级缓存。
    • 数据变更或提交事务后缓存可能失效。
  2. 使用建议:

    • 适用于数据访问频繁但变化少的场景。
    • 配置合适的刷新策略以保证数据一致性。

http://www.kler.cn/a/522626.html

相关文章:

  • FreeRTOS学习 --- 动态任务创建和删除的详细过程
  • “AI视频智能分析系统:让每一帧视频都充满智慧
  • 十年筑梦,再创鲸彩!庆祝和鲸科技十周年
  • 一个简单的自适应html5导航模板
  • 出现 Error processing condition on org.springframework.cloud.openfeign 解决方法
  • 14-6-2C++STL的list
  • Python面向对象编程实战:构建强大的 `Person` 类
  • CSS知识总结
  • zookeeper-3.8.3-基于ACL的访问控制
  • 私域流量池构建与转化策略:以开源链动2+1模式AI智能名片S2B2C商城小程序为例
  • Hive详细讲解-调优分区表速通
  • The Simulation技术浅析(二):模型技术
  • Python爬虫获取custom-1688自定义API操作接口
  • 【异步编程基础】FutureTask基本原理与异步阻塞问题
  • constexpr 实现编译时加密
  • Spark入门(Python)
  • python基础语法(4) ----- 学习笔记分享
  • 基于SpringBoot的网上摄影工作室开发与实现 | 含论文、任务书、选题表
  • 【JavaSE】String类常用字符串方法总结
  • Django-Admin WebView 集成项目技术规范文档 v2.1
  • 【2024年华为OD机试】 (C卷,100分)- 用户调度问题(JavaScriptJava PythonC/C++)
  • games101-(2)线性代数
  • LosslessScaling-学习版[steam价值30元的游戏无损放大/补帧工具]
  • Unexpected WSL error Error code: Wsl/Service/0x8007273的解决
  • 【creo】CREO配置快捷键方式和默认单位
  • DataWhale组队学习 fun-transformer task5