提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解
提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解
在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具,专为解决大数据流处理场景中的缓冲与批量处理问题。本文将详细讲解如何在 Spring Boot 项目中使用 BufferTrigger,帮助你提高系统的吞吐量与响应速度,减少 I/O 操作,从而提升整体性能。
BufferTrigger 简介:如何高效处理实时数据流
快手开源的 BufferTrigger 是一个用于数据处理,它主要用于实时数据流处理场景。BufferTrigger 的主要作用是为了解决在大数据流处理中常见的问题:如何高效地对连续的数据流进行缓冲,并在满足一定条件时触发下游计算或存储操作。
使用 BufferTrigger 优势如下:
- 提高效率:通过批量处理数据而不是逐条处理,可以显著减少 I/O 操作的次数,从而提升整体处理效率。
- 资源优化:对于一些需要消耗较多计算资源的操作(如写入数据库、调用外部服务等),通过累积一批数据后再执行一次这样的操作,可以更有效地利用系统资源。
- 简化逻辑:对于开发者而言,使用 BufferTrigger 可以帮助简化代码逻辑,将注意力集中在业务逻辑上而不是复杂的缓冲控制逻辑上。
- 灵活配置:支持多种触发策略(比如基于时间窗口、基于数据量大小等),使得用户可以根据具体应用场景灵活选择最合适的触发方式。
- 易于集成:设计上考虑了与现有数据处理框架的良好兼容性,使得它可以方便地与其他组件一起工作,在现有的技术栈中引入该功能变得更加容易。
如何添加依赖:快速集成到 Spring Boot 项目
只需要在 pom.xml
中添加以下依赖,即可将 BufferTrigger 集成到你的 Spring Boot 项目中:
<properties>
// 省略...
<buffertrigger.version>0.2.21</buffertrigger.version>
</properties>
<!-- 统一依赖管理 -->
<dependencyManagement>
<dependencies>
// 省略...
<!-- 快手 Buffer Trigger -->
<dependency>
<groupId>com.github.phantomthief</groupId>
<artifactId>buffer-trigger</artifactId>
<version>${buffertrigger.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
快手 BufferTrigger 使用讲解
-
核心概念
- 缓冲队列:BufferTrigger 会维护一个内部缓冲区,用来缓存从外部接收的数据。它允许指定缓存队列的最大容量,当达到上限时会根据预设的触发策略进行数据的批量处理。
- 触发策略:触发策略是指何时将缓存的数据批量提交进行处理。常见的触发策略有:
- 基于数据量:当缓存的数据达到指定大小时触发处理。
- 基于时间窗口:每隔一定时间就触发一次处理。
- 混合触发:同时满足数据量和时间条件时触发。
- 数据消费:通过 BufferTrigger 提供的消费者回调机制,开发者可以自定义数据消费的逻辑。一般情况下,消费的过程是对缓存的数据进行处理、存储或其他操作。
- 批量处理:将一批数据聚合后一起处理,而不是一条一条地处理。这样能够减少 I/O 操作的次数,从而提高系统的吞吐量。
使用案例
在许多社交平台上,网红或明星的粉丝数通常会发生频繁的波动。比如,当一个网红被大量用户关注或取消关注时,这些信息会通过消息队列(如 RocketMQ)快速传递,系统需要高效地处理这些变化,并更新到缓存或数据库中。在这种场景下,如果每次有粉丝关注或取消关注时都进行一次 I/O 操作,会导致系统的负载过大,尤其是在并发请求较高时。
为了提高系统的性能,减少频繁的 I/O 操作,通常采用 批量处理 的方式来对消息进行合并和延迟处理。BufferTrigger 就是为了应对这种高并发和实时性要求的场景,它能够将多条消息缓存起来,当满足触发条件时(比如缓存队列达到一定大小或时间窗口到期),将这些消息批量处理,从而减少与缓存系统的交互次数,提升系统的吞吐量和响应速度。
@Component
@RocketMQMessageListener(consumerGroup = "xiaohashu_group_" + MQConstants.TOPIC_COUNT_FANS, // Group 组
topic = MQConstants.TOPIC_COUNT_FANS // 主题 Topic
)
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> {
@Resource
private RedisTemplate<String, Object> redisTemplate;
private BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
.bufferSize(50000) // 缓存队列的最大容量
.batchSize(1000) // 一批次最多聚合 1000 条
.linger(Duration.ofSeconds(1)) // 多久聚合一次
.setConsumerEx(this::consumeMessage)
.build();
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public void onMessage(String body) {
// 往 bufferTrigger 中添加元素
bufferTrigger.enqueue(body);
}
private void consumeMessage(List<String> bodys) {
log.info("==> 聚合消息, size: {}", bodys.size());
log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));
// List<String> 转 List<CountFollowUnfollowMqDTO>
List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream()
.map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();
//按照用户进行一个分组
Map<Long, List<CountFollowUnfollowMqDTO>> groupMap =
countFollowUnfollowMqDTOS.stream()
.collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getUserId));
// 按组汇总数据,统计出最终的计数
// key 为目标用户ID, value 为最终操作的计数
Map<Long, Integer> countMap = Maps.newHashMap();
for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {
List<CountFollowUnfollowMqDTO> list = entry.getValue();
// 最终的计数值,默认为 0
int finalCount = 0;
for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {
Integer type = countFollowUnfollowMqDTO.getType();
FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);
// 若枚举为空,跳到下一次循环
if (Objects.isNull(followUnfollowTypeEnum)) {
continue;
}
switch (followUnfollowTypeEnum) {
case FOLLOW -> finalCount += 1;
case UNFOLLOW -> finalCount -= 1;
}
}
// 将分组后统计出的最终计数,存入 countMap 中
countMap.put(entry.getKey(), finalCount);
}
log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));
// 更新 Redis
countMap.forEach((k, v) -> {
// Redis Key
String redisKey = RedisKeyConstants.buildCountUserKey(k);
// 判断 Redis 中 Hash 是否存在
boolean isExisted = redisTemplate.hasKey(redisKey);
// 若存在才会更新
// (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
if (isExisted) {
// 对目标用户 Hash 中的粉丝数字段进行计数操作
redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);
}
});
// 发送 MQ, 计数数据落库
// 构建消息体 DTO
Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
.build();
// 异步发送 MQ 消息,提升接口响应速度
rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("==> 【计数服务:粉丝数入库】MQ 发送成功,SendResult: {}", sendResult);
}
@Override
public void onException(Throwable throwable) {
log.error("==> 【计数服务:粉丝数入库】MQ 发送异常: ", throwable);
}
});
}
}
代码解析
BufferTrigger
的构建:通过.batchBlocking()
创建一个BufferTrigger
实例,该实例设置了缓存队列的最大容量、每批次最多处理的消息数量、以及聚合的时间窗口等配置。enqueue(body)
:每接收到一条消息,就将消息加入到缓冲队列中,BufferTrigger
会根据设定的策略决定何时批量处理这些数据。consumeMessage(List<String> bodys)
:当数据满足触发条件时(如缓存队列满或时间窗口到期),consumeMessage
会被调用,处理聚合后的数据。