Java学习笔记(二十五)
1 Kafka Raft 简单介绍
Kafka Raft (KRaft) 是 Kafka 引入的一种新的分布式共识协议,用于取代之前依赖的 Apache ZooKeeper 集群管理机制。从 Kafka 2.8 开始,Kafka 开始支持基于 KRaft 的独立模式,计划在未来完全移除 ZooKeeper 的依赖。
1.1 KRaft 的核心概念
-
Raft 共识协议:
- KRaft 基于 Raft 共识协议,确保在分布式环境中,多个副本能够在网络分区或节点故障的情况下保持数据一致性。
- Raft 协议的核心是通过选举产生一个 Leader,Leader 负责处理所有写入请求,并将变更复制到其他节点。
-
Controller 节点:
- 在 KRaft 中,Kafka 的元数据由一个或多个 Controller 管理。
- Controller 使用 Raft 协议管理元数据日志,确保所有 Broker 的元数据一致性。
-
元数据日志:
- KRaft 使用 Raft 日志来存储 Kafka 的元数据(如 Topic 配置、分区分配、ACL 等),并在所有 Controller 节点中保持一致。
- 元数据日志是一个顺序写日志,具有高效和一致的特性。
-
去 ZooKeeper 化:
- KRaft 的主要目标是移除对 ZooKeeper 的依赖,将所有元数据管理整合到 Kafka 自身,从而简化部署和运维。
1.2 KRaft 的优点
-
简化架构:
- 去除了对 ZooKeeper 的依赖,Kafka 不再需要维护独立的 ZooKeeper 集群。
-
提高可扩展性:
- 元数据分布式管理可以支持更大的集群规模和更多的分区数量。
-
一致性增强:
- Raft 协议保证了严格的分布式一致性。
-
性能优化:
- KRaft 的 Controller 实现了更高效的元数据操作,降低了延迟。
-
部署简单:
- 部署 Kafka 时只需配置 Kafka 节点本身,无需额外部署 ZooKeeper。
1.3 KRaft 的运行机制
-
Leader 选举:
- Raft 协议在 Controller 节点间选举出一个 Leader,所有元数据的修改都由 Leader 处理。
- 其他 Controller 节点作为 Follower,接收 Leader 的更新。
-
日志复制:
- Leader 将所有元数据更新以日志形式写入本地磁盘,并复制到其他 Follower 节点。
-
一致性保证:
- Raft 协议通过日志的提交和确认机制,确保所有 Controller 节点的数据一致。
1.4 KRaft 的部署模式
-
单节点模式:
- 适用于开发和测试环境,只有一个 Controller 节点。
-
多节点模式:
- 适用于生产环境,多个 Controller 节点组成一个 Raft 集群,通过分布式协议管理元数据。
1.5 当前进展
- KRaft 从 Kafka 2.8 开始提供实验性支持。
- 从 Kafka 3.3 开始,KRaft 已经能够在大部分生产环境中替代 ZooKeeper。
- Kafka 未来计划完全移除对 ZooKeeper 的支持,让 KRaft 成为唯一的元数据管理机制。
1.6 总结
KRaft 是 Kafka 在架构上的一次重要升级,旨在通过 Raft 协议替代 ZooKeeper,简化部署、提升性能和一致性,为 Kafka 在超大规模集群环境中的应用铺平道路。
2 ack-mode 介绍
Spring Kafka 中的 ack-mode
配置用于指定消息确认 (Acknowledgment) 的策略,即消费者在接收到消息后,何时向 Kafka Broker 确认消费完成。这是一个关键设置,可以影响消息的可靠性和系统性能。
以下是 ack-mode
的详细介绍:
2.1 支持的确认模式
ack-mode
提供以下几种确认模式:
模式 | 描述 | 推荐场景 |
---|---|---|
RECORD | 每处理一条消息后立即发送确认。 | 高可靠性需求,逐条处理消息的场景 |
BATCH | 每批拉取的消息处理完成后发送确认。 | 高吞吐量,批量处理消息的场景 |
TIME | 根据固定时间间隔发送确认,与消息处理无直接关联。 | 希望定时确认,但消息处理较快的场景 |
COUNT | 达到指定的消息数量后发送确认,与消息处理无直接关联。 | 希望固定批量确认的场景 |
COUNT_TIME | 基于消息数量和时间间隔的组合条件,任一条件满足即可触发确认。 | 需要兼顾吞吐量和延迟的场景 |
MANUAL | 需要开发者手动调用 Acknowledgment.acknowledge() 方法确认消息。 | 手动精确控制确认逻辑的场景 |
MANUAL_IMMEDIATE | 与 MANUAL 类似,但立即将确认发送到 Kafka,而不是批量发送。 | 手动控制确认逻辑,要求低延迟的场景 |
2.2 默认值
- 如果未显式设置
ack-mode
,默认值为BATCH
。 - 对应的配置路径:
spring: kafka: listener: ack-mode: BATCH
2.3 不同模式的详细说明
(1) RECORD
- 行为:每处理一条消息后,立即发送确认。
- 优点:
- 确保每条消息的确认独立,最高的消息处理可靠性。
- 如果消费者崩溃,只有未处理的消息会重新投递。
- 缺点:
- 性能较低,因为每条消息都需要发送一次确认。
- 适用场景:关键任务需要确保消息处理可靠性,例如支付系统或交易系统。
(2) BATCH
- 行为:处理完拉取的消息批次后,统一发送确认。
- 优点:
- 提高吞吐量,减少网络开销。
- 更适合批量处理的场景,例如将多个消息一次性写入数据库。
- 缺点:
- 如果消费者崩溃,该批次的所有消息都会重新投递。
- 适用场景:对吞吐量要求高,但允许部分消息重复处理的场景。
(3) TIME
- 行为:定期发送确认,与消息的处理逻辑无关。
- 优点:
- 控制确认的时间间隔,适用于消息处理速度波动的场景。
- 缺点:
- 如果定时时间过长,消费者崩溃会导致更多消息重新投递。
- 适用场景:希望定时确认,但处理逻辑简单快速。
(4) COUNT
- 行为:当处理的消息数达到指定数量后,发送确认。
- 优点:
- 控制确认的消息数量,适合对吞吐量有一定要求的场景。
- 缺点:
- 如果批量大小过大,崩溃会导致更多消息重新投递。
- 适用场景:高吞吐量需求,允许一定范围内的重复处理。
(5) COUNT_TIME
- 行为:同时支持基于数量和时间间隔的确认逻辑,满足任一条件即可发送确认。
- 优点:
- 综合数量和时间的优点,适合动态吞吐量的场景。
- 缺点:
- 较复杂,需要合理配置数量和时间的阈值。
- 适用场景:希望兼顾性能和延迟控制的场景。
(6) MANUAL
- 行为:由监听器方法中显式调用
Acknowledgment.acknowledge()
方法进行确认。 - 优点:
- 最大的控制灵活性,可根据自定义逻辑决定是否确认消息。
- 缺点:
- 更高的开发成本,增加出错风险(如未调用
acknowledge
)。
- 更高的开发成本,增加出错风险(如未调用
- 适用场景:需要精确控制消息确认时间点或条件。
(7) MANUAL_IMMEDIATE
- 行为:与
MANUAL
类似,但确认消息立即发送到 Kafka,不会等到当前批次处理完成。 - 优点:
- 降低延迟。
- 缺点:
- 牺牲一定的吞吐量。
- 适用场景:需要实时性高的确认逻辑,例如低延迟报警系统。
2.4 配置方式
可以通过 application.yml
或代码配置:
2.4.1 YAML 配置
spring:
kafka:
listener:
ack-mode: MANUAL
consumer:
enable-auto-commit: false
2.4.2 Java 配置
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
2.5 如何选择适合的 ack-mode
- 高可靠性:选择
RECORD
或MANUAL
。 - 高吞吐量:选择
BATCH
或COUNT
。 - 实时性需求:选择
MANUAL_IMMEDIATE
。 - 动态场景:选择
COUNT_TIME
。
通过配置合适的 ack-mode
,可以在性能与可靠性之间找到平衡点。
3 MyBatis 二级缓存的生命周期
3.1 什么是 MyBatis 二级缓存?
MyBatis 提供了两级缓存机制:
- 一级缓存(L1 Cache): 作用范围是 单个
SqlSession
,默认开启。 - 二级缓存(L2 Cache): 作用范围是 跨
SqlSession
,基于 Mappernamespace
共享,需要手动配置。
二级缓存的特点:
- 作用范围:Mapper 映射文件级别(namespace 级别)。
- 生命周期比一级缓存更长,可以在不同的
SqlSession
之间共享数据。 - 默认情况下是不开启的,需在
mapper.xml
中显式配置。
3.2 二级缓存的生命周期
MyBatis 的二级缓存生命周期主要与 SqlSession
的生命周期、事务提交、缓存失效策略等因素相关。生命周期如下:
-
初始化阶段(创建缓存)
- 当 MyBatis 加载
mapper.xml
映射文件 并解析<cache>
标签时,二级缓存被创建并注册。 - 在整个 MyBatis 应用的生命周期内,二级缓存会持续存在,除非显式清除。
- 当 MyBatis 加载
-
存储数据(数据写入缓存)
- 当
SqlSession
执行 SQL 查询时,查询结果会先存入 一级缓存(当前SqlSession
内存储)。 - 只有在
SqlSession.close()
时,才会将一级缓存的数据同步到二级缓存。 - 如果启用了二级缓存,不同
SqlSession
之间可以共享数据。
- 当
-
数据读取(从缓存获取数据)
- 当新的
SqlSession
执行查询时,会首先检查二级缓存,如果命中缓存,则直接返回结果,而不会访问数据库。 - 如果缓存未命中,才会查询数据库,并将结果存入二级缓存。
- 当新的
-
数据清除(失效或更新)
- 当执行
INSERT
、UPDATE
或DELETE
操作后,默认情况下会清空二级缓存(保持数据一致性)。 - 可以配置 缓存刷新策略,如基于时间自动刷新。
- 调用
session.commit()
或session.rollback()
时,会清空相应命名空间下的二级缓存。 - 手动清空缓存:
sqlSession.clearCache(); // 清空一级缓存 sqlSessionFactory.getConfiguration().getCache("namespace").clear(); // 清空二级缓存
- 当执行
-
销毁阶段(缓存被清理)
- 当 MyBatis 关闭或应用停止时,二级缓存的所有数据都会被销毁。
- 映射文件重新加载时,旧的缓存实例会被清空。
3.3 二级缓存的存储时机
二级缓存的存入遵循以下原则:
-
SqlSession
关闭时- 当
SqlSession.close()
被调用时,当前会话的一级缓存数据被存入二级缓存。 - 例如:
SqlSession session1 = sqlSessionFactory.openSession(); User user1 = session1.selectOne("com.example.mapper.UserMapper.selectUser", 1); session1.close(); // 关闭时,数据进入二级缓存
- 当
-
多次会话共享
- 关闭第一个
SqlSession
后,第二个SqlSession
执行相同查询时,会从二级缓存获取数据。 - 例如:
SqlSession session2 = sqlSessionFactory.openSession(); User user2 = session2.selectOne("com.example.mapper.UserMapper.selectUser", 1); // 直接从缓存取数据 session2.close();
- 关闭第一个
3.4 二级缓存的配置
步骤 1:在 mybatis-config.xml
中开启二级缓存支持
<configuration>
<settings>
<setting name="cacheEnabled" value="true"/>
</settings>
</configuration>
步骤 2:在 mapper.xml
文件中启用二级缓存
<mapper namespace="com.example.mapper.UserMapper">
<cache
eviction="LRU"
flushInterval="60000"
size="1024"
readOnly="true"/>
</mapper>
eviction="LRU"
:使用最近最少使用(LRU)算法清理缓存。flushInterval="60000"
:每 60 秒清空缓存。size="1024"
:最多缓存 1024 条数据。readOnly="true"
:缓存为只读,提高性能,但不允许修改数据。
步骤 3:确保实体类实现 Serializable
接口
public class User implements Serializable {
private static final long serialVersionUID = 1L;
private int id;
private String name;
// getters and setters
}
3.5 二级缓存的失效场景
二级缓存的数据不会永久有效,以下操作会导致缓存失效:
-
数据变更时(默认)
- 执行
INSERT
、UPDATE
、DELETE
语句后,二级缓存会被清除。 - 可以配置
flushCache="false"
避免清空:<select id="selectUser" resultType="User" flushCache="false"> SELECT * FROM user WHERE id = #{id} </select>
- 执行
-
事务提交时
- 当
SqlSession.commit()
被调用时,二级缓存会刷新。
- 当
-
手动清除缓存
- 调用
sqlSession.clearCache()
会清空一级缓存。 - 调用
sqlSessionFactory.getConfiguration().getCache("namespace").clear();
清空二级缓存。
- 调用
3.6 二级缓存与一级缓存的对比
特性 | 一级缓存(L1 Cache) | 二级缓存(L2 Cache) |
---|---|---|
范围 | 单个 SqlSession | 多个 SqlSession ,namespace 级别共享 |
生命周期 | SqlSession 关闭时失效 | 直到手动清除或策略触发 |
存储位置 | 内存(线程局部变量) | 内存、磁盘或其他第三方缓存 |
默认是否开启 | 开启 | 需手动配置 |
影响范围 | 仅当前事务 | 整个 MyBatis 应用 |
清空触发条件 | commit() 、rollback() 、close() | commit() 、手动清理、超时、更新数据 |
3.7 常见问题及解决方案
问题 | 可能原因 | 解决方案 |
---|---|---|
二级缓存未生效 | 未在 mapper.xml 中启用缓存 | 确保 <cache/> 配置正确 |
更新数据后缓存仍然生效 | 缓存未正确清理 | 确保 flushCache="true" 正确配置 |
数据一致性问题 | 多线程环境导致缓存不同步 | 使用 readOnly="true" 或手动清理 |
Serializable 错误 | 实体类未实现 Serializable 接口 | 确保所有缓存对象实现序列化 |
3.8总结
-
二级缓存的生命周期:
- 在 MyBatis 启动时初始化。
- 在
SqlSession.close()
之后数据才存入二级缓存。 - 数据变更或提交事务后缓存可能失效。
-
使用建议:
- 适用于数据访问频繁但变化少的场景。
- 配置合适的刷新策略以保证数据一致性。