当前位置: 首页 > article >正文

通过MySQL binlog日志,使用canal同步分库分表数据,到 Elasticsearch

前言
通过简单干净实践的方式教会读者,配置出一套 Canal 工具服务,来同步分库分表的数据到 Elasticsearch 文件夹系统中。同时在 SpringBoot 工程中,配置出两套数据源,一套是 MySQL + MyBatis,一套是 Elasticsearch + MyBatis。【这是非常重要的设计手段】

工程代码里面已提供完整需要的本次环境的docker安装环境 如下的目录
docs/dev-ops/tech-canal-docker-compose.yml:提供了所需的环境安装,mysql、canal-server、canal-adapter、elasticsearch、kibana

组件
在这里插入图片描述
canal ,译为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

它的工作原理是,canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议。在 MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal ) 这样 canal 再解析 binary log (binlog)进行配置分发,同步到 Elasticsearch 等系统中进行使用。

那么有了 canal 就可以把分库分表的数据同步到 Elasticsearch,提供汇总查询和聚合操作,也就不需要把轮训每个分库分表数据了。

测试预期
案例会把MySQL,2库4表的数据,通过 Sharding 分库分表写入数据后,同步到 Elasticsearch。分库分表如下(环境安装中会自动安装数据库和设置库表);

环境安装
部署环境
在这里插入图片描述
如果是在 Linux 安装了 docker 可以把 dev-ops 整个文件夹都上传到云服务器,之后通过脚本;

docker-compose -f xfg-dev-tech-canal-docker-compose.yml up -d

进行安装。
开启 binlog
mysql 数据同步需要创建一个 canal 的账户,之后还需要开启 binlog 日志。
在这里插入图片描述
在这里插入图片描述
1:在 mysql 配置文件夹中,设置了初始化授权的账户、导入的库表,以及开启 mysql-bin 和配置要采集的库。
2:如果你有配置自己其他的库要同步也可以如此配置。
库表采集配置
在这里插入图片描述
本文选择的是 es 同步方式,所以需要在 canal-adapter 中 es7 文件夹添加同步的库表 yml 配置。

以及在 application.yml 中配置出需要链接的库表以及同步的目标地址,也就是 es 的地址。【因为本文的案例是在同一个 docker compose 下安装,所以直接用名称 elsticsearch 即可访问】

环境运行状态
在这里插入图片描述
安装完成后可以进入 portainer 查看各个组件的运行,如果有哪个运行失败了,可以点击那个小文件的图标,它可以查看日志。

创建索引
在 doc/dev-ops/curl 下提供了创建 Elasticsearch 的脚本;你可以点击执行或者直接复制执行,也可以复制导入到 ApiPost 里执行。

以上这些脚本是为了创建出数据库表同步到 Elasticsearch 后对应的索引和映射的字段。文章下面会用到。
创建

curl -X PUT "127.0.0.1:9200/tech.user_order" -H 'Content-Type: application/json' -d'
{
    "mappings": {
      "properties": {
        "_user_id":{"type": "text"},
        "_user_name":{"type": "text"},
        "_order_id":{"type": "text"},
        "_uuid":{"type": "text"},
        "_create_time":{"type": "date"},
        "_update_time":{"type": "date"}
      }
    }
}'

添加

curl -X PUT "127.0.0.1:9200/tech.user_order/_mapping" -H 'Content-Type: application/json' -d'
{
  "properties": {
    "_sku_name": {
      "type": "text"
    }
  }
}'

删除

curl -X DELETE "127.0.0.1:9200/tech.user_order"

创建索引(Kibana)
地址:http://127.0.0.1:5601/app/management/kibana/indexPatterns
在这里插入图片描述
数据页面
http://127.0.0.1:5601/app/discover
在这里插入图片描述
许可证
kibana 提供了免费30天的试用许可,安装后可以使用 x-pack-sql-jdbc。它的好处是可以让我们通过 MyBatis 的方式查询 Elasticsearch 数据。

