当前位置: 首页 > article >正文

使用Canal将MySQL数据同步到ES(Linux)

一、Canal官网文档

去到官方文档根据官网文档进行操作:

QuickStart · alibaba/canal Wiki · GitHub

二、开启服务器中MySQL的binlog

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

进行修改:

 登录root

mysql -uroot -p
CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

改完后可以在MySQL命令行中使用下面的指令查看是否开启成功,value为ON就是开启成功:

show variables like 'log_bin';

注意注意:记得修改后一定要重启MySQL,我就是因为忘记重启了找了一天的bug ,不记得重启会出现后面第八点那个问题。

三、将canal-deployer下载到服务器

Releases · alibaba/canal · GitHub

cd /tmp/canal
tar zxvf canal.deployer-1.1.8-SNAPSHOT.tar.gz

解压后:

建议这边弄一个单独的文件夹把压缩包放进去再去解压,不然后面还有其他的会比较影响

弄一个类似这种文件夹,在对应的文件夹里面放压缩包,再去解压,后面我都是这种步骤,由于没有保留截图,上面那些还是最开始的演示,建议后面修改一下。

四、修改对应配置文件

五、启动canal 服务端

启动指令:

sh bin/startup.sh

关闭指令:

sh bin/stop.sh

参考:linux下启动Nacos报错解决:which: no javac in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)_java_silence_fengxuan-GitCode 开源社区

 这个是我服务器jdk下载的位置,去找到自己对应的位置即可,后面我改用jdk11了,11是没问题的,记得17可能有些问题,可以参考下面这个步骤来就行,版本可以用11。

jdk下载可以参考这篇博客:

linux下载安装jdk教程_linux jdk下载-CSDN博客

编辑下面这个文件

在这个文件内添加下面信息

export JAVA_HOME=/www/server/java/jdk-17.0.8/          
export JRE_HOME=$JAVA_HOME/jre
export CLASSPATH=$JAVA_HOME/lib:$JRE_HOME/lib:$CLASSPATH
export PATH=$JAVA_HOME/bin:$JRE_HOME/bin:$PATH

修改完,重新加载一下:

source /etc/profile

然后使用下面指令查看jdk版本,没问题即可:

java -version

再次执行启动命令

sh bin/startup.sh

查看输出日志:

启动成功!服务端启动成功!

六、客户端        

查看官方使用例子

ClientExample · alibaba/canal Wiki · GitHub

引入依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

code如下:

修改后启动:

然后我插入一条数据,也是监听到了。

 测试成功。

七、同步ES(Sync ES)

 

大概需要注意这几个东西。

下面给出我的部分配置:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 #记得去防火墙开启这个端口
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

  srcDataSources:
    defaultDS:
     url: jdbc:mysql://你的服务器IP:3306/你的DataSource名?useUnicode=true&useSSL=false&serverTimezone=UTC
     username: canal
     password: canal
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://你的服务器IP:9200
        properties: 
         mode: rest #9200端口的话就写rest
         cluster.name: elasticsearch

datasourceKey: defaultDS #源数据源的key,对应上面配置的srcDatasources中的值
destination: example #canal的instance或者MO的topic
groupId: g1 #对应MQ模式下的groupId,只会同步对应groupId的数据
esMapping:
  _index: user #es 的索引名称
  _id: _id #es 的 id,如果不配置该项必须配置下面的pk项 id则会由es自动分配
# 下面写你的MySQL需要配置表的查询语句(完整的语句)
  sql: "SELECT
    u.id AS _id,
    u.id ,
    u.openid,
    u.quick_user_id,
    u.mark_number,
    u.brief_introduction
FROM
    user u;" # sq1映射
commitBatch: 500 # 提交批大小

开启服务:

1.进入对应目录

cd /tmp/canal/canal.adapter

2.启动服务

sh bin/startup.sh

再去查看log日志:

启动成功

3.如果想要停止服务

 sh bin/stop.sh

七、测试同步

新增一条数据

更新一条数据

修改同步成功,注意上面一个配置文件中我写的查询语句并不完整,实际上是需要完整的,注意和自己表的查询语句一致,注意注意。。

八、曾遇过的bug

如果出现下面这种情况,可能是修改MySQL后没有重启MySQL。。。实习的时候被这这个问题折磨了一天,搜了各种办法,很逆天。

DEBuG c.a.0.canal.client.adapter.es,core.service.Essyncservice -ml: {"data":null,"database":"quick pickup","destination": "example","es":1732938564088,"groupId":"g1" "isDd1":false,"old":nul1,"okNames":[1,"sal

最后去重启了一下,结果发现成功了,成功监听到信息。


http://www.kler.cn/a/418617.html

相关文章:

  • STL算法之其他算法_上
  • 【聚类】K-Means 聚类(无监督)及K-Means ++
  • 把当下的快乐和长远的目标连接在一起。
  • 【Rust】unsafe rust入门
  • Power BI - Connect to SharePoint online list with Image column
  • 什么是EMS
  • node.js中实现MySQL的增量备份
  • 安卓mokey测试学习思路
  • Maya 中创建游戏角色的头发,并将其导出到 Unreal Engine 5
  • 23种设计模式之桥接设计模式
  • Hadoop生态圈框架部署(九)- Hive部署
  • 游戏启动时“msvcr120.dll文件丢失”是什么原因以及解决方案。四种解决办法轻松搞定“msvcr120.dll文件丢失”问题
  • hadoop集群搭建
  • 如何使用Spring Boot进行Web开发?
  • mysql系列2—InnoDB数据存储方式
  • Android内容提供者
  • SARIMA 模型Matlab代码
  • 制造入门知识-下篇
  • Qt中模拟鼠标消息并与系统鼠标消息进行区分
  • 信息收集-谷歌语法使用大全
  • 随时掌控健康,时刻监测血压,dido医疗级气泵血压手表评测
  • 单片机-- 复位的方式
  • stm32里一个定时器可以提供多路信号吗?
  • 简单web项目自定义部署Dockerfile
  • 数据分析学习
  • MLinear论文解析