当前位置: 首页 > article >正文

使用redis实现与flink窗口同样的消息聚合处理效果

背景

最近做了一个需求,其中有个业务场景是,当商家店铺配置发生变动时,要刷新整个店铺下的所有相关数据。

实现分析

如果不考虑性能相关的所有因素,实现起来其实没什么,监听各种配置消息变更,然后加锁刷新数据就好。但是在业务需求中,最大的难点就是性能和数据量,一百条数据和一万条数据的处理难度级别是相差非常巨大的。
本需求下,店铺下的相关数据可能很多,超过上千条也有可能(虽然数据上千条,但是逻辑比较重,mysql估计就得读写十万次了),而且变更场景来源非常多,也就导致在某些时刻变更会非常频繁,可能一分钟就有100次变更。基于以上信息,必须要做对处理逻辑进行性能优化。

其实对于数据变更短时间内非常频繁的场景,有一个比较通用的解法,就是用flink转一道,窗口函数聚合一下。比如说设置一分钟的窗口,将一分钟内的所有变更消息聚合成一条再发送个业务代码进行后续处理。这样就可以显著的降低数据库qps,当然缺点就是时效性会比较差,窗口时间需要权衡一下,flink sql示例代码如下

GROUP BY TUMBLE(event_time, INTERVAL '1' MINUTE), shop_id

但是flink转一道这个事情还是比较考验公司的基础设施完善性的,排查问题也会多了很多弯路,比如说flink有没有收到我发的消息,flink最终发的消息我有没有收到等等。。所以借这次机会,我使用了redis做了一个轻量级的消息窗口聚合机制,来给大家分享一下。

	public void handlerMsg() {
		.....
		.....
		
		boolean canProcess = canMsgProcess(shopId, msgTime, topic, channel, msgId);
		if (!canProcess) {
			return;
		}

		// 如果当前时间大于消息时间五分钟,允许放行,否则重试
		if (currentTime > msgTime + 1000 * 300) {
			// 业务操作 业务操作要加锁和做幂等
		} else {
			throw new RequeueException("等待五分钟后执行");
		}
	}
	
	private boolean canMsgProcess(Long shopId, Long timestamp, String topic, String channel, Long msgId) {
		// 消息时间戳转 以五分钟为单位
		Long timePart = timestamp / (1000 * 300);

		String key = "xxxx_" + "业务名称_" +  shopId + "_t" + timePart;
		String value = redisClient.opsForString().get(key);

		// 消息唯一标识
		String msgUniqueKey = msgId + "_" + topic + "_" + channel;
		if (StringUtils.isBlank(value)) {
			// 如果不存在,就把本消息id设置上去,本消息放行
			redisClient.opsForString().set(key, msgUniqueKey);
			return true;
		}

		if (msgUniqueKey.equals(value)) {
			return true;
		} else {
			log.info("已经有消息在等待执行,本消息跳过 existMsg = {}", value);
			return false;
		}
	}

整体来看就是用了redis的String数据结构,key = 消息业务时间的时间段,value = 消息唯一标识。这个key含义就是保存了指定时间间隔内,哪个消息允许执行。
唯一的要求就是消息中必须有发送时间字段,个人建议所有的消息体都应该加上发送时间,这是一个很有用的字段。
而且对redis的操作不需要使用lua保证原子性,不用担心key并发修改,不用担心缓存丢失,非常简单,有redis就行(因为有消息重试在,感兴趣的话可以想一下异常场景会怎么样)

————————————————

                        版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。

原文链接:https://blog.csdn.net/lvqinglou/article/details/136584954


http://www.kler.cn/a/549565.html

相关文章:

  • GitLab CI/CD 的配置详解:从零开始使用 .gitlab-ci.yml 文件
  • BEV:车轮接地点车辆修正3D框位置精度
  • OpenCV机器学习(2)提升算法类cv::ml::Boost
  • 第25周Java主流框架实战-springboot入门 4.配置详解
  • 第 16 天:游戏 UI(UMG)开发,打造主菜单 血条!
  • ​矩阵元素的“鞍点”​
  • newgrp docker需要每次刷新问题
  • 使用bitnamiredis-sentinel部署Redis 哨兵模式
  • Android 13 通过修改 AOSP 禁用扬声器
  • 练习题 - DRF 3.x Parsers 解析器使用示例和配置方法
  • openGauss 3.0 数据库在线实训课程16:学习逻辑结构:表管理4
  • R 语言科研绘图第 24 期 --- 直方图-高亮
  • Vue CLI 配置与插件
  • 机器学习:集成学习和随机森林
  • 解锁二进制数组:JS、TS、ArkTS 解析
  • MySQL DELETE 语句
  • WPS的AI助手进化跟踪(灵犀+插件)
  • 人工智能 - 大脑神经网络与机器神经网络的区别
  • Deepseek R1模型本地化部署与API实战指南:释放企业级AI生产力
  • 数据库系统原理——第十一章并发控制复习题