地址:http://127.0.0.1:5601/app/management/stack/license_management
在这里插入图片描述
Elasticsearch 提供了 x-pack-sql-jdbc,让对 Elasticsearch 的查询也可以像使用 MySQL 数据库一样通过 MyBatis 进行查询。但这个 x-pack-sql-jdbc 是付费的,免费可以使用 30 天。之后你可以选择使用重新安装,破解,或者使用 Elasticsearch 的查询方式。还可以自己开发一个 Elasticsearch JDBC,GitHub 上也有类似的组件。
使用时需要引入 POM 配置;

<!-- https://mvnrepository.com/artifact/org.elasticsearch.plugin/x-pack-sql-jdbc -->
<dependency>
    <groupId>org.elasticsearch.plugin</groupId>
    <artifactId>x-pack-sql-jdbc</artifactId>
    <version>7.17.14</version>
</dependency>

工程配置

本节涉及到了简明教程中所讲解的 Sharding 分库分表 (opens new window)的使用,因为我们需要把分库分表的数据通过 canal 同步到 Elasticsearch。(也可以使用其他分库分表组件)
如上的 Sharding 分库分表链接点我
在这里插入图片描述
创建数据源

package com.lm.dev.tech.config;

import org.apache.ibatis.session.SqlSessionFactory;
import org.elasticsearch.xpack.sql.jdbc.EsDataSource;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;

import javax.sql.DataSource;

@Configuration
public class DataSourceConfig {

    @Configuration
    @MapperScan(basePackages = "com.lm.dev.tech.infrastructure.dao.elasticsearch", sqlSessionFactoryRef = "elasticsearchSqlSessionFactory")
    static class ElasticsearchMyBatisConfig {

        @Bean("elasticsearchDataSource")
        @ConfigurationProperties(prefix = "spring.elasticsearch.datasource")
        public DataSource igniteDataSource(Environment environment) {
            return new EsDataSource();
        }

