当前位置: 首页 > article >正文

canal1.1.7实战

1.环境搭建

canal可以用来监听mysql数据库的变化,用来同步数据

先下载最新的部署版本,release地址:Releases · alibaba/canal · GitHub

包下载地址: https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz

 下载完后,在linux上新建一个canal文件夹,放入tar包解压: tar -zxvf canal.xxx.tar.gz

解压完后修改配置文件

查看conf/canal.properties,其中canal.port是客户端连接的端口,需要放开,canal.admin.user和canal.admin.passwd是客户端连接的账号

再打开conf/example/ instance.properties, master.address填数据库地址,dbUsername和dbPassword是数据库账号,flter.regex可以用来过滤数据库,默认是监听所有数据库,如果想监听db_开头的数据可以这么写db_.*\\..*,多个用逗号分隔

 修改完成后,进入bin目录,执行./startup.sh是启动,./stop.sh是关闭

进入logs/example,执行tail -f -n 300 example.log,看到以下输出说明搭建成功了

 2.客户端代码

引入依赖

  <dependencies>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.7</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.protocol</artifactId>
      <version>1.1.7</version>
    </dependency>
  </dependencies>

代码实现:

package cn.hollycloud.iplatform;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.junit.Test;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Unit test for simple App.
 */
@Slf4j
public class CanalTest {
    private Map<String, String> errorMap = new HashMap<>();

    @Test
    public void testCanal() {
        initThread();
    }

    private void initThread() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    try {
                        initConnect();
                    } catch (Exception e) {
                        String key = "canal_connection_error";
                        if (!hasSameError(key, e.getMessage())) {
                            log.error("canal连接出错: {}", e);
                        }
                    }
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                    }
                }
            }
        }).start();
    }

    private void initConnect() {
        String canalIp = "localhost";
        int canalPort = 11111;
        String canalDestination = "example";
        String canalUsername = "admin";
        String canalPassword = "123456";
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalIp,
                canalPort), canalDestination, canalUsername, canalPassword);
        int batchSize = 200;
        try {
            connector.connect(); // 连接到canal server
            connector.subscribe("db_.*\\..*"); // 订阅指定的消息
            connector.rollback(); // 回滚到未进行ack 的地方
            log.info("canal连接成功");
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        //未获取到消息则睡眠
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                    }
                } else {
                    try {
                        //处理消息
                        log.info("从canal接收到: {} 条消息,消息批次: {},开始处理", size, message.getId());
                        handleMessage(message.getEntries());
                    } catch (Exception e) {
                        connector.rollback(batchId); // 处理失败, 回滚数据
                        String key = "canal_sync_data_error";
                        String errMsg = e.getMessage();
                        if (StringUtils.isEmpty(errMsg)) errMsg = e.toString();
                        if (!hasSameError(key, errMsg)) {
                            log.error("同步数据出错: {}", e);
                        }
                        //休眠一段时间继续获取数据
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                        continue;
                    }
                }
                connector.ack(batchId); // 提交确认
            }
        } finally {
            connector.disconnect();
        }
    }

    private boolean hasSameError(String key, String error) {
        String lastError = errorMap.get(key);
        if (Objects.equals(lastError, error)) {
            return true;
        }
        errorMap.put(key, error);
        return false;
    }

    private void handleMessage(List<CanalEntry.Entry> entrys) throws InvalidProtocolBufferException {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            //根据数据库名获取租户名
            String databaseName = entry.getHeader().getSchemaName();
            String tableName = entry.getHeader().getTableName();
            log.info("数据库: {}, 表名: {}", databaseName, tableName);
            // 获取类型
            CanalEntry.EntryType entryType = entry.getEntryType();

            // 获取序列化后的数据
            ByteString storeValue = entry.getStoreValue();
            if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
                // 反序列化数据
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
                // 获取当前事件的操作类型
                CanalEntry.EventType eventType = rowChange.getEventType();
                if (eventType == CanalEntry.EventType.INSERT || eventType == CanalEntry.EventType.UPDATE
                        || eventType == CanalEntry.EventType.DELETE) {
                    // 获取数据集
                    List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
                    // 遍历rowDataList,并打印数据集
                    for (CanalEntry.RowData rowData : rowDataList) {
                        List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
                        List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
                        // 变更前数据
                        for (CanalEntry.Column column : beforeColumnsList) {
                            log.info("变更前数据: name: {}, value: {}", column.getName(), column.getValue());
                        }
                        // 变更后数据
                        for (CanalEntry.Column column : afterColumnsList) {
                            log.info("变更后数据: name: {}, value: {}", column.getName(), column.getValue());
                        }
                    }
                }
            }
        }
    }
}


http://www.kler.cn/a/132537.html

相关文章:

  • 【Docker】在 Ubuntu 上安装 Docker 的详细指南
  • STM32 ADC --- 任意单通道采样
  • 1Panel 推送 SSL 证书到阿里云、腾讯云
  • 在Qt(以及C++)中, 和 * 是两个至关重要的符号--【雨露均沾】
  • PyTorch实战-手写数字识别-单层感知机
  • Qt 之 qwt和QCustomplot对比
  • Office Word 中的宏
  • C#WPF数据模板应用实例
  • 防止显卡掉卡的一种方法:nvidia-smi -pm 1
  • adb shell settings高级指令设置系统属性所有的指令汇总+注释
  • 盘点60个Python各行各业管理系统源码Python爱好者不容错过
  • Topaz Video AI:引领视频质量革命,让您的内容焕发新生
  • VSCode使用MinGW中的go并支持CGO
  • 多模态大一统:开启全模态LLM和通用AI时代的大门
  • Go 语言结构体验证详解:validate 标签与自定义规则
  • Bash openldap同步AD组织数据
  • HarmonyOS真机调试报错:INSTALL_PARSE_FAILED_USESDK_ERROR处理
  • bug-xss 攻击漏洞问题
  • 技术架构 - 应用数据分离,应用服务集群架构
  • asp.net core mvc 控制器使用配置
  • 专注于绘画,不受限制!尝试Growly Draw for Mac的快速绘画应用
  • spider 网页爬虫中的 AWS 实例数据获取问题及解决方案
  • 阿里云服务器 手动搭建WordPress(CentOS 8)
  • AI Navigation导航系统_unity基础开发教程
  • 带你快速掌握Linux最常用的命令(图文详解)- 最新版(面试笔试常考)
  • 如何使用Java实现权限认证和登录jwt