当前位置: 首页 > article >正文

Linux安装canal

Linux安装canal

文章目录

  • Linux安装canal
    • msql修改
      • 1. mysql创建用户并授权
      • 2. 修改mysql的配置文件
      • 3. 重启mysql
      • 4. 检查是否打开binlog
      • 5. 查看binlog日志列表和当前正在写入的binlog
    • canal
      • 1. 下载canal:
      • 2. 在服务器上解压
      • 3. 修改配置文件
      • 4. 启动canal
      • 5. 查看日志是否正常
      • 6. 别忘了开放防火墙,默认端口号:11111
    • java
      • 1. pom.xml
      • 2. application.yml
      • 3. spring.factories
      • 4. CannalProperties.java
      • 5. 新建CanalUtil.java
    • 开始测试
      • 1. 启动项目
      • 2. 数据库中执行insert语句
      • 3. 控制台查看结果
      • 4. 结合到es上使用

msql修改

1. mysql创建用户并授权

-- 创建用户 用户名:canal 密码:xxxxxx
create user 'canal'@'%' identified by 'xxxxxx';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%' identified by 'xxxxxx';

2. 修改mysql的配置文件

[mysqld]
# 打开binlog
log-bin=mysql-bin
# 选择ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定义,不要和canal的slaveId重复
server_id=1

3. 重启mysql

4. 检查是否打开binlog

show variables like 'log_bin';

5. 查看binlog日志列表和当前正在写入的binlog

# 查看binlog列表
show binary logs;
# 查看当前正在写入的binlog文件
show master status;

记着正在写入的binlog名称 mysql-bin.000001

canal

1. 下载canal:

地址:https://github.com/alibaba/canal/releases,选择deployer的版本

2. 在服务器上解压

tar -zxvf 压缩包名

3. 修改配置文件

相对路径:/canal/conf/example/instance.properties

//数据库连接地址
canal.instance.master.address=xx.xx.xx.xx:3306
//刚才记录的正在写入的binlog
canal.instance.master.journal.name=mysql-bin.000006
//刚才创建的数据库用户
canal.instance.dbUsername=canal
//刚才创建的数据库用户密码
canal.instance.dbPassword=XXXXXX

4. 启动canal

相对路径:/canal/bin 下

stop.sh 停止
restart.sh 重启
startup.sh 启动

到/bin下 执行 ./startup.sh 启动

5. 查看日志是否正常

相对路径:/canal/logs/example/example.log

如果直接输出下面的就没问题了,
按上面步骤正常无问题,如果有异常,请复制example.log中的异常自行百度查询