        @Bean("elasticsearchSqlSessionFactory")
        public SqlSessionFactory elasticsearchSqlSessionFactory(DataSource elasticsearchDataSource) throws Exception {
            SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
            factoryBean.setDataSource(elasticsearchDataSource);
            factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/elasticsearch/*.xml"));
            return factoryBean.getObject();
        }
    }

    @Configuration
    @MapperScan(basePackages = "com.lm.dev.tech.infrastructure.dao.mysql", sqlSessionFactoryRef = "mysqlSqlSessionFactory")
    static class MysqlMyBatisConfig {

        @Bean("mysqlDataSource")
        @ConfigurationProperties(prefix = "spring.mysql.datasource")
        public DataSource mysqlDataSource(Environment environment) {
            return DataSourceBuilder.create()
                    .url(environment.getProperty("spring.mysql.datasource.url"))
                    .driverClassName(environment.getProperty("spring.mysql.datasource.driver-class-name"))
                    .build();
        }

        @Bean("mysqlSqlSessionFactory")
        public SqlSessionFactory mysqlSqlSessionFactory(DataSource mysqlDataSource) throws Exception {
            SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
            factoryBean.setDataSource(mysqlDataSource);
            factoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:/mybatis/mapper/mysql/*.xml"));
            return factoryBean.getObject();
        }
    }

}

ElasticsearchMyBatisConfig 使用 EsDataSource 创建数据源和映射 MyBatis Mapper 文件。
MysqlMyBatisConfig 使用 DataSourceBuilder 创建 Sharding 提供的数据源和映射 MyBatis Mapper 文件。
Mapper 映射

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.lm.dev.tech.infrastructure.dao.elasticsearch.IElasticsearchUserOrderDao">

    <resultMap id="dataMap" type="com.lm.dev.tech.infrastructure.po.UserOrderPO">
        <result column="_user_id" property="userId"/>
        <result column="_user_name" property="userName"/>
        <result column="_order_id" property="orderId"/>
        <result column="_sku_name" property="skuName"/>
        <result column="_update_time" property="updateTime"/>
        <result column="_create_time" property="createTime"/>
    </resultMap>

    <select id="selectByUserId" resultMap="dataMap">
        select _user_id, _user_name, _order_id, _sku_name
        from "dev_tech.user_order"
        order by _update_time
        limit 10
    </select>

</mapper>

这个是 Elasticsearch 映射的 Mapper 文件,映射的字段就是前面安装环境的时候设置的索引和字段。现在你使用 Elasticsearch 就不用在工程中硬编码查询语句了,变得非常方便。

工程测试
写入数据

@Test
    public void test_insert() throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            UserOrderPO userOrderPO = UserOrderPO.builder()
                    .userName("小明哥")
                    .userId("" + RandomStringUtils.randomAlphabetic(6))
                    .userMobile("+86 13521408***")
                    .sku("13811216")
                    .skuName("《学习canal》")
                    .orderId(RandomStringUtils.randomNumeric(11))
                    .quantity(1)
                    .unitPrice(BigDecimal.valueOf(128))
                    .discountAmount(BigDecimal.valueOf(50))
                    .tax(BigDecimal.ZERO)
                    .totalAmount(BigDecimal.valueOf(78))
                    .orderDate(new Date())
                    .orderStatus(0)
                    .isDelete(0)
                    .uuid(UUID.randomUUID().toString().replace("-", ""))
                    .ipv4("127.0.0.1")
                    .ipv6("2001:0db8:85a3:0000:0000:8a2e:0370:7334".getBytes())
                    .extData("{\"device\": {\"machine\": \"IPhone 14 Pro\", \"location\": \"shanghai\"}}")
                    .build();

            userOrderDao.insert(userOrderPO);

            Thread.sleep(100);
        }
    }

循环插入3条数据,按需你可以设置更多条数据。
这里的用户编号 userId 是随机的,也是切分键的 ID,所以会在不同的库表写入数据
数据验证
MySQL:http://127.0.0.1:8899/ (opens new window)docker compose 配置的管理后台,可以 root/123456 登录

Kibana:http://127.0.0.1:5601/app/discover (opens new window)查询写入的数据。

查询数据

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class UserOrderDaoTest {

    @Resource
    private IElasticsearchUserOrderDao userOrderDao;

    @Test
    public void test() {
        List<UserOrderPO> userOrderPOS = userOrderDao.selectByUserId();
        log.info("测试结果:{}", JSON.toJSONString(userOrderPOS));
    }

}

好了 至此 通过MySQL binlog日志,使用canal同步分库分表数据,到 Elasticsearch 点点关注不迷路 老铁们!!!!!


http://www.kler.cn/a/457483.html

相关文章:

  • Redission红锁
  • 在 SQL 中,区分 聚合列 和 非聚合列(nonaggregated column)
  • Kafka 快速实战及基本原理详解解析-01
  • 在Typora中实现自动编号
  • 嵌入式学习(21)-正点原子脱机下载器Mini-Pro的使用
  • AWS K8s 部署架构
  • 大数据技术-Hadoop(一)Hadoop集群的安装与配置
  • 【每日学点鸿蒙知识】navigation跳转异常、默认加载移动端版本网页、Menu位置、View生成图片保存相册、H5原生交互
  • 2024/12/29 黄冈师范学院计算机学院网络工程《路由期末复习作业一》
  • Linux day 1129
  • java高频面试之SE-05
  • 关于ESD(静电放电)等级的划分
  • .net8使用log4.net并自定义日志表的字段
  • Django管理界面自定义操作重启ECS服务
  • 业务智能化的关键:探索事件驱动的业务规则模型
  • 网络的类型
  • 面试场景题系列:设计键值存储系统
  • 在Bash Shell脚本中创建和使用变量
  • 如何正确使用Jmeter
  • vue2使用pdfjs-dist和jsPDF生成pdf文件
  • 多显卡服务器如何设置使用集成显卡输出信号?
  • 从零开始开发纯血鸿蒙应用之UI封装
  • HarmonyOS NEXT应用开发实战:一分钟写一个网络接口,JsonFormat插件推荐
  • java开发配置文件集合
  • E6 中的 扩展运算符(Spread) 和 剩余运算符(Rest)
  • 游戏陪玩系统:国际版JAVA游戏陪玩系统源码陪练APP源码H5源码电竞系统源码支持Android+IOS+H5