当前位置: 首页 > article >正文

Flink 中 JDBC Connector 使用详解

1. 背景

在实时计算或离线任务中,往往需要与关系型数据库交互,例如 MySQL、PostgreSQL 等。Apache Flink 提供了 JDBC Connector,可以方便地将流式数据写入或读取数据库。

本文将介绍 Flink JDBC Connector 的基础用法、配置方法以及注意事项,帮助开发者更好地集成数据库操作。


2. JDBC Connector 的基础概念

JDBC Connector 是 Flink 官方提供的一个用于连接关系型数据库的工具包,支持:

  • Source:从数据库读取数据。
  • Sink:将数据写入数据库。

使用 JDBC Connector 可以实现对数据库的实时写入,也可以用作批量操作的工具。


3. Maven 依赖

在项目中添加 Flink JDBC 依赖:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.17.0</version> <!-- 根据实际使用的 Flink 版本调整 -->
</dependency>

如果使用 MySQL 数据库,还需添加 MySQL 驱动:

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version> <!-- MySQL 驱动版本 -->
</dependency>

4. JDBC Connector 的使用

4.1 写入数据库(Sink)

以下是一个将流式数据写入 MySQL 的示例:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.jdbc.JdbcSink;

public class JdbcSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 模拟输入数据
        env.fromElements(
                Tuple2.of(1, "Alice"),
                Tuple2.of(2, "Bob"),
                Tuple2.of(3, "Charlie")
        )
        .addSink(JdbcSink.sink(
                "INSERT INTO users (id, name) VALUES (?, ?)", // SQL 语句
                (ps, t) -> {
                    ps.setInt(1, t.f0);  // 设置第一个参数为 ID
                    ps.setString(2, t.f1);  // 设置第二个参数为 Name
                },
                JdbcSink.DefaultJdbcExecutionOptions.builder()
                        .withBatchSize(100) // 批量写入大小
                        .build(),
                () -> JdbcSink.defaultJdbcConnectionProvider(
                        "jdbc:mysql://localhost:3306/testdb", // 数据库 URL
                        "root",  // 用户名
                        "password" // 密码
                )
        ));

        env.execute("Flink JDBC Sink Example");
    }
}
关键点解析
  1. SQL 语句:支持动态参数 ? 占位符,适合批量插入。
  2. 参数绑定:通过 Lambda 表达式绑定输入数据与 SQL 参数。
  3. 批量写入:通过 JdbcExecutionOptions 配置批量写入策略。

4.2 从数据库读取数据(Source)

以下是一个从 MySQL 读取数据并打印的示例:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;

public class JdbcSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<Integer, String>> sourceStream = env.createInput(
            JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver") // JDBC 驱动
                .setDBUrl("jdbc:mysql://localhost:3306/testdb") // 数据库 URL
                .setUsername("root") // 用户名
                .setPassword("password") // 密码
                .setQuery("SELECT id, name FROM users") // SQL 查询
                .setRowTypeInfo(Types.TUPLE(Types.INT, Types.STRING)) // 结果类型
                .finish()
        );

        sourceStream.print();

        env.execute("Flink JDBC Source Example");
    }
}
关键点解析
  1. SQL 查询:需要提供完整的查询语句。
  2. 结果类型:通过 RowTypeInfo 显式定义数据库返回的数据结构。

5. JDBC Connector 的配置选项

5.1 批量写入配置

通过 JdbcExecutionOptions 可调整写入策略:

  • withBatchSize(int):设置批量写入大小(默认为 500)。
  • withBatchIntervalMs(long):设置批量写入的时间间隔。
  • withMaxRetries(int):设置写入失败后的最大重试次数。

5.2 数据库连接池

Flink JDBC Connector 默认使用单个连接执行操作。对于高并发需求,可以结合 HikariCP 等连接池框架优化性能。


6. 注意事项

  1. 事务支持

    • 默认情况下,JDBC Sink 使用批量提交,未显式开启事务。如果需要事务一致性,可以通过 JDBC 驱动自行管理事务。
  2. 数据库性能瓶颈

    • 数据库可能成为瓶颈,建议使用批量写入和合适的索引优化性能。
    • 高写入场景可考虑切换到 Kafka、HBase 等专为实时写入设计的存储系统。
  3. 错误处理

    • 可通过 withMaxRetries 设置重试次数。
    • 对于未能成功写入的数据,可考虑使用侧输出流保存以供后续处理。
  4. 分布式读取

    • 默认情况下,Flink JDBC Source 在单线程上运行,性能可能有限。可以使用分片或其他工具提升读取性能。

7. 总结

Flink JDBC Connector 是一个简单而高效的工具,适用于实时计算场景下与关系型数据库的交互。无论是数据写入还是读取,都可以通过简单配置快速实现。但对于高并发和大规模数据场景,需要根据业务需求调整策略。


http://www.kler.cn/a/412439.html

相关文章:

  • 如何构建SAAS项目
  • 【接口封装】——7、连接并使用 MySQL 数据库
  • SQL for XML
  • 常用的数据结构
  • 深入理解注意力机制(Attention Mechanism)
  • 3174、清除数字
  • WPF ItemsControl控件
  • 【深度学习|目标跟踪】StrongSort 详解(以及StrongSort++)
  • 浏览器缓存与协商缓存
  • 深入理解HTML基本结构:构建现代网页的基石
  • CSDN设置成黑色背景(谷歌 Edge)
  • 手机实时提取SIM卡打电话的信令声音-智能拨号器的双SIM卡切换方案
  • JS-对象-05-DOM
  • vue页面成绩案例(for渲染表格/删除/添加/统计总分/平均分/不及格显红色/输入内容去首尾空格trim/输入内容转数字number)
  • <<WTF-Solidity>>学习笔记(part 9-12)
  • 减速电机的减速比是什么意思?
  • 软件测试丨Pytest 第三方插件与 Hook 函数
  • ffmpeg 预设的值 加速
  • git源码安装
  • 集合卡尔曼滤波(EnKF)的三维滤波(模拟平面定位)例程,带逐行注释
  • Docker容器运行CentOS镜像,执行yum命令提示“Failed to set locale, defaulting to C.UTF-8”
  • mysql window安装(学习使用)
  • 数据库编程(sqlite3)
  • 深度神经网络模型压缩学习笔记二:离线量化算法和工具、实现原理和细节
  • QT入门详解,创建QT应用
  • qsort函数详解+代码展示