搭建 canal 监控mysql数据到 elasticsearch 中(本机到远端sql)
搭建 canal 监控mysql数据到 elasticsearch 中(本机到远端sql)
需求:
要将 MySQL 数据库 info 中的 notice 和 result 表的增、删、改操作同步到 Elasticsearch 的 notice 和 result 索引,您需要正确配置 MySQL、Canal 、Canal Adapter 、 Elasticsearch 和 kibana
系统 | rocky9.2 |
---|---|
IP | 192.168.174.136 |
IP | 192.168.174.137 |
服务/版本 |
---|
mysql:8.0.26 - - 192.168.174.137 |
Canal:1.1.8 - - 192.168.174.136 |
canal.adapter:1.1.8 - - 192.168.174.136 |
Elasticsearch:8.15.0 - - 192.168.174.136 |
kibana:8.15.0 - - 192.168.174.136 |
搭建mysql - Elasticsearch - kibana 参考文章
https://blog.csdn.net/yhl18931306541/article/details/141678279?spm=1001.2014.3001.5501
下载安装 Canal
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.adapter-1.1.8-SNAPSHOT.tar.gz
wget https://github.com/alibaba/canal/releases/download/canal-1.1.8-alpha-2/canal.deployer-1.1.8-SNAPSHOT.tar.gz
解压canal 与 canal-adapter
mkdir /usr/local/canal.adapter
mkdir /usr/local/canal
tar -xf canal.adapter-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal.adapter
tar -xf canal.deployer-1.1.8-SNAPSHOT.tar.gz -C /usr/local/canal
修改配置
cd /usr/local/canal/
vim conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# 这个很重要 相当于从库的 server-id 参数,必须要比主库的server-id数大
canal.instance.mysql.slaveId=1001
# enable gtid use true/false
canal.instance.gtidon=false
# position info
# 远端mysql地址
canal.instance.master.address=192.168.174.137:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
# mysql账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# 需要监控的数据库表
# table regex
canal.instance.filter.regex=info\\.devops,info\\.test
# table black regex
# 略过的数据库表
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#
# multi stream for polardbx
canal.instance.multi.stream.on=false
#################################################
vim conf/canal.properties
因为我们要将监控到的数据传到ES,所以修改地方比较少
canal.serverMode = tcp
canal.instance.tsdb.enable = true
# 集群的配置只要把H2改为mysql,因为要进行元数据管理。
#canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
#canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3066/canal_manager
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 123456
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_manager
canal.instance.tsdb.dbUsername = root
canal.instance.tsdb.dbPassword = 123456
# 这个数据库非远端的数据库,需要在本机或者其他机器上创建一个数据库,方便保存canal历史命令和日志
作者是在本机创建的mysql数据库
登录进去之后,创建的canal_manager数据库
> create database canal_manager
配置文件中注释的中文备注,记得观察。
启动 canal
./bin/startup.sh
# 查看日志
tail -f ./logs/canal/canal.log
输出如下及正常。
tail -f canal.deployer/logs/example/example.log
这个日志记录的 bin-log日志读取到什么地方了
如果报错如下:
[main] WARN o.s.context.support.ClassPathXmlApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'tableMetaTSDB' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'metaHistoryDAO' while setting bean property 'metaHistoryDAO'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'metaHistoryDAO' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'sqlSessionFactory' while setting bean property 'sqlSessionFactory'; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'sqlSessionFactory' defined in class path resource [spring/tsdb/h2-tsdb.xml]: Cannot resolve reference to bean 'dataSource' while setting bean property 'dataSource'; nested exception is org.springframework.beans.factory.CannotLoadBeanClassException: Cannot find class [com.alibaba.druid.pool.DruidDataSource] for bean with name 'dataSource' defined in class path resource [spring/tsdb/h2-tsdb.xml]; nested exception is java.lang.ClassNotFoundException: com.alibaba.druid.pool.DruidDataSource
处理
将druid的jar包放在lib目录就可以了。druid-1.2.22.jar测试通过
下载地址 :https://repo1.maven.org/maven2/com/alibaba/druid/1.2.22/
安装 Canal.adapter
cd /usr/local/canal.adapter
vim application.yml
将多余的删除,取其精华即可
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
# canal.tcp.server.host需要修改
# 本机地址
canal.tcp.server.host: 192.168.174.136:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
srcDataSources:
defaultDS:
# url,username,password需要修改 canal_manager是库名
# 这个地址也不是远端的地址,是本机地址
url: jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true
username: root
password: 123456
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: gl
outerAdapters:
- name: logger
# name需要修改
# 这个表示我们使用的是哪个适配器,es8 表示使用的是 es8 适配器,其他的可以参考解压后的 conf 下面的目录名称
- name: es8
# hosts需要修改(注意,要加上http://)
hosts: http://127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest
# security.auth: test:123456 # only used for rest mode
# cluster.name需要修改 ES集群名称
cluster.name: es-dev
适配器配置
在上面的启动器的配置中我们已经配置了 ES8 作为适配器,那具体要同步的是哪张表, 以及对应的 ES中是索引是哪个怎么配置呢?这些配置就放在适配器的配置里面, 每一个适配器的配置都是一个想要同步到 ES 的模板配置。
因为我使用的es8适配器,所以进到es8中
cd conf/es8
cp -v mytest_user.yml esMappingNotice.yml
rm -rf biz_order.yml customer.yml mytest_user.yml
vim esMappingNotice.yml
dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
_index: notice
_id: _id
_type: _doc
upsert: true
sql: "
SELECT
c.id AS _id,
c.title AS title,
c.content AS content,
DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
notice AS c
"
commitBatch: 3000
注释:
dataSourceKey: defaultDS
destination: example
outerAdapterKey: es-key
groupId: g1
上面的几个配置,都需要跟启动器里面的配置保持一致。
esMapping:该配置是表示的是如何将 MySQL 的数据同步到 ES 中,配置比较复杂,其中
_index 表示 ES 的索引(需要提前创建);
_id 和 pk 二选一配置,表示使用查询出来的哪个字段作为唯一值;
upsert 表示对应主键的数据不存在的时候执行插入动作,存在的时候执行更新动作;
sql:表示要同步的数据,这个的 SQL 形式要求会比较严格
而且 _id 必须要加别名,我索性把所有都改个别名
commitBatch: 3000 设置了每次批量提交的记录数量为 3000。这意味着每当 canal.adapter 收集到 3000 条记录时,
会将这些记录批量提交到 Elasticsearch。确保这个批量大小适合你的数据量和 Elasticsearch 的处理能力,
以避免超时或性能问题。如果你遇到性能瓶颈,可以尝试调整这个参数值,增大或减小批量大小来优化性能。
vim esMappingResult.yml
dataSourceKey: defaultDS
destination: example
groupId: gl
esMapping:
_index: result
_id: _id
_type: _doc
upsert: true
sql: "
SELECT
c.id AS _id,
c.user_id AS userid,
c.score AS score,
DATE_FORMAT (c.created_at, '%Y-%m-%dT%H:%i:%s') AS created,
DATE_FORMAT (c.updated_at, '%Y-%m-%dT%H:%i:%s') AS updated
FROM
result AS c
"
commitBatch: 3000
时间类型的表结构想要存到es中必须自定义转换器或格式化程序,将 Timestamp 转换为 Elasticsearch 支持的日期格式
(例如 ISO 8601 格式)。 否则导入时报错
ERROR c.a.otter.canal.client.adapter.es8x.etl.ESEtlService - cannot write xcontent for unknown value of type class java.sql.Timestamp
java.lang.IllegalArgumentException: cannot write xcontent for unknown value of type class java.sql.Timestamp
然后启动canal.adapter
赋权:
cd /usr/local/canal.adapter/
chmod 777 -R conf/es8
./bin/startup.sh
tail -f logs/adapter/adapter.log
2024-08-30 15:06:03.275 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-gl succeed
2024-08-30 15:06:03.275 [main] INFO c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2024-08-30 15:06:03.275 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2024-08-30 15:06:03.285 [main] INFO c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 3.543 seconds (JVM running for 4.264)
2024-08-30 15:06:03.368 [Thread-3] INFO c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
2024-08-30 15:09:27.964 [http-nio-8081-exec-1] INFO o.a.catalina.core.ContainerBase.[Tomcat].[localhost].[/] - Initializing Spring DispatcherServlet 'dispatcherServlet'
输出如上则为成功
然后测试
# 插入数据,查看一下Canal.adapter是否可以读到数据
登录到mysql中
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (22, 'New Notice', 'This is a new notice', NOW(), NOW());
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (23, 'New Notice', 'This is a new notice', NOW(), NOW());
tail -f logs/adapter/adapter.log
输出如下:说明成功
2024-08-30 15:03:05.827 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":22,"title":"New Notice","content":"This is a new notice","created_at":1725001385000,"updated_at":1725001385000}],"database":"info","destination":"example","es":1725001385000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001385626,"type":"INSERT"}
2024-08-30 15:03:15.858 [pool-3-thread-1] INFO c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: {"data":[{"id":23,"title":"New Notice","content":"This is a new notice","created_at":1725001395000,"updated_at":1725001395000}],"database":"info","destination":"example","es":1725001395000,"groupId":"gl","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"notice","ts":1725001395857,"type":"INSERT"}
然后全量导入一次数据
curl "localhost:8081/etl/es8/esMappingNotice.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:23 条"}
esMappingNotice.yml 则为适配器文件的名称。
curl "localhost:8081/etl/es8/esMappingResult.yml" -X POST
{"succeeded":true,"resultMessage":"导入ES 数据:20 条"}
然后打开kibana 或者 elasticsearch-head-5.0.0
作者这里使用的是 elasticsearch-head-5.0.0
验证同步配置
在 MySQL 数据库中执行一些增、删、改操作,例如:
登录到mysql中,
INSERT INTO notice (id, title, content, created_at, updated_at) VALUES (1, 'New Notice', 'This is a new notice', NOW(), NOW());
UPDATE notice SET content = 'Updated content' WHERE id = 1;
DELETE FROM notice WHERE id = 1;
#### 查询_id 为 10 11 的sql 已经删除
注释
如果刚来的小伙伴看不懂 一定要结合作者下面这篇文章部署,
这里,这里 注释中所提的下面上面都是这篇文章 👆👇