Linux安装canal
Linux安装canal
文章目录
- Linux安装canal
- msql修改
- 1. mysql创建用户并授权
- 2. 修改mysql的配置文件
- 3. 重启mysql
- 4. 检查是否打开binlog
- 5. 查看binlog日志列表和当前正在写入的binlog
- canal
- 1. 下载canal:
- 2. 在服务器上解压
- 3. 修改配置文件
- 4. 启动canal
- 5. 查看日志是否正常
- 6. 别忘了开放防火墙,默认端口号:11111
- java
- 1. pom.xml
- 2. application.yml
- 3. spring.factories
- 4. CannalProperties.java
- 5. 新建CanalUtil.java
- 开始测试
- 1. 启动项目
- 2. 数据库中执行insert语句
- 3. 控制台查看结果
- 4. 结合到es上使用
msql修改
1. mysql创建用户并授权
-- 创建用户 用户名:canal 密码:xxxxxx
create user 'canal'@'%' identified by 'xxxxxx';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'xxxxxx';
2. 修改mysql的配置文件
[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1
3. 重启mysql
4. 检查是否打开binlog
show variables like 'log_bin';
5. 查看binlog日志列表和当前正在写入的binlog
# 查看binlog列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;
记着正在写入的binlog名称 mysql-bin.000001
canal
1. 下载canal:
地址:https://github.com/alibaba/canal/releases,选择deployer的版本
2. 在服务器上解压
tar -zxvf 压缩包名
3. 修改配置文件
相对路径:/canal/conf/example/instance.properties
//数据库连接地址
canal.instance.master.address=xx.xx.xx.xx:3306
//刚才记录的正在写入的binlog
canal.instance.master.journal.name=mysql-bin.000006
//刚才创建的数据库用户
canal.instance.dbUsername=canal
//刚才创建的数据库用户密码
canal.instance.dbPassword=XXXXXX
4. 启动canal
相对路径:/canal/bin 下
stop.sh 停止
restart.sh 重启
startup.sh 启动
到/bin下 执行 ./startup.sh 启动
5. 查看日志是否正常
相对路径:/canal/logs/example/example.log
如果直接输出下面的就没问题了,
按上面步骤正常无问题,如果有异常,请复制example.log中的异常自行百度查询
2023-05-06 18:19:46.374 [New I/O server worker #1-2] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:23:11.578 [New I/O server worker #1-4] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:23:11.579 [New I/O server worker #1-4] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:27:43.075 [New I/O server worker #1-1] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:27:43.075 [New I/O server worker #1-1] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
6. 别忘了开放防火墙,默认端口号:11111
java
1. pom.xml
<!--canal-->
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.3</version>
</dependency>
2. application.yml
canal-monitor-mysql:
host-name: ip地址
port: 端口号
database-name: 数据库名
table-name: 表名,暂时未使用
3. spring.factories
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.w.yizhi.common.oss.CanalUtil
4. CannalProperties.java
package com.w.yizhi.common.oss.properties;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
@Data
@Configuration
@ConfigurationProperties(prefix = "canal-monitor-mysql")
public class CannalProperties {
private String hostName;
private Integer port;
private String tableName;
private String databaseName;
}
5. 新建CanalUtil.java
package com.w.yizhi.common.oss;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.w.yizhi.common.oss.properties.CannalProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
@Component
@EnableConfigurationProperties({ CannalProperties.class })
@Slf4j
public class CanalUtil {
private final static int BATCH_SIZE = 10000;
/**
* 打印canal server解析binlog获得的实体类信息
*/
private static void handleDATAChange(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
//RowChange对象,包含了一行数据变化的所有特征
CanalEntry.RowChange rowChage;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
switch (eventType) {
/**
* 删除操作
*/
case DELETE:
printColumn(rowData.getBeforeColumnsList());
break;
/**
* 添加操作
*/
case INSERT:
printColumn(rowData.getAfterColumnsList());
break;
/**
* 更新操作
*/
case UPDATE:
printColumn(rowData.getAfterColumnsList());
break;
default:
break;
}
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
@Bean
public Boolean startMonitorSQL(CannalProperties cannalProperties) {
while (true) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(cannalProperties.getHostName(), cannalProperties.getPort()), "example", "", "");
try {
//打开连接
connector.connect();
log.info("数据库检测连接成功!");
connector.checkValid();
//订阅数据库表,全部表q
connector.subscribe(cannalProperties.getTableName() + "\\..*");
//回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
connector.rollback();
while (true) {
// 获取指定数量的数据
Message message = connector.getWithoutAck(BATCH_SIZE);
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep(1000);
} else {
handleDATAChange(message.getEntries());
}
// 提交确认
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
log.error("成功断开监测连接!尝试重连");
} finally {
connector.disconnect();
//防止频繁访问数据库链接: 线程睡眠 10秒
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
开始测试
1. 启动项目
09:27:13.344 [restartedMain] INFO c.w.y.c.o.CanalUtil - [canal,92] - 数据库检测连接成功!
2. 数据库中执行insert语句
INSERT INTO `ry-admin`.`bs_note_info` (`id`, `title`, `context`, `file_name`, `bucket_name`, `original`, `type`, `file_size`, `create_user_id`, `create_user_name`, `create_time`, `update_user_id`, `update_user_name`, `update_time`, `del_flag`) VALUES (NULL, 'title', 'context', NULL, NULL, NULL, NULL, NULL, 1, 'admin', '2023-04-26 21:09:09', NULL, NULL, NULL, '0');
3. 控制台查看结果
09:30:13.345 [restartedMain] INFO c.w.y.c.o.CanalUtil - [handleDATAChange,47] - Canal监测到更新:【bs_note_info】
id : 14 update=true
title : title update=true
context : context update=true
file_name : update=true
bucket_name : update=true
original : update=true
type : update=true
file_size : update=true
create_user_id : 1 update=true
create_user_name : admin update=true
create_time : 2023-04-26 21:09:09 update=true
update_user_id : update=true
update_user_name : update=true
update_time : update=true
del_flag : 0 update=true