mysql数据同步ES方案---DTS
在上一篇文章中,我通过一个简单的例子实现了如何通过 Canal 实现 MySQL 数据到 Elasticsearch 的同步,以满足增量捕获和实时同步的需求。然而实际情况中,比如在我之前工作的公司,为了减少运维工作量和代码操作的复杂性,我们通常选择使用云数据库。比如,我们的 MySQL 使用的是阿里云的云数据库 RDS 实例,Elasticsearch 则采用阿里云的云检索服务。所以这次文章中,我将基于阿里云的两个云数据库进行数据同步,利用阿里云的数据传输服务(DTS)来实现。DTS 的优点在于其高集成度和便捷性,使得数据同步更加简便高效。
准备工作
一、云数据库 RDS 实例配置及命令
-
登录阿里云控制台并创建 RDS MySQL 实例:
- 打开阿里云控制台,导航到 云数据库 RDS 服务页面。
- 按需选择 RDS MySQL 实例类型,配置实例的 规格、存储空间 和 网络。
- 创建实例后,进入实例详情页面,配置 数据库白名单,允许 DTS 和 Elasticsearch 实例访问 RDS。
- 设置好白名单后,创建一个具有足够权限的数据库用户账号,此账号将在数据同步过程中被 DTS 使用。
-
登录 RDS MySQL 实例并创建模拟数据表:
-
使用创建的账号登录 RDS MySQL 实例(可使用 MySQL 客户端、Navicat 等工具)。
-
创建一个日志表,用于模拟数据同步工作。表结构如下:
CREATE TABLE sync_log ( id INT AUTO_INCREMENT PRIMARY KEY, log_message VARCHAR(255), log_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP );
-
该表包含三个字段:
id
(主键)、log_message
(日志内容),和log_time
(记录时间),用于测试数据同步到 Elasticsearch 的效果。
-
-
插入初始数据:
-
为确保同步任务有数据可测试,插入一些测试日志数据:
INSERT INTO sync_log (log_message) VALUES ('Log entry 1'), ('Log entry 2'), ('Log entry 3');
-
执行以上 SQL 后,可以使用
SELECT * FROM sync_log;
命令查看表中的数据,确认插入成功。这些数据将通过 DTS 同步到 Elasticsearch,以验证同步配置的效果。
-
至此,云数据库 RDS 的准备工作已完成。
二、Elasticsearch 实例配置及命令
-
创建 Elasticsearch 实例:
- 在阿里云控制台选择 Elasticsearch 服务,创建一个新的实例。
- 配置实例规格、存储空间和访问策略,确保允许 DTS 和 RDS 实例访问此 Elasticsearch 实例。
-
关于索引创建:
- 如果需要 MySQL 表的字段结构与 Elasticsearch 索引保持一致,可以在 DTS 配置中选择 全表同步,DTS 会自动根据 MySQL 表结构创建对应的 Elasticsearch 索引和字段映射。
- 因此,无需手动创建索引或定义字段结构,DTS 同步后将自动完成索引创建。
-
验证索引创建:
-
同步任务完成后,可以在 Kibana 的 Discover 页面中查看自动生成的索引。
-
使用以下命令检查索引结构,确认字段设置正确:
GET sync_log/_mapping
-
完成这些准备工作后,接下来,可以使用 DTS 启动 MySQL 数据到 Elasticsearch 的同步任务。
数据同步前:MySQL 与 Elasticsearch 数据比对
在开始数据同步之前,我们可以先查看 MySQL 中的全量数据,并确认 Elasticsearch 中 sync_log
索引是否存在或为空。以下是具体步骤:
-
查看 MySQL 中的全量数据:
-
在 MySQL 控制台中运行以下命令,查看
sync_log
表的所有数据:SELECT * FROM sync_log;
-
-
查看 Elasticsearch 中的
sync_log
索引数据:-
确保
sync_log
索引在数据同步前不存在该索引或为空,以便清晰地观察同步效果。在 Kibana 的 Dev Tools 中使用以下命令,查询sync_log
索引中的前十条数据:GET /sync_log/_search { "size": 10 }
-
开始数据同步:将 MySQL 数据同步到 Elasticsearch
-
进入阿里云数据同步服务(DTS)控制台:
- 登录阿里云控制台,在顶部导航栏中找到并点击 数据传输服务(Data Transmission Service) ,进入 DTS 产品界面。
- 在 DTS 控制台页面,点击 数据同步,进入数据同步任务的管理页面。
-
创建同步任务:
- 在 数据同步 页面,点击 创建同步任务 按钮,开始配置新的同步任务。
-
- 弹出任务配置页面,按照以下步骤完成配置。
-
配置源数据库(RDS MySQL) :
- 在 源库信息 部分,选择 RDS实例 作为源库类型。
- 选择你创建的 RDS MySQL 实例,并填写相关的连接信息,如数据库名称和访问权限账号。
-
- 测试连接:点击 测试连接 按钮,确认源数据库连接无误后,继续下一步。
-
配置目标数据库(Elasticsearch) :
- 在 目标库信息 部分,选择 Elasticsearch 作为目标库类型。
- 输入 Elasticsearch 实例的连接信息,包括 访问地址、端口、以及对应的 访问权限账号和密码。
- 测试连接:点击 测试连接 按钮,确保 DTS 可以正常访问目标 Elasticsearch 实例。
-
选择同步对象:
-
配置完源和目标数据库信息后,进入 对象配置 配置页面。
-
在源数据库结构中选择要同步的表,例如
sync_log
表。 -
确保勾选了 全量同步 和 增量同步,以便将 MySQL 中的历史数据和后续新增数据都同步到 Elasticsearch。
-
-
高级设置(可选):
- 如果有特殊需求,可以在 高级设置 中配置数据转换、字段映射等细节。
- 确认所有设置无误后,点击 下一步 继续。
-
启动同步任务:
-
配置完成后,点击 预检查并启动,DTS 将对同步任务进行预检查,以确保配置正确。
-
预检查通过后,点击 购买 按钮,选择合适的数据链路并启动
备注: 这里是测试 故选择最低链路规格。实际要根据具体数据同步量选择合适的链路规格
-
返回 任务列表 页面,找到创建的同步任务,并监控任务状态,确认同步任务已正常启动并运行。
-
同步任务完成后,登录 Kibana 控制台。进入 Discover 页面,搜索并查看
sync_log
索引。通过使用以下命令,查询sync_log
索引中的前十条数据:GET /sync_log/_search { "size": 10 }
-
通过上述截图我们可以看到通过dts我们已经把mysql指定表中的数据和结构已经全部同步到Elasticsearch里面。
增量数据同步测试
-
向 MySQL 中插入增量数据:
-
在 MySQL 中,向
sync_log
表中插入一条新的数据,以测试增量同步是否正常工作。例如,执行以下 SQL 命令:INSERT INTO sync_log (log_message) VALUES ('Incremental log entry');
-
插入完成后,可以使用
SELECT * FROM sync_log;
命令查看新数据是否已添加到表中。
-
-
登录 Kibana 检查增量同步数据**:
-
等待 DTS 将增量数据同步到 Elasticsearch。
-
登录 Kibana 控制台,进入 Discover 页面,查看
sync_log
索引。 -
搜索或刷新数据,确认
sync_log
索引中是否包含刚刚插入的增量数据,确保 DTS 正确完成了增量同步。
-
通过上述截图我们可以看到 通过dts数据传输 mysql的数据变动也会同步到Elasticsearch里面。
总结
通过上面的步骤,我们成功地使用阿里云的数据传输服务(DTS)将 MySQL 数据同步到了 Elasticsearch,实现了全量和增量数据的同步。DTS 自动创建了与 MySQL 表结构一致的 Elasticsearch 索引,大大简化了配置过程,同时确保了数据的实时性和一致性。在实际操作中,DTS 提供了便捷的任务管理和监控功能,使我们能够随时查看同步进度并快速验证数据同步效果。