2023-05-06 18:19:46.374 [New I/O server worker #1-2] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:23:11.578 [New I/O server worker #1-4] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:23:11.579 [New I/O server worker #1-4] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$
2023-05-06 18:27:43.075 [New I/O server worker #1-1] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - subscribe filter change to 数据库名\..*
2023-05-06 18:27:43.075 [New I/O server worker #1-1] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^数据库名\..*$

6. 别忘了开放防火墙,默认端口号:11111

java

1. pom.xml

<!--canal-->
<dependency>
	<groupId>com.alibaba.otter</groupId>
	<artifactId>canal.client</artifactId>
	<version>1.1.3</version>
</dependency>

2. application.yml

canal-monitor-mysql:
  host-name: ip地址
  port: 端口号
  database-name: 数据库名
  table-name: 表名,暂时未使用

3. spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.w.yizhi.common.oss.CanalUtil

4. CannalProperties.java

package com.w.yizhi.common.oss.properties;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Data
@Configuration
@ConfigurationProperties(prefix = "canal-monitor-mysql")
public class CannalProperties {

    private String hostName;
    private Integer port;
    private String tableName;
    private String databaseName;

}

5. 新建CanalUtil.java

package com.w.yizhi.common.oss;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.w.yizhi.common.oss.properties.CannalProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

@Component
@EnableConfigurationProperties({ CannalProperties.class })
@Slf4j
public class CanalUtil {
    private final static int BATCH_SIZE = 10000;

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void handleDATAChange(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),e);
            }
            CanalEntry.EventType eventType = rowChage.getEventType();
            log.info("Canal监测到更新:【{}】", entry.getHeader().getTableName());
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {

                switch (eventType) {
                    /**
                     * 删除操作
                     */
                    case DELETE:
                        printColumn(rowData.getBeforeColumnsList());
                        break;
                    /**
                     * 添加操作
                     */
                    case INSERT:
                        printColumn(rowData.getAfterColumnsList());
                        break;
                    /**
                     * 更新操作
                     */
                    case UPDATE:
                        printColumn(rowData.getAfterColumnsList());
                        break;
                    default:
                        break;
                }
            }

        }
    }
    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

    @Bean
    public Boolean startMonitorSQL(CannalProperties cannalProperties) {
        while (true) {
                CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(cannalProperties.getHostName(), cannalProperties.getPort()), "example", "", "");
            try {
                //打开连接
                connector.connect();
                log.info("数据库检测连接成功!");
                connector.checkValid();
                //订阅数据库表,全部表q
                connector.subscribe(cannalProperties.getTableName() + "\\..*");
                //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
                connector.rollback();
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(BATCH_SIZE);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        Thread.sleep(1000);
                    } else {
                        handleDATAChange(message.getEntries());
                    }
                    // 提交确认
                    connector.ack(batchId);
                }
            } catch (Exception e) {
                e.printStackTrace();
                log.error("成功断开监测连接!尝试重连");
            } finally {
                connector.disconnect();
                //防止频繁访问数据库链接: 线程睡眠 10秒
                try {
                    Thread.sleep(10 * 1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

开始测试

1. 启动项目

09:27:13.344 [restartedMain] INFO  c.w.y.c.o.CanalUtil - [canal,92] - 数据库检测连接成功!

2. 数据库中执行insert语句

INSERT INTO `ry-admin`.`bs_note_info` (`id`, `title`, `context`, `file_name`, `bucket_name`, `original`, `type`, `file_size`, `create_user_id`, `create_user_name`, `create_time`, `update_user_id`, `update_user_name`, `update_time`, `del_flag`) VALUES (NULL, 'title', 'context', NULL, NULL, NULL, NULL, NULL, 1, 'admin', '2023-04-26 21:09:09', NULL, NULL, NULL, '0');

3. 控制台查看结果

09:30:13.345 [restartedMain] INFO  c.w.y.c.o.CanalUtil - [handleDATAChange,47] - Canal监测到更新:【bs_note_info】
id : 14    update=true
title : title    update=true
context : context    update=true
file_name :     update=true
bucket_name :     update=true
original :     update=true
type :     update=true
file_size :     update=true
create_user_id : 1    update=true
create_user_name : admin    update=true
create_time : 2023-04-26 21:09:09    update=true
update_user_id :     update=true
update_user_name :     update=true
update_time :     update=true
del_flag : 0    update=true

4. 结合到es上使用


http://www.kler.cn/news/18163.html

相关文章:

  • Go type关键字定义新类型和类型别名的区别
  • io,nio,aio区别
  • 测试开发如何进阶?需要哪些能力?吐血整理-你的进阶之路...
  • 深入理解移动端布局:Viewport与设备像素比
  • linux命令之kill详解
  • UICollectionView 实现整页翻动(每页3个cell)
  • Android 9.0 Camera2 拍照功能默认选前摄像头
  • 【论文阅读】A Comparative Study on Camera-Radar Calibration Methods
  • 如何提高执行力
  • 图数据库游记
  • 代码随想录算法训练营day28 | 93.复原IP地址,78.子集,90.子集II
  • 回文数:探索数字世界中的对称美学
  • spark练习例子——单词计数——pyspark
  • Java基础--->基础部分(2)【Java值传递】
  • 项目搭建—常用的插件
  • 基于R语言APSIM模型
  • 国民技术N32G430开发笔记(19)- IAP 升级 I2C1 从机收发数据
  • 本地字体库的引入方法
  • 程序设计的三种结构-C中实现其的6条语句
  • 数据导向下制造业的生产效率、交易效率提升办法
  • 【ESD专题】案例:TVS管钳位电压能不能通过TLP测试数据表征?
  • 【CMIP6月、日数据】【ERA5-LAND陆面再分析数据】【全球VIPPHEN物候数据】
  • javaScript---设计模式-设计模式概论
  • TypeScript基础
  • Chapter 7:XDC Precedence (ug903)
  • TreeMap源码分析,Collections工具类的使用
  • 相对路径的详细用法
  • 行为型模式-中介者模式
  • 武忠祥老师每日一题||定积分基础训练(十)
  • 9大Python常用技巧 经验之谈