当前位置: 首页 > article >正文

SpringBoot 实战:SpringBoot整合Flink CDC,实时追踪mysql数据变动

引言

Flink CDC(Flink Change Data Capture)即 Flink 的变更数据捕获技术,是一种基于数据库日志的CDC技术,它实现了一个全增量一体化的数据集成框架。与Flink计算框架相结合,Flink CDC能够高效地实现海量数据的实时集成。其核心功能在于实时监视数据库或数据流中的数据变动,并将这些变动抽取出来,以便进行进一步的处理和分析。借助Flink CDC,用户可以轻松地构建实时数据管道,实时响应和处理数据变动,为实时分析、实时报表和实时决策等场景提供有力支持。

一、MySQL开启Binlog

MySQL开启 BinLong 功能,需要在配置文件中 修改 [mysql]的相关参数(lunux 中 /etc/my.cnf 文件 或windows 的 /my.ini 文件)

[mysqld]
server-id=1
# 设置日志格式为行级格式
binlog-format=Row
# 设置binlog日志文件的前缀
log-bin=mysql-bin
# 指定需要记录二进制日志的数据库
binlog_do_db=testjpa

开启Binlog 后,在需要 为 Flink CDC 配置响应的权限,使其能够正常链接到 MySQL数据库,包括授权 Flink CDC 链接 数据库的用户权限。

mysql> SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+

二、代码示例

创建Spring Boot项目添加依赖

<dependencies>
    <!-- Flink dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.14.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
</dependencies>

配置Flink和MySQL CDC

flink:
  checkpoint:
    interval: 10000
  parallelism: 1

spring:
  datasource:
    url: jdbc:mysql://localhost:3306/your_database
    username: your_username
    password: your_password

创建服务类实现实时跟踪

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.springframework.stereotype.Service;

@Service
public class FlinkCdcService {

    public void startDataStreaming() {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 使用Flink CDC连接MySQL
        String name = "inventory";
        tableEnv.executeSql("CREATE TABLE " + name + " (" +
            " id INT," +
            " name STRING," +
            " description STRING," +
            " weight DECIMAL(10, 3)" +
            ") WITH (" +
            " 'connector' = 'mysql-cdc'," +
            " 'hostname' = 'localhost'," +
            " 'port' = '3306'," +
            " 'username' = 'your_username'," +
            " 'password' = 'your_password'," +
            " 'database-name' = 'your_database'," +
            " 'table-name' = 'your_table'" +
            ")");

        // 查询并打印结果
        DataStream<String> dataStream = tableEnv.sqlQuery("SELECT * FROM " + name).execute().print();

        try {
            env.execute("Flink CDC Demo");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

配置 SpringBoot 启动类

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class FlinkCdcApplication implements CommandLineRunner {

    @Autowired
    private FlinkCdcService flinkCdcService;

    public static void main(String[] args) {
        SpringApplication.run(FlinkCdcApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        flinkCdcService.startDataStreaming();
    }
}

 

 


http://www.kler.cn/news/292679.html

相关文章:

  • Java简单实现服务器客户端通信
  • 0to1使用JWT实现登录认证
  • ubuntu24下安装pytorch3d
  • ARM 伪指令 (26)
  • 2024年高教社杯数学建模国赛赛题浅析——助攻快速选题
  • 【系统架构设计师】论文:论SOA在企业集成架构设计中的应用
  • 【Rust光年纪】探索Rust嵌入式开发利器:从硬件访问到USB绑定
  • Ubuntu设置
  • Spring Boot实现License生成和校验
  • Java中的Stream流
  • wsl2 安装qt5
  • gdb使用
  • 【运维监控】influxdb 2.0+telegraf 监控tomcat 8.5运行情况(2)
  • 使用文件系统管理硬件设备
  • 深入探索 Go 语言的编译器与垃圾回收机制
  • 深圳又有5家企业高新企业资质被取消?
  • Redis在服务器启动的日志问题
  • RestTemplateRibbonOpenFeign
  • Java 可变参数
  • Canvas 在 微信小程序-uni-APP 和 H5 中的使用差异
  • JavaWeb - Mybatis - 动态SQL
  • STM32 系列MCU 开发利器 STM32CubeIDE
  • webview无法加载http流量及Expo修改Android权限
  • MyBatis 一级缓存原理
  • 启动spring boot项目时,第三方jar包扫描不到的问题
  • 异步编程学习
  • Java项目:142 基于springboot的实习管理系统
  • flutter Image
  • 浏览器百科:网页存储篇-如何在Chrome中打开Cookie(二)
  • Stirling-PDF:基于Web的开源PDF处理工具