
芝法酱 Study Notes (2.6) — Flink CDC: listening to the MySQL binlog to sync data to Elasticsearch and update the Redis cache

1. Background

In some projects, especially inventory-management SaaS products, caching was never considered at the beginning because the priority was getting the product shipped. Yet this kind of software carries complex business logic, so retrofitting a Redis cache into the existing code would mean a very large change surface and a lot of regression testing. Sometimes the situation is even more awkward: a project may have been written on JDK 1.6, and wiring a Redis cache into such a codebase is extremely difficult.
This raises a natural question: can we listen to the MySQL binlog, the way MySQL master-slave replication does, and update downstream data from it? That is exactly where Flink CDC comes in.

2. Setting up the MySQL environment

Note that the current flink-cdc connector only supports MySQL 8.0; MySQL 8.4 is not supported at all.
Since my local MySQL installation is 8.4, we will run MySQL 8.0 in Docker for convenience.

2.1 docker-compose.yml

services:
  master:
    image: mysql:8.0.41
    container_name: mysql-8
    restart: always
    #mem_limit: 512M
    environment:
      MYSQL_ROOT_PASSWORD: study@2025
      TZ: Asia/Shanghai
    ports:
      - "3307:3306"
    volumes:
      - ./cfg/my.cnf:/etc/my.cnf
      - ./data:/var/lib/mysql
      - ./initdb:/docker-entrypoint-initdb.d
      - ./dump:/var/dump
      - ./log:/var/log
    networks:
      - mysql-cluster
networks:
  mysql-cluster:

2.2 Initialization SQL

-- Create the replication user

create role role_app;
GRANT SELECT,UPDATE,INSERT,DELETE ON *.* to role_app;
GRANT REPLICATION SLAVE,REPLICATION CLIENT ON *.* TO role_app;

CREATE USER 'app'@'%' IDENTIFIED WITH caching_sha2_password by 'study@2025' DEFAULT ROLE role_app COMMENT 'app user';


FLUSH PRIVILEGES;

-- Create a database for testing
CREATE SCHEMA `shop-center`;
FLUSH TABLES WITH READ LOCK;

2.3 Notes

First comment out the volume line - ./cfg/my.cnf:/etc/my.cnf and start the service.
Then copy the default configuration file out of the container:

docker cp <container>:/etc/my.cnf ./cfg/my.cnf

(Note: docker exec <id> cp ... would run cp inside the container, where the host's ./cfg directory does not exist; docker cp is the command that copies files to the host.)

After that, uncomment the volume line and restart the container.
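For CDC to work, the binlog must be enabled and in row format. MySQL 8.0 enables a row-format binlog by default, but it is worth confirming that the copied-out my.cnf contains settings along these lines (the values here are illustrative, not taken from the article's actual file):

```ini
[mysqld]
# a unique server id is required for binlog-based replication/CDC
server-id = 1
# row-format binlog is what Flink CDC (Debezium) reads; this is the 8.0 default
log-bin = mysql-bin
binlog_format = ROW
# keep full row images so the "before"/"after" fields of change events are complete
binlog_row_image = FULL
```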

3. Project setup and pom dependencies

3.1 Main-module pom dependencies

A Flink program does not need the Spring framework; a plain main method is enough to run it.
But we still want to use some familiar APIs to operate Redis and Elasticsearch.

		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.20.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>1.20.0</version>
        </dependency>
        
		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>3.3.0</version>
        </dependency>	

3.2 The common-data module

Entity classes shared by multiple modules are best kept in a standalone common module.
I also wrap the Redis and Elasticsearch operations in this module.

3.2.1 pom dependencies

<dependencies>

        <dependency>
            <groupId>org.yaml</groupId>
            <artifactId>snakeyaml</artifactId>
            <version>2.3</version>
        </dependency>

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch</groupId>
            <artifactId>elasticsearch-x-content</artifactId>
            <version>8.17.0</version>
        </dependency>

        <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-core</artifactId>
            <version>5.8.32</version>
        </dependency>


        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-redis</artifactId>
            <version>3.4.2</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2-extension-spring6</artifactId>
            <version>2.0.54</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.12.1</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.54</version>
        </dependency>

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>6.4.2.RELEASE</version>
        </dependency>

        <!-- Flink Redis Connector -->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.bahir</groupId>-->
        <!--            <artifactId>flink-connector-redis_2.12</artifactId>-->
        <!--            <version>1.1.0</version>-->
        <!--        </dependency>-->

    </dependencies>

3.2.2 Basic entity classes

@Data
public class GenItemEntity{
    Long id;
    String name;
    Long price;
    String brand;
    String specification;
    Integer version;
}
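The main job in section 6 deserializes Debezium's change-event JSON into a BinlogInfo&lt;T&gt; envelope class that the article does not show. A minimal sketch of what it might look like follows; the field names (before, after, source, op) follow Debezium's envelope format, and it is written with plain getters/setters rather than Lombok only so the sketch is self-contained:

```java
// Minimal sketch of the Debezium change-event envelope; field names match the JSON keys.
public class BinlogInfo<T> {
    private T before;          // row image before the change (null for inserts)
    private T after;           // row image after the change (null for deletes)
    private SourceInfo source; // metadata: which database/table the event came from
    private String op;         // "c" = create, "u" = update, "d" = delete, "r" = snapshot read

    public T getBefore() { return before; }
    public void setBefore(T before) { this.before = before; }
    public T getAfter() { return after; }
    public void setAfter(T after) { this.after = after; }
    public SourceInfo getSource() { return source; }
    public void setSource(SourceInfo source) { this.source = source; }
    public String getOp() { return op; }
    public void setOp(String op) { this.op = op; }

    public static class SourceInfo {
        private String db;
        private String table;
        public String getDb() { return db; }
        public void setDb(String db) { this.db = db; }
        public String getTable() { return table; }
        public void setTable(String table) { this.table = table; }
    }
}
```

With fastjson2's TypeReference, this class deserializes the whole envelope in one call, as the main function does later.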

4. Wrapping the Redis and Elasticsearch operations

4.1 Wrapping the Redis operations

Add spring-data-redis to the pom.
Then we can use the familiar RedisTemplate to operate Redis.

public class RedisConfig {

    public RedisConfig(){
        init();
    }

    protected FastJsonConfig redisFastJson(){
        FastJsonConfig config = new FastJsonConfig();
        config.setWriterFeatures(
                JSONWriter.Feature.WriteNullListAsEmpty,
                // write the class name into the JSON
                JSONWriter.Feature.WriteClassName,
                // serialize a null Boolean as false
                JSONWriter.Feature.WriteNullBooleanAsFalse,
                JSONWriter.Feature.WriteEnumsUsingName);
        config.setReaderFeatures(
                JSONReader.Feature.SupportClassForName,
                // support autoType
                JSONReader.Feature.SupportAutoType);
        return config;
    }

    protected FastJsonRedisSerializer fastJsonRedisSerializer(FastJsonConfig pFastJsonConfig) {
        FastJsonRedisSerializer fastJsonRedisSerializer = new FastJsonRedisSerializer(Object.class);
        fastJsonRedisSerializer.setFastJsonConfig(pFastJsonConfig);
        return fastJsonRedisSerializer;
    }

    protected RedisConnectionFactory redisConnectionFactory(){
        // ideally these values would come from configuration; hard-coded here for brevity
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration("192.168.0.64",6379);
        redisConfiguration.setPassword("study@2025");

        GenericObjectPoolConfig<?> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(2);  // maximum number of connections
        poolConfig.setMaxIdle(2);   // maximum number of idle connections
        poolConfig.setMinIdle(2);   // minimum number of idle connections
        poolConfig.setMaxWait(Duration.ofMillis(3000)); // maximum wait for a connection

        ClientResources clientResources = DefaultClientResources.create();

        // build the client configuration once, with the client resources and command timeout
        // (the original built a second, unused configuration here)
        LettucePoolingClientConfiguration lettucePoolingClientConfiguration = LettucePoolingClientConfiguration.builder()
                .clientResources(clientResources)
                .commandTimeout(Duration.ofSeconds(5))
                .poolConfig(poolConfig)
                .build();

        LettuceConnectionFactory redisConnectionFactory = new LettuceConnectionFactory(redisConfiguration,lettucePoolingClientConfiguration);
        redisConnectionFactory.afterPropertiesSet(); // initialize the connection factory

        return redisConnectionFactory;
    }


    protected RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory, FastJsonRedisSerializer pFastJsonRedisSerializer) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<String, Object>();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setEnableTransactionSupport(true);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(pFastJsonRedisSerializer);
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(pFastJsonRedisSerializer);
        return redisTemplate;
    }

    protected void init(){
        mFastJsonConfig = redisFastJson();
        mFastJsonRedisSerializer = fastJsonRedisSerializer(mFastJsonConfig);
        mRedisConnectionFactory = redisConnectionFactory();
        mRedisTemplate = redisTemplate(mRedisConnectionFactory,mFastJsonRedisSerializer);
        mRedisTemplate.afterPropertiesSet();
    }

    private FastJsonConfig mFastJsonConfig;
    private FastJsonRedisSerializer mFastJsonRedisSerializer;
    private RedisConnectionFactory mRedisConnectionFactory;
    private RedisTemplate<String, Object> mRedisTemplate;

    public static RedisTemplate<String, Object> redisTemplate(){
        return Holder.INSTANCE.mRedisTemplate;
    }

    public static <T> String serialize(T entity){
        return JSON.toJSONString(entity,Holder.INSTANCE.mFastJsonConfig.getWriterFeatures());
    }

    private static class Holder {
        private static final RedisConfig INSTANCE = new RedisConfig();
    }

}

4.2 Wrapping the Elasticsearch operations

Because the Elasticsearch client needs an API key and HTTPS, it is best to put these settings in a YAML file and place http_ca.crt in this module's resources.
Under IDEA, the main module sometimes cannot find a sub-module's resources. In that case, it is enough to declare the sub-module dependency like this:

        <dependency>
            <groupId>indi.zhifa.study2025</groupId>
            <artifactId>common-data</artifactId>
            <version>${project.version}</version>
            <scope>compile</scope>
        </dependency>

Note: the key point is the <scope>compile</scope> line.

public class EsClientConfig {

    @Setter
    @Getter
    private String host;

    @Setter
    @Getter
    private Integer port;

    @Setter
    @Getter
    private String apiKey;

}
public class ElasticSearchClientProvider {

    private EsClientConfig esClientConfig;

    private RestClientBuilder builder;

    public ElasticSearchClientProvider() {
        try{
            init();
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    public void init() throws IOException {

        Yaml yaml = new Yaml();
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("el-config.yml")) {
            if (inputStream == null) {
                throw new IllegalArgumentException("File not found: el-config.yml");
            }
            esClientConfig = yaml.loadAs(inputStream, EsClientConfig.class);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load YAML file", e);
        }

        SSLContext sslContext;
        try (InputStream inputStream = FileUtil.class.getClassLoader().getResourceAsStream("http_ca.crt")){
            sslContext = TransportUtils.sslContextFromHttpCaCrt(inputStream);
        }catch (Exception e) {
            throw new RuntimeException("Failed to load http_ca.crt", e);
        }

        builder = RestClient.builder(
                        new HttpHost(esClientConfig.getHost(), esClientConfig.getPort(), "https") // replace with your Elasticsearch address
                ).setDefaultHeaders(new Header[]{
                        new BasicHeader("Authorization", "ApiKey " + esClientConfig.getApiKey())
                })
                .setFailureListener(new RestClient.FailureListener(){
                    @Override
                    public void onFailure(Node node) {
                        super.onFailure(node);
                    }
                }).setHttpClientConfigCallback(hc->
                   hc.setSSLContext(sslContext)
                );
    }


    public ElasticsearchClient get(){
        RestClient restClient = builder.build();
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());
        ElasticsearchClient esClient = new ElasticsearchClient(transport);
        return esClient;
    }

    public static ElasticSearchClientProvider getInstance(){
        return Holder.INSTANCE;
    }

    private static class Holder {
        private static final ElasticSearchClientProvider INSTANCE = new ElasticSearchClientProvider();
    }

}

5. Writing custom sinks for Redis and Elasticsearch

5.1 The Redis sink

We want the data to arrive at the Redis sink fully prepared: the sink carries no business logic and only sets or deletes cache entries.

5.1.1 RedisSinkCommand

public class RedisSinkCommand<T> {
    @Setter
    @Getter
    protected ERedisCommand command;

    @Setter
    @Getter
    protected long dua; // TTL in seconds

    @Setter
    @Getter
    protected String key;

    @Setter
    @Getter
    protected T value;


    public void initSet(String pKey, T pValue) {
        command = ERedisCommand.SET;
        dua = 300;
        key = pKey;
        value = pValue;
    }

    public void initDel(String pKey) {
        command = ERedisCommand.DEL;
        key = pKey;
    }

}
public enum ERedisCommand {
    SET,
    DEL
}

5.1.2 SpringDataRedisSink

@Slf4j
public class SpringDataRedisSink<T> implements Sink<RedisSinkCommand<T>> {

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(InitContext context) throws IOException {
        // deprecated overload kept for interface compatibility; delegate to the same writer
        return new LettuceRedisSinkWriter();
    }

    @Override
    public SinkWriter<RedisSinkCommand<T>> createWriter(WriterInitContext context){
        return new LettuceRedisSinkWriter();
    }

    class LettuceRedisSinkWriter implements SinkWriter<RedisSinkCommand<T>> {

        @Override
        @Override
        public void write(RedisSinkCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            RedisTemplate<String, Object> redisTemplate = RedisConfig.redisTemplate();
            switch (pCmd.getCommand()){
                case SET-> {
                    // note: set(key, value, long) resolves to the SETRANGE offset overload,
                    // so pass the TTL as a java.time.Duration instead
                    redisTemplate.opsForValue().set(pCmd.getKey(), pCmd.getValue(), Duration.ofSeconds(pCmd.getDua()));
                }
                case DEL -> {
                    redisTemplate.delete(pCmd.getKey());
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {

        }

        @Override
        public void close() throws Exception {

        }
    }

}

5.2 The Elasticsearch sink

The Elasticsearch sink follows the same principle as the Redis one: no business logic inside the sink.

5.2.1 ElCommand

@Data
public class ElCommand<T> {
    protected EElCommand command;
    protected String index;
    protected T entity;
    protected String id;
}
public enum EElCommand {
    CREATE,UPDATE,DELETE
}
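Debezium encodes the operation type in the envelope's op field. The mapping used in section 6 can be sketched as a small pure function; the enum here is a local copy of the EElCommand defined above so the snippet stands alone, and mapping "r" (snapshot read) to CREATE is an option beyond the article's filter, which only passes c/u/d:

```java
public class OpMapping {
    // local copy of the EElCommand enum defined above, so this snippet compiles on its own
    enum EElCommand { CREATE, UPDATE, DELETE }

    // maps a Debezium op code to a sink command; returns null for ops we ignore
    static EElCommand fromOp(String op) {
        return switch (op) {
            case "c", "r" -> EElCommand.CREATE; // "r" = snapshot read, if you also index snapshots
            case "u" -> EElCommand.UPDATE;
            case "d" -> EElCommand.DELETE;
            default -> null; // e.g. truncate/schema-change events
        };
    }
}
```

Centralizing this mapping keeps the stream pipeline's switch statements short and makes the ignored-op policy explicit in one place.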

5.2.2 ElSearchSink

public class ElSearchSink<T> implements Sink<ElCommand<T>> {
    @Override
    public SinkWriter<ElCommand<T>> createWriter(InitContext context) throws IOException {
        // deprecated overload kept for interface compatibility; delegate to the same writer
        return new ElSearchSinkWriter();
    }

    @Override
    public SinkWriter<ElCommand<T>> createWriter(WriterInitContext context){
        return new ElSearchSink.ElSearchSinkWriter();
    }

    class ElSearchSinkWriter implements SinkWriter<ElCommand<T>> {

        @Override
        public void write(ElCommand<T> pCmd, Context context) throws IOException, InterruptedException {
            ElasticSearchClientProvider elasticSearchClientProvider = ElasticSearchClientProvider.getInstance();
            ElasticsearchClient elClient =  elasticSearchClientProvider.get();
            String index = pCmd.getIndex();
            String id = pCmd.getId();
            T entity = pCmd.getEntity();

            switch (pCmd.getCommand()){
                case CREATE,UPDATE -> {
                    elClient.index(i->i.index(index).id(id).document(entity));
                }
                case DELETE -> {
                    elClient.delete(d->d.index(index).id(id));
                }
            }
        }

        @Override
        public void flush(boolean endOfInput) throws IOException, InterruptedException {

        }

        @Override
        public void close() throws Exception {

        }
    }
}

6. The main function

public class FlinkMain {

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.0.64")
                .port(3307)
                .databaseList("shop-center") // set captured database
                .tableList("shop-center.item") // set captured table
                .username("app")
                .password("study@2025")
                .serverTimeZone("Asia/Shanghai")
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .startupOptions(StartupOptions.latest())
                .includeSchemaChanges(true)
                .build();

//        FlinkJedisPoolConfig jedisConfig = new FlinkJedisPoolConfig.Builder()
//                .setHost("192.168.0.64") // 替换为 Redis 主机
//                .setPort(6379) // Redis 端口
//                .setPassword("ilv0404@1314") // 如果有密码,设置密码
//                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

//        DataStream<BinlogInfo> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source")
//                .map(str->{
//                    BinlogInfo res =JSONObject.parseObject(str, BinlogInfo.class);
//                    return res;
//                    }
//                 ).filter(bi->bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d"));
//
//        mysqlStream.addSink(new RedisSink(jedisConfig,new RedisItemMapper()));

        DataStream<RedisSinkCommand<GenItemEntity>> newMysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to redis")
                        .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {
                        }), TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
                .filter(bi->bi.getSource().getTable().equals("item") &&  (bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d")))
                .map(bi->{
                    String op = bi.getOp();
                    // on delete the row image is in "before"; otherwise use "after"
                    GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
                    String key = "item:"+itemEntity.getId();
                    RedisSinkCommand<GenItemEntity> redisSinkCommand = new RedisSinkCommand<>();
                    switch (op){
                        case "c","u"-> redisSinkCommand.initSet(key,itemEntity);
                        // only "d" remains; the filter above lets nothing else through
                        default -> redisSinkCommand.initDel(key);
                    }
                    return redisSinkCommand;
                },TypeInformation.of(new TypeHint<RedisSinkCommand<GenItemEntity>>() {}));

        newMysqlStream.sinkTo(new SpringDataRedisSink<GenItemEntity>());

        DataStream<ElCommand<GenItemEntity>> mySqlToElStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(),"Mysql source to el")
                .map(str->JSONObject.parseObject(str, new TypeReference<BinlogInfo<GenItemEntity>>() {})
                        , TypeInformation.of(new TypeHint<BinlogInfo<GenItemEntity>>() {}))
        .filter(bi->bi.getSource().getTable().equals("item") &&  (bi.getOp().equals("c")||bi.getOp().equals("u")||bi.getOp().equals("d")))
                .map(bi->{
                    ElCommand<GenItemEntity> elCommand = new ElCommand<>();
                    String op = bi.getOp();
                    // on delete the row image is in "before"; otherwise use "after"
                    GenItemEntity itemEntity = "d".equals(op) ? bi.getBefore() : bi.getAfter();
                    elCommand.setId(itemEntity.getId().toString());
                    elCommand.setEntity(itemEntity);
                    elCommand.setIndex("item_npc");
                    switch (op){
                        case "c"->elCommand.setCommand(EElCommand.CREATE);
                        case "u"->elCommand.setCommand(EElCommand.UPDATE);
                        case "d"->elCommand.setCommand(EElCommand.DELETE);
                    }
                    return elCommand;
                },TypeInformation.of(new TypeHint<ElCommand<GenItemEntity>>() {}));
        mySqlToElStream.sinkTo(new ElSearchSink<GenItemEntity>());
        env.execute();
    }
}

7. Full code

For the complete code, please head over to Gitee (码云).

8. Reflections on real-world practice

8.1 Redis

The code here is for learning purposes only. In real projects, the Redis cache is usually populated on read: when a lookup misses the cache, the service queries MySQL and writes the result into Redis. When a change or delete on the table is observed, the corresponding cache entry is simply deleted and gets rebuilt on the next read. Under concurrent access to the same data, this can cause the cache to be rebuilt repeatedly; this phenomenon is usually called cache breakdown (a cache stampede), not to be confused with cache penetration, which refers to queries for keys that exist nowhere. Taking a lock with Redisson before refreshing the cache prevents the repeated MySQL reads and Redis writes.

public class CacheService {
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
    @Autowired
    private DataRepository dataRepository;

    public Object getData(String key) {
        // first cache check
        Object value = redisTemplate.opsForValue().get(key);
        if (value != null) {
            return value;
        }

        RLock lock = redissonClient.getLock(key + ":LOCK");
        try {
            // try to acquire the lock, with a lease timeout to avoid deadlock
            if (lock.tryLock(5, 30, TimeUnit.SECONDS)) {
                try {
                    // double-check the cache
                    value = redisTemplate.opsForValue().get(key);
                    if (value != null) {
                        return value;
                    }
                    // query the database
                    Object dbData = dataRepository.findById(key);
                    // update the cache with a sensible expiry
                    redisTemplate.opsForValue().set(key, dbData, 1, TimeUnit.HOURS);
                    return dbData;
                } finally {
                    lock.unlock();
                }
            } else {
                // lock not acquired: wait briefly, then retry the cache
                Thread.sleep(100);
                return redisTemplate.opsForValue().get(key);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("failed to acquire lock", e);
        }
    }
}

8.2 Elasticsearch

For Elasticsearch, updating data this way is actually not recommended. An ES index is denormalized by design, so its query documents cannot be built from a single table.
For product search in an e-commerce system, we can instead update ES when a product goes on sale and forbid edits while it is on sale; when the product is taken off sale, delete its ES document. To modify a product, take it off sale first, edit it, then put it back on sale.

