当前位置: 首页 > article >正文

提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

提升 Spring Boot 系统性能:高效处理实时数据流的 BufferTrigger 使用详解

在现代应用中,特别是像社交平台、金融系统等高并发场景下,如何高效地处理大量实时数据成为了系统设计的一个关键问题。BufferTrigger 是由快手开源的一个工具,专为解决大数据流处理场景中的缓冲与批量处理问题。本文将详细讲解如何在 Spring Boot 项目中使用 BufferTrigger,帮助你提高系统的吞吐量与响应速度,减少 I/O 操作,从而提升整体性能。

BufferTrigger 简介:如何高效处理实时数据流

快手开源的 BufferTrigger 是一个用于数据处理,它主要用于实时数据流处理场景。BufferTrigger 的主要作用是为了解决在大数据流处理中常见的问题:如何高效地对连续的数据流进行缓冲,并在满足一定条件时触发下游计算或存储操作。

使用 BufferTrigger 优势如下:

  1. 提高效率:通过批量处理数据而不是逐条处理,可以显著减少 I/O 操作的次数,从而提升整体处理效率。
  2. 资源优化:对于一些需要消耗较多计算资源的操作(如写入数据库、调用外部服务等),通过累积一批数据后再执行一次这样的操作,可以更有效地利用系统资源。
  3. 简化逻辑:对于开发者而言,使用 BufferTrigger 可以帮助简化代码逻辑,将注意力集中在业务逻辑上而不是复杂的缓冲控制逻辑上。
  4. 灵活配置:支持多种触发策略(比如基于时间窗口、基于数据量大小等),使得用户可以根据具体应用场景灵活选择最合适的触发方式。
  5. 易于集成:设计上考虑了与现有数据处理框架的良好兼容性,使得它可以方便地与其他组件一起工作,在现有的技术栈中引入该功能变得更加容易。

如何添加依赖:快速集成到 Spring Boot 项目

只需要在 pom.xml 中添加以下依赖,即可将 BufferTrigger 集成到你的 Spring Boot 项目中:

 <properties>
        // 省略...
        <buffertrigger.version>0.2.21</buffertrigger.version>
    </properties>

    <!-- 统一依赖管理 -->
    <dependencyManagement>
        <dependencies>
            // 省略...

            <!-- 快手 Buffer Trigger -->
            <dependency>
                <groupId>com.github.phantomthief</groupId>
                <artifactId>buffer-trigger</artifactId>
                <version>${buffertrigger.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

快手 BufferTrigger 使用讲解

  • 核心概念
    1. 缓冲队列:BufferTrigger 会维护一个内部缓冲区,用来缓存从外部接收的数据。它允许指定缓存队列的最大容量,当达到上限时会根据预设的触发策略进行数据的批量处理。
    2. 触发策略:触发策略是指何时将缓存的数据批量提交进行处理。常见的触发策略有:
      • 基于数据量:当缓存的数据达到指定大小时触发处理。
      • 基于时间窗口:每隔一定时间就触发一次处理。
      • 混合触发:同时满足数据量和时间条件时触发。
    3. 数据消费:通过 BufferTrigger 提供的消费者回调机制,开发者可以自定义数据消费的逻辑。一般情况下,消费的过程是对缓存的数据进行处理、存储或其他操作。
    4. 批量处理:将一批数据聚合后一起处理,而不是一条一条地处理。这样能够减少 I/O 操作的次数,从而提高系统的吞吐量。

使用案例

在许多社交平台上,网红或明星的粉丝数通常会发生频繁的波动。比如,当一个网红被大量用户关注或取消关注时,这些信息会通过消息队列(如 RocketMQ)快速传递,系统需要高效地处理这些变化,并更新到缓存或数据库中。在这种场景下,如果每次有粉丝关注或取消关注时都进行一次 I/O 操作,会导致系统的负载过大,尤其是在并发请求较高时。

为了提高系统的性能,减少频繁的 I/O 操作,通常采用 批量处理 的方式来对消息进行合并和延迟处理。BufferTrigger 就是为了应对这种高并发和实时性要求的场景,它能够将多条消息缓存起来,当满足触发条件时(比如缓存队列达到一定大小或时间窗口到期),将这些消息批量处理,从而减少与缓存系统的交互次数,提升系统的吞吐量和响应速度。

@Component
@RocketMQMessageListener(consumerGroup = "xiaohashu_group_" + MQConstants.TOPIC_COUNT_FANS, // Group 组
        topic = MQConstants.TOPIC_COUNT_FANS // 主题 Topic
)
@Slf4j
public class CountFansConsumer implements RocketMQListener<String> {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    private BufferTrigger<String> bufferTrigger = BufferTrigger.<String>batchBlocking()
            .bufferSize(50000) // 缓存队列的最大容量
            .batchSize(1000)   // 一批次最多聚合 1000 条
            .linger(Duration.ofSeconds(1)) // 多久聚合一次
            .setConsumerEx(this::consumeMessage)
            .build();

    @Resource
    private RocketMQTemplate rocketMQTemplate;



    @Override
    public void onMessage(String body) {
        // 往 bufferTrigger 中添加元素
        bufferTrigger.enqueue(body);
    }

    private void consumeMessage(List<String> bodys) {
        log.info("==> 聚合消息, size: {}", bodys.size());
        log.info("==> 聚合消息, {}", JsonUtils.toJsonString(bodys));
        // List<String> 转 List<CountFollowUnfollowMqDTO>
        List<CountFollowUnfollowMqDTO> countFollowUnfollowMqDTOS = bodys.stream()
                .map(body -> JsonUtils.parseObject(body, CountFollowUnfollowMqDTO.class)).toList();

        //按照用户进行一个分组
        Map<Long, List<CountFollowUnfollowMqDTO>> groupMap  =
                countFollowUnfollowMqDTOS.stream()
                        .collect(Collectors.groupingBy(CountFollowUnfollowMqDTO::getUserId));

        // 按组汇总数据,统计出最终的计数
        // key 为目标用户ID, value 为最终操作的计数
        Map<Long, Integer> countMap = Maps.newHashMap();

        for (Map.Entry<Long, List<CountFollowUnfollowMqDTO>> entry : groupMap.entrySet()) {
            List<CountFollowUnfollowMqDTO> list = entry.getValue();
            // 最终的计数值,默认为 0
            int finalCount = 0;
            for (CountFollowUnfollowMqDTO countFollowUnfollowMqDTO : list) {
                Integer type = countFollowUnfollowMqDTO.getType();
                FollowUnfollowTypeEnum followUnfollowTypeEnum = FollowUnfollowTypeEnum.valueOf(type);
                // 若枚举为空,跳到下一次循环
                if (Objects.isNull(followUnfollowTypeEnum)) {
                    continue;
                }

                switch (followUnfollowTypeEnum) {
                    case FOLLOW -> finalCount += 1;
                    case UNFOLLOW -> finalCount -= 1;
                }

            }
            // 将分组后统计出的最终计数,存入 countMap 中
            countMap.put(entry.getKey(), finalCount);
        }
        log.info("## 聚合后的计数数据: {}", JsonUtils.toJsonString(countMap));
        // 更新 Redis
        countMap.forEach((k, v) -> {
            // Redis Key
            String redisKey = RedisKeyConstants.buildCountUserKey(k);
            // 判断 Redis 中 Hash 是否存在
            boolean isExisted = redisTemplate.hasKey(redisKey);

            // 若存在才会更新
            // (因为缓存设有过期时间,考虑到过期后,缓存会被删除,这里需要判断一下,存在才会去更新,而初始化工作放在查询计数来做)
            if (isExisted) {
                // 对目标用户 Hash 中的粉丝数字段进行计数操作
                redisTemplate.opsForHash().increment(redisKey, RedisKeyConstants.FIELD_FANS_TOTAL, v);
            }
        });

        // 发送 MQ, 计数数据落库
        // 构建消息体 DTO
        Message<String> message = MessageBuilder.withPayload(JsonUtils.toJsonString(countMap))
                .build();

        // 异步发送 MQ 消息,提升接口响应速度
        rocketMQTemplate.asyncSend(MQConstants.TOPIC_COUNT_FANS_2_DB, message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("==> 【计数服务:粉丝数入库】MQ 发送成功,SendResult: {}", sendResult);
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("==> 【计数服务:粉丝数入库】MQ 发送异常: ", throwable);
            }
        });
    }

}

