当前位置: 首页 > article >正文

Flink 1.17.2 版本用 java 读取 starrocks

文章目录

      • 方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)
      • 方法二:使用 StarRocks Flink Connector(推荐)

  在 Flink 1.17.2 中使用 Java 读取 StarRocks 数据,可以通过 JDBC 连接器 或 StarRocks 官方提供的 Flink Connector 实现。以下是两种方法的详细步骤:

方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)

  StarRocks 兼容 MySQL 协议,可通过 Flink 的 JDBC 连接器读取数据。在 pom.xml 中添加以下依赖:

<!-- Flink JDBC 连接器 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc</artifactId>
    <version>3.1.1-1.17</version>
</dependency>
<!-- MySQL 驱动(兼容 StarRocks) -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>

  编写 Java 代码:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;

public class ReadStarRocksJDBC {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        TypeInformation[] fieldTypes = {
                Types.STRING,
                Types.STRING,
                Types.INT
        };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://<starrocks-fe-host>:<port>/<database>")
                .setUsername("<username>")
                .setPassword("<password>")
                .setQuery("SELECT teacher_id, student_id, student_num FROM dwd_student = limit 10")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        DataSet<Row> dataSet = env.createInput(inputFormat);
        dataSet.print();
    }
}

  输出:

+I[teacher03, abc01, 2]
+I[teacher01, abc01, 3]

方法二:使用 StarRocks Flink Connector(推荐)

  StarRocks 提供官方 Connector,支持高效读写。在 pom.xml 中添加以下依赖:

<dependency>
    <groupId>com.starrocks.connector</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <version>1.2.9_flink-1.17</version>
</dependency>

  通过 Flink SQL 直接注册 StarRocks 表:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReadStarRocksSQL {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        String createTableSQL = "CREATE TABLE starrocks_table (\n" +
                "  id INT,\n" +
                "  name STRING,\n" +
                "  score INT\n" +
                ") WITH (\n" +
                "  'connector' = 'starrocks',\n" +
                "  'jdbc-url' = 'jdbc:mysql://192.168.101.xx:9030',\n" +
                "  'scan-url' = '192.168.101.xx:8030',\n" +
                "  'database-name' = 'mydatabase',\n" +
                "  'table-name' = 'table1',\n" +
                "  'username' = 'root',\n" +
                "  'password' = ''\n" +
                ")";

        tableEnv.executeSql(createTableSQL);
        tableEnv.executeSql("SELECT * FROM starrocks_table").print();
    }
}

  输出:

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |       score |
+----+-------------+--------------------------------+-------------+
| +I |           4 |                          Julia |          25 |
| +I |           2 |                           Rose |          23 |
| +I |           3 |                          Alice |          24 |
| +I |           1 |                           Lily |          23 |
+----+-------------+--------------------------------+-------------+
4 rows in set

  注:StarRocks 表 table1 结构

CREATE TABLE `table1` (
  `id` int(11) NOT NULL COMMENT "用户 ID",
  `name` varchar(65533) NULL COMMENT "用户姓名",
  `score` int(11) NOT NULL COMMENT "用户得分"
) ENGINE=OLAP 
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);

INSERT INTO mydatabase.table1(id, name, score) VALUES(3, 'Alice', 24);
INSERT INTO mydatabase.table1(id, name, score) VALUES(2, 'Rose', 23);
INSERT INTO mydatabase.table1(id, name, score) VALUES(4, 'Julia', 25);
INSERT INTO mydatabase.table1(id, name, score) VALUES(1, 'Lily', 23);

http://www.kler.cn/a/582672.html

相关文章:

  • c#如何直接获取json中的某个值
  • Java中的加盐加密:提升密码存储安全性的关键实践
  • 深度解读 | AI驱动下的新型金融对冲策略:稀疏奖励强化学习的应用
  • 17153.班级活动(java)
  • Linux-基础开发工具
  • 机器学习之超参数优化(Hyperparameter Optimization)
  • 使用 ArkUI 开发鸿蒙登录界面
  • MyBatis-Plus 注解大全
  • Vue | Vue3中为什么要用 Proxy 代替 Object.defineProperty
  • el-table 通过 slot=“header“ 自定义表头,遇到数据不更新的问题。
  • 深入解析Java虚拟机(JVM):架构、内存管理与性能优化
  • 网关与默认网关
  • KMeans实战——聚类和轮廓系数评估啤酒数据集
  • DataWhale 大语言模型 - 语言模型发展历程
  • ubuntu下在pycharm中配置已有的虚拟环境
  • 谈谈ArrayList和LinkedList的区别
  • Scala编程_数组、列表、元组、集合与映射
  • Day23 洛谷真题讲解(贪心)
  • Ubuntu 22.04使用pigz多线程快速解压/压缩文件
  • 1.2 CogPMAlignTool(模板匹配工具), CogFixtureTool(坐标系转换工具)