当前位置: 首页 > article >正文

Canal实战使用(集群部署)和原理解析

1.mysql数据同步工作原理

 

MySQL master将数据变更写入二进制日志(binary log,其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

MySQL slave将master的binary log events拷贝到它的中继日志(relay log)

MySQL slave重放relay log中事件,将数据变更反映它自己的数据

2.canal工作原理

canal模拟mysql数据同步的原理,伪装自己为mysql slave,向mysql master发送dump协议。

mysql master收到dump请求,开始推送binary log给slave(canal)

使用步骤

1.数据库需要开启binlog

show variables like 'binlog_format'
show variables like 'log_bin'

显示未on则开启。

查看binlog日志


show binlog events in 'DJT2TRG3-bin.000433';

可以查找执行操作的position

可以通过修改canal的meta.dat文件 修改位置 从之前的位置开始同步操作。

下载canal 服务端 canal deployer 和服务端 canal adapter。

下载完成解压后,修改配置文件,由于主要的内容就是修改配置文件,以下贴出我使用的配置:

服务端配置文件canal_local.properties:

# register ip
canal.register.ip = 127.0.0.1

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal-cluster-1
canal.admin.register.name = canal-server-2

canal.properties(可以在canal-admin中配置)

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info 如果不配置的话 canal使用的数据存在内置数据库H2,重启等会丢失数据
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
######### 		destinations		#############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 	      MQ Properties      #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=

canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local

canal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8

##################################################
######### 		     Kafka 		     #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

##################################################
######### 		    RocketMQ	     #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = 

##################################################
######### 		    RabbitMQ	     #############
##################################################
rabbitmq.host =
rabbitmq.virtual.host =
rabbitmq.exchange =
rabbitmq.username =
rabbitmq.password =
rabbitmq.deliveryMode =

Instance.properties

# MySQL 集群配置中的 serverId 概念,需要保证和当前 MySQL 集群中 id 唯一 (v1.0.26+版本之后 canal 会自动生成,不需要手工指定)
# canal.instance.mysql.slaveId=0
# 源库地址
canal.instance.master.address=11.17.6.185:4679

# 方式一:binlog + postion 如果用方式一 也可以不用填position 和binlog canal会自动获取
# mysql源库起始的binlog文件
# canal.instance.master.journal.name=mysql-bin.000008
# mysql主库链接时起始的binlog偏移量
# canal.instance.master.position=257890708 
# mysql主库链接时起始的binlog的时间戳
# canal.instance.master.timestamp=

# 方式二:gtid(推荐)
# 启用 gtid 方式同步
canal.instance.gtidon=true
# gtid
canal.instance.master.gtid=92572381-0b9c-11ec-9544-8cdcd4b157e0:1-1066141


# 源库账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# MySQL 数据解析编码
canal.instance.connectionCharset = UTF-8

# MySQL 数据解析关注的表,Perl 正则表达式
canal.instance.filter.regex=acpcanaldb.*

# MySQL 数据解析表的黑名单,过滤掉不同步的表
# canal.instance.filter.black.regex=mysql\\.slave_.*

客户端配置:

my_test.yml(adapter\conf\rdb目录下可以添加多个.yml,都会读取)

dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
#dbMapping:
#  database: account
#  table: demo
#  targetTable: demo
#  targetPk:
#    id: id
#  mapAll: true
#  targetColumns:
#    id:
#    name:
#    test:
#    role_id:
#    c_time:
#    test1:
#  etlCondition: "where c_time>={}"
#  commitBatch: 3000 # 批量提交的大小


## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
##使用mirror方式 整个库的改动都会同步 但是必须两个库保持一致,不同的已存在表不会处理,另外因为是
##两个同名库,所以必须两个服务器上(两个ip)搭建mysql
##如果不用mirror方式,只会同步一个表,新增表添加yml文件,比较灵活
dbMapping:
  mirrorDb: true
  database: account

配置完成后,先启动服务端(省略zk安装,较为简单可以自己百度),然后启动客户端,新建表添加数据进行测试。需要注意如果要同步DDL语句,必须用mirror方式,否则只能同步dml单表记录。

下边是运行截图:

 

 

 


http://www.kler.cn/a/18028.html

相关文章:

  • Mysql 8迁移到达梦DM8遇到的报错
  • wordpress搭建主题可配置json
  • PCL 点云分割 基于CPC算法的分割
  • PyTorch版本的3D网络Grad-CAM可视化实验记录
  • uniapp vue里按钮上的文字,换行的方法,用rich-text
  • SpringBoot单体服务无感更新启动,动态检测端口号并动态更新
  • 代码随想录算法训练营day32 | 贪心算法:122.买卖股票的最佳时机II ,55. 跳跃游戏,45.跳跃游戏II
  • 计算机基础必读书籍
  • 从零开始学习Linux运维,成为IT领域翘楚(二)
  • vue3 - 超详细头像裁剪并上传到服务器,支持按照自定义比例裁切图片效果组件插件(详细示例源码教程,一键复制运行开箱即用)
  • 网络安全——传输层安全协议(3)
  • 深度学习入门篇1
  • Filter过滤器和Listener监听器
  • 【致敬未来的攻城狮计划】— 连续打卡第二十五天:RA2E1的 DTC传输模式
  • 深入浅出堆—C语言版【数据结构】
  • 为什么需要使用Docker
  • 掌握这些GitHub搜索技巧,你的开发效率将翻倍!
  • 使用MindSDK的at-server组件开发从机模组
  • ScriptableObject上的prefab内容暂用,ScriptableObject详解
  • random — 伪随机数生成器(史上总结最全)
  • C++学习day--09 字符串比较、运算符
  • 【Java多线程编程】创建线程的基本方式
  • 【Linux】浅谈网络协议栈-网桥br0
  • 分布式锁Redisson对于(不可重入、不可重试、超时释放、主从一致性)四个问题的应对
  • Python人工智能—线性回归
  • C++面试题