代码解析

  • BufferTrigger 的构建:通过 .batchBlocking() 创建一个 BufferTrigger 实例,该实例设置了缓存队列的最大容量、每批次最多处理的消息数量、以及聚合的时间窗口等配置。
  • enqueue(body):每接收到一条消息,就将消息加入到缓冲队列中,BufferTrigger 会根据设定的策略决定何时批量处理这些数据。
  • consumeMessage(List<String> bodys):当数据满足触发条件时(如缓存队列满或时间窗口到期),consumeMessage 会被调用,处理聚合后的数据。

http://www.kler.cn/a/565399.html

相关文章:

  • JavaScript系列(92)--前端监控体系
  • Vue 3 + Vite 项目配置访问地址到服务器某个文件夹的解决方案
  • 全面解析:如何查找电脑的局域网与公网IP地址‌
  • reallocate() 和 allocate() 的区别
  • vue实现根据点击或滑动展示对应高亮
  • Django ORM 的常用字段类型、外键关联的跨表引用技巧,以及 `_` 和 `__` 的使用场景
  • 基于EasyExcel封装的Excel工具类,支持高效导出和读取操作
  • Qt for Android下QMessageBox背景黑色、文字点击闪烁
  • JSX 实现列表渲染
  • react绑定ref调用
  • 转化率(漏斗分析)——mysql计算过程
  • Amazon Neptune深度解析:高性能图形分析和无服务器数据库的场景化实践与技术优
  • XFeat:轻量级的深度学习图像特征匹配
  • 机器学习数学基础:34.二列相关教程
  • Redis 面试
  • 矩阵的 正定(Positive Definite)与负定(Negative Definite):从Fisher信息矩阵看“曲率”的秘密
  • smolagents学习笔记系列(十)Examples - Web Browser Automation with Agents
  • Linux设备驱动开发-Pinctrl子系统使用详解
  • 导入 Excel 规则批量修改或删除 Word 内容
  • 【Linux】进程间通信——命名管道