飞书文档
解决缓存与数据源数据不一致的方案有很多, 各有优缺点; 1.0、旁路缓存策略, 直接同步更新 读取流程:
查询缓存。如果缓存命中,则直接返回结果。 如果缓存未命中,则查询数据库。 将数据库查询到的数据写入缓存,并设置一个过期时间。 写入流程: 更新数据库中的数据。 删除缓存中的数据缓存。 缺点:
缓存穿透 (高并发情况下如果缓存未命中,需要查询数据库并回写缓存,这可能在高并发下场景增加数据库的负载)
高并发环境下可能导致数据不一致,因为在删除Redis时,如果有其他请求线程已经读取了MySQL旧数据,然后缓存进去, 就会出现脏数据问题。 伪代码: vodi updateData(id, newData) { // 更新MySQL mysql.update(“UPDATE table SET data = ? WHERE id = ?”, newData, id);
// 更新Redis redis.del(“data:” + id); } 2.0、延迟双删策略 读取流程:
查询缓存。如果缓存命中,则直接返回结果。 如果缓存未命中,则查询数据库。 将数据库查询到的数据写入缓存,并设置一个过期时间。 写入流程: 删除缓存中的数据缓存。 更新数据库中的数据。 延迟1~3S再删除缓存中的数据缓存。 缺点:
短时间的数据不一致, 但最终实现的效果是最终数据一致性, 不适合对实时性要求极高的场景。
延迟删除不保证成功 (如需保证测需要重试策略) 伪代码: void updateData(id, newData) { // 删除Redis缓存 redis.delete(“data:” + id);
// 更新MySQL mysql.update(“UPDATE table SET data = ? WHERE id = ?”, newData, id);
// 延时任务,延时后再次删除Redis缓存 scheduleTask(() => { redis.delete(“data:” + id); }, delayTime); // delayTime为延迟时间,例如1000毫秒 } 3.0、定时+增量更新策略 读取流程:
查询缓存。如果缓存命中,则直接返回结果。 如果缓存未命中,则查询数据库。 将数据库查询到的数据写入缓存,并设置一个过期时间。 缓存写入流程: 固定时间去拉取增量数据, 比如比标记时间大的数据。 更新进缓存里 优点:
减少负载:相比实时同步,定时+增量更新可以减少对数据库和缓存的压力,因为它不会在每次数据变更时立即进行同步。
灵活性高:可以根据业务需求调整定时任务的频率和增量更新的策略。 缺点:
根据配置定时时间长短造成时间的数据不一致, 但最终实现的效果是最终数据一致性, 不适合对实时性要求极高的场景。
需要针对不同的业务实现一套不同的定时+增量更新策略 伪代码: void syncCache() { // 获取上次同步的时间戳 Timestamp lastSyncTime = getLastSyncTime();
// 查询自上次同步以来有变更的数据 List updatedData = mysql.query( “SELECT * FROM table WHERE update_time > ?”, lastSyncTime);
// 更新缓存 for (Data data : updatedData) { redis.set(“data:” + data.id, data); }
// 更新最后同步时间 updateLastSyncTime(currentTime()); } 4.0、锁机制更新策略 读取流程:
查询缓存。如果缓存命中,则直接返回结果。 如果缓存未命中, 向Redis上读锁。 查询数据库。 将数据库查询到的数据写入缓存,并设置一个过期时间 & 释放读锁。 写入流程: 向Redis上写锁。 更新数据库中的数据。 删除缓存中的数据缓存。 释放写锁 优点:
数据强一致性。 缺点:
性能比其他方式差。 伪代码: void save() { // 获取写锁 RedissonClient client = RedissonClientUtil.getClient(“”); RReadWriteLock readWriteLock = client.getReadWriteLock(“myLock”); RLock writeLock = readWriteLock.writeLock(); // 加锁 writeLock.lock(); // 查询自上次同步以来有变更的数据 List updatedData = mysql.query( “SELECT * FROM table WHERE update_time > ?”, lastSyncTime); // 删除缓存 redis.del(“data:” + data.id);
// 最后释放锁 writeLock.unlock(); } 5.0、异步更新策略 Binlog 订阅方式, 这种策略已经和业务解耦开; 读取流程:
查询缓存。如果缓存命中,则直接返回结果。 缓存未命中查询数据库。 将数据库查询到的数据写入缓存,并设置一个过期时间 数据源写入流程: 更新数据库中的数据。 缓存处理中间件: 接收Binlog 订阅发送过来的数据; 进行缓存的处理 (可以更新也可以删除) 优点:
数据最终一致性。 和业务解耦, 业务代码不用再考虑数据源数据更新后缓存怎么更新。 缺点: 有延迟情况; 实现成本与维护成本大 大概流程如下图 [图片] 但是基础组件并不满足于公司复杂多变的业务场景, 需要基于基础组件实现定制化的异步更新策略; 基于异步更新策略的架构设计方案 CDC (Change data capture) [图片] 不想引入mq?试试debezium小型项目当中,没有引入消息中间件,也不想引入,但需要解耦异步,那怎么办呢?CDC组件是 - 掘金 从选项来看, Flink CDC 太重量级; sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的structs2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除, canal只能对MYSQL进行CDC监控。有很大的局限性。 综上所述 debezium 较为合适; Debezium 介绍 Debezium 构建在Apache Kafka的基础之上,并提供了一套与Kafka Connect兼容的连接器。每个连接器都与特定的数据库管理系统(DBMS)协同工作。连接器通过检测发生的变化来记录 DBMS 中数据变化的历史,并将每个变化事件的记录流式传输到 Kafka 主题。然后,消费应用程序可以从 Kafka 主题中读取由此产生的事件记录; 通过利用 Kafka 可靠的流平台,Debezium 使应用程序能够正确、完整地消费数据库中发生的变化。即使应用程序意外停止或失去连接,也不会错过中断期间发生的事件。应用程序重新启动后,会从中断的位置继续读取主题。 使用 Debezium 需要三个独立的服务:ZooKeeper、Kafka 和 Debezium 连接器服务, 但是同时Debezium也支持并非每个应用程序都需要这种级别的容错性和可靠性,它们可能不希望依赖外部的 Kafka 代理集群和 Kafka 连接服务。相反,有些应用程序更愿意将 Debezium 连接器直接嵌入应用程序空间。它们仍然需要相同的数据变更事件,但更希望连接器能直接将其发送到应用程序,而不是在 Kafka 内持久化 对应文档说明: Debezium Engine 下面采用轻量级方式集成 Debezium 连接器直接嵌入应用程序空间 1.0、依赖关系 Springboot 2.7.5, jdk8版本 [图片] [图片] 从上图看 debezium从1.6版本开始就需要 >= jdk11了; 那么我们只能使用1.5版本
[图片] 使用 1.5.4.Final jdk8支持的最新版本 <version.debezium>1.5.4.Final</version.debezium>
io.debezium debezium-api ${version.debezium} io.debezium debezium-embedded ${version.debezium} io.debezium debezium-connector-mysql ${version.debezium} 2.0 简单demo 暂时无法在飞书文档外展示此内容 在MySQL5.6与MySQL5.7版本上运行是正常的 3.0、架构设计图 暂时无法在飞书文档外展示此内容 3.1、整个架构分为 四 部分 1. Debezium Engine连接器管理应用 2. Debezium Engine连接器应用 3. Debezium 消息重试应用 4. Debezium Engine连接器消费应用 3.2、Debezium Engine连接器管理应用 (web页面可以先不做) 主体功能: 对外web服务, 用于管理连接器, 将连接器数据可视化等; > 和Debezium Engine连接器应用交互 管理连接器: (新增并启动连接器、删除并停止连接器(也会删除连接器相关offset文件)) 可视化的数据: (连接器相关信息、连接器运行状态 连接器已处理事件数量、失败数量、待重试数量、等) 3.3、Debezium Engine连接器应用 主体功能分为: 1. 连接器的启动/运行/停止 (分为项目启动时候 / 接收管理应用请求的的时候) 2. 事件的数据解析/发送下游 3. 监控指标收集 4. 异常事件重试&记录&告警 CREATE DATABASE debezium_config_manage; 相关的表: debezium_connect CREATE TABLE `debezium_connect_base_info` ( `id` bigint(20) unsigned NOT NULL COMMENT '不用自增(使用顺序雪花)', `business_describe` varchar(255) NOT NULL DEFAULT '' COMMENT '连接器业务描述', `connect_db_type` varchar(30) NOT NULL DEFAULT '' COMMENT '连接DB类型(MySQL数据库:MySQL)', `connect_name` varchar(255) NOT NULL DEFAULT '' COMMENT '连接器名称', `connect_status` varchar(20) NOT NULL DEFAULT 'wait' COMMENT '连接器状态(run:运行中、wait:待运行、stop:停止)', `current_finish_num` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '当前完成数 (非实时)', `total_finish_num` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '总完成数 (非实时)', `wait_finish_retry_num` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '待完成重试数 (实时)', `total_finish_retry_num` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '总完成重试数 (实时)', `is_delete` tinyint(1) NOT NULL DEFAULT 0 COMMENT '删除状态(1:删除,0:未删除)', `last_operator_id` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '最后操作人ID', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `version` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '版本号', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注', PRIMARY KEY (`id`), CONSTRAINT UNIQUE KEY `idx_connect_name_connect_db_type` (`connect_name`,`connect_db_type`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='连接器基础信息表'; debezium_connect_param CREATE TABLE `debezium_connect_param` ( `id` bigint unsigned NOT NULL AUTO_INCREMENT COMMENT '主键ID', `connect_base_info_id` bigint unsigned NOT NULL DEFAULT '0' COMMENT '连接器基础表ID', `database_table_list` text COMMENT '监控的多个的数据库和表 (account.user)格式 , 逗号分隔', `database_history_file_filename` varchar(100) NOT NULL DEFAULT '' COMMENT 'dbhistory文件名称 (唯一值), 不包含路径(x.txt)', `offset_storage_file_filename` varchar(100) NOT NULL DEFAULT '' COMMENT 'offset文件名称 (唯一值), 不包含路径(x.dat)', `database_server_id` bigint unsigned NOT NULL DEFAULT '0' COMMENT '连接Debezium 服务的 id(唯一值)', `connect_param_json` text COMMENT '剩余连接器JSON参数', `is_delete` tinyint(1) NOT NULL DEFAULT '0' COMMENT '删除状态(1:删除0:未删除)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注', PRIMARY KEY (`id`), UNIQUE KEY `idx_database_history_file_filename` (`database_history_file_filename`), UNIQUE KEY `idx_database_server_id` (`database_server_id`), UNIQUE KEY `idx_offset_storage_file_filename` (`offset_storage_file_filename`), KEY `idx_connect_base_info_id` (`connect_base_info_id`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='连接器参数表'; debezium_message_invoke CREATE TABLE `debezium_message_invoke` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `connect_base_info_id` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '连接器基础表ID', `invoke_type` varchar(50) NOT NULL DEFAULT '' COMMENT '调用方式 (http、MQ)', `http_invoke_address` varchar(255) NOT NULL DEFAULT '' COMMENT 'http的调用地址', `mq_invoke_address` varchar(255) NOT NULL DEFAULT '' COMMENT 'mq的调用route、topic (JSON方式)', `invoke_extra_mark` varchar(255) NOT NULL DEFAULT '' COMMENT '调用额外带上的标识', `message_max_retry` int(20) unsigned NOT NULL DEFAULT 0 COMMENT '消息最大重试次数', `message_retry_interval` int(20) unsigned NOT NULL DEFAULT 0 COMMENT '消息重试间隔(单位分钟)', `is_delete` tinyint(1) NOT NULL DEFAULT 0 COMMENT '删除状态(1:删除0:未删除)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注', CONSTRAINT PRIMARY KEY (`id`), KEY `idx_connect_base_info_id` (`connect_base_info_id`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='连接器消息调用方式'; debezium_connect_operation_record CREATE TABLE `debezium_connect_operation_record` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `connect_base_info_id` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '连接器基础表ID', `operator_id` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '操作人ID', `operator_type` varchar(50) NOT NULL DEFAULT '' COMMENT '操作人类型 (user:用户, system:系统)', `operation_type` varchar(50) NOT NULL DEFAULT '' COMMENT '操作类型 (add:添加连接器, update:修改连接器、del:删除连接器, updateCount:更新统计信息)', `operation_content` TINYTEXT COMMENT '操作JSON参数', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注', CONSTRAINT PRIMARY KEY (`id`), KEY `idx_connect_base_info_id` (`connect_base_info_id`), KEY `idx_operator_id_operator_type` (`operator_id`,`operator_type`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='连接器操作记录表'; debezium_connect_message_retry CREATE TABLE `debezium_connect_message_retry` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `connect_base_info_id` bigint(20) unsigned NOT NULL DEFAULT 0 COMMENT '连接器基础表ID', `operator_content` TINYTEXT COMMENT '需重试的JSON参数', `next_retry_time` datetime NOT NULL COMMENT '下一次重试时间', `message_current_retry` int(20) unsigned NOT NULL DEFAULT 0 COMMENT '消息当前重试次数', `message_status` varchar(255) NOT NULL DEFAULT 'wait' COMMENT '消息状态(wait:待执行, run:执行中, finish:执行完成)' `message_retry_status` varchar(255) NOT NULL DEFAULT '' COMMENT '消息重试状态(success:成功, fail:失败)' `is_delete` tinyint(1) NOT NULL DEFAULT 0 COMMENT '删除状态(1:删除0:未删除)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注' CONSTRAINT PRIMARY KEY (`id`), KEY `idx_connect_base_info_id` (`connect_base_info_id`), KEY `idx_next_retry_time` (`next_retry_time`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='连接器消息重试表'; 无人为介入流程是: 项目启动 > 从数据库拉取所有未删除的连接器 > 进行启动 > 并启动一个间隔一分钟统计一次数据的监控指标线程 监控binlog信息 > 解析成为可读的请求消息 > 根据连接器调用方式进行调用下游 当消息调用失败, 则进行简单重试 3次 > 继续失败后 > 构建重试对象插入到重试表 (让重试服务进行拉取重试) 项目停止 > 停止所有的连接器 从Debezium Engine连接器管理应用来的请求流程大概是: 接收请求 > 校验请求合法性 > 校验请求参数合法性 > 根据类型做启动/停止对应连接器; PS: 在上面的各个重点操作节点 (都会进行机器人告警/日志打印) 日志级别是可动态调整的; [图片] 3.4、Debezium 消息重试应用 该应用主体功能就是 做多次失败的消息的重试; 项目启动之后, 启动一个线程池 + 一个线程组, 不断间隔定时循环 > 拉取debezium_connect_message_retry 表中未删除的数据, 进行关联查询相关信息, 进行消息重试; [图片] 3.5、Debezium Engine连接器消费应用 主要功能是: 接收连接器上游的调用, 解析统一的调用参数; 然后做自己的业务, 比如说是 做Redis的缓存更新, 如果进行Redis的处理失败了则进行简单的重试; 比如接收下面图片的参数 [图片] [图片] [图片] [图片] 4.0、剩余待解决的问题 (会议上记录) 4.1、Debezium Engine连接器高可用 (无需考虑高可用) 如果要实现连接器高可用情况, 多节点部署, 就需要考虑解决消息只消费一次的情况与性能情况; 4.2、多个 Debezium Engine连接器 对数据库的性能影响 (无影响) Debezium does not impact source database performance [图片] Debezium 官方不推荐对同一个数据源使用多个连接器,建议通过调整连接器配置来满足需求; 但是存在如果声音一个engine的情况, 监听延迟会变长; 4.2、 (会议上补充) 做成2个应用 [图片] ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~end~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ 4.3、(会议上补充) 消费应用增加 表 与 Redis keys的 映射关系 CREATE TABLE `auto_handle_redis_mapping` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `handle_table` varchar(255) NOT NULL DEFAULT '' COMMENT '处理的表名称', `handle_key` varchar(2555) NOT NULL DEFAULT '' COMMENT '处理的Redis_key集, 逗号分隔(例如:device_cache_id_{#deviceId})', `extra_groovy_script_file_path` TINYTEXT COMMENT '额外groovy脚本执行文件', `is_delete` tinyint(1) NOT NULL DEFAULT 0 COMMENT '删除状态(1:删除0:未删除)', `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `remark` varchar(255) NOT NULL DEFAULT '' COMMENT '备注' CONSTRAINT PRIMARY KEY (`id`), KEY `idx_message_handle_table` (`handle_table`,`handle_key`), KEY `idx_create_time` (`create_time`), KEY `idx_update_time` (`update_time`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=DYNAMIC COMMENT='自动处理Redis删除逻辑表与keys映射'; 4.4、(会议上补充) 消费服务增加延迟双删除 4.5、 (会议上补充) 业务的缓存处理 5.0、Redis容量估算 集成demo遇到的错误: 启动失败 [图片] 检索后和尝试发现是MySQL连接版本不对, 默认的版本是 8.0.31 降级为8.0.27还是不行, 最终降级为8.0.21正常! io.debezium debezium-connector-mysql ${version.debezium} mysql mysql-connector-java mysql mysql-connector-java 8.0.21 Interrupted while emitting initial DROP TABLE events [图片] 原因被主线程关闭了, 解决方法开启新线程 debezium # offset 文件不生成 参考的Spring boot 整合demo https://juejin.cn/post/7348003004122939429 原因有个坑, GitHub源码没有调用该方法, 少了下图标记调用的方法, offset文件就不会生成! [图片]
应用到生产的源码: (加密) 暂时无法在飞书文档外展示此内容 暂时无法在飞书文档外展示此内容 采用kafka方式 (暂不研究): ZooKeeper服务, 目前我们已经在使用, 如果要独立出来, 可以单独配置一台 (生产 : 1核心1GB内存配置) Kafka服务 (生产集群, 配置4核心8G内存配置, 硬盘SSD) Debezium连接器服务 (生产集群, 配置4核心8G内存配置, 硬盘SSD) 测试环境 (可以将 三个服务部署在同一台机子, 只需要2核心4G) 运行启动步骤: MySQL启用binlog 1、启动ZK 2、启动kafka 3、启动Debezium连接器
1