Flink 1.17.2 版本用 java 读取 starrocks
文章目录
- 方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)
- 方法二:使用 StarRocks Flink Connector(推荐)
在 Flink 1.17.2 中使用 Java 读取 StarRocks 数据,可以通过 JDBC 连接器 或 StarRocks 官方提供的 Flink Connector 实现。以下是两种方法的详细步骤:
方法一:使用 Flink JDBC 连接器(兼容 MySQL 协议)
StarRocks 兼容 MySQL 协议,可通过 Flink 的 JDBC 连接器读取数据。在 pom.xml 中添加以下依赖:
<!-- Flink JDBC 连接器 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>3.1.1-1.17</version>
</dependency>
<!-- MySQL 驱动(兼容 StarRocks) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
编写 Java 代码:
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.types.Row;
public class ReadStarRocksJDBC {

    /**
     * Reads a sample of rows from a StarRocks table via Flink's JDBC input format.
     * StarRocks speaks the MySQL wire protocol, so the stock MySQL driver is used.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Column types must line up 1:1 with the SELECT list below:
        // teacher_id (STRING), student_id (STRING), student_num (INT).
        TypeInformation<?>[] fieldTypes = {
                Types.STRING,
                Types.STRING,
                Types.INT
        };
        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://<starrocks-fe-host>:<port>/<database>")
                .setUsername("<username>")
                .setPassword("<password>")
                // FIX: removed the stray "=" from the original query
                // ("... FROM dwd_student = limit 10"), which is invalid SQL
                // and would make the job fail at runtime.
                .setQuery("SELECT teacher_id, student_id, student_num FROM dwd_student LIMIT 10")
                .setRowTypeInfo(rowTypeInfo)
                .finish();

        DataSet<Row> dataSet = env.createInput(inputFormat);
        dataSet.print();
    }
}
输出:
+I[teacher03, abc01, 2]
+I[teacher01, abc01, 3]
方法二:使用 StarRocks Flink Connector(推荐)
StarRocks 提供官方 Connector,支持高效读写。在 pom.xml 中添加以下依赖:
<dependency>
<groupId>com.starrocks.connector</groupId>
<artifactId>flink-connector-starrocks</artifactId>
<version>1.2.9_flink-1.17</version>
</dependency>
通过 Flink SQL 直接注册 StarRocks 表:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class ReadStarRocksSQL {

    /**
     * Builds the Flink SQL DDL that registers the StarRocks table
     * {@code table1} as a readable source named {@code starrocks_table}.
     */
    private static String sourceTableDdl() {
        return "CREATE TABLE starrocks_table (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " score INT\n" +
                ") WITH (\n" +
                " 'connector' = 'starrocks',\n" +
                " 'jdbc-url' = 'jdbc:mysql://192.168.101.xx:9030',\n" +
                " 'scan-url' = '192.168.101.xx:8030',\n" +
                " 'database-name' = 'mydatabase',\n" +
                " 'table-name' = 'table1',\n" +
                " 'username' = 'root',\n" +
                " 'password' = ''\n" +
                ")";
    }

    /**
     * Registers the StarRocks source table and prints its full contents
     * using the official StarRocks Flink connector.
     *
     * @param args unused
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment sqlEnv = StreamTableEnvironment.create(streamEnv);

        // Register the source table, then scan and print it.
        sqlEnv.executeSql(sourceTableDdl());
        sqlEnv.executeSql("SELECT * FROM starrocks_table").print();
    }
}
输出:
+----+-------------+--------------------------------+-------------+
| op | id | name | score |
+----+-------------+--------------------------------+-------------+
| +I | 4 | Julia | 25 |
| +I | 2 | Rose | 23 |
| +I | 3 | Alice | 24 |
| +I | 1 | Lily | 23 |
+----+-------------+--------------------------------+-------------+
4 rows in set
注:StarRocks 表 table1 结构
-- StarRocks primary-key table backing the Flink connector example above.
CREATE TABLE `table1` (
`id` int(11) NOT NULL COMMENT "用户 ID",
`name` varchar(65533) NULL COMMENT "用户姓名",
`score` int(11) NOT NULL COMMENT "用户得分"
) ENGINE=OLAP
-- Primary-key model: rows with the same `id` are upserted, not appended.
PRIMARY KEY(`id`)
-- Data is hash-distributed across tablets by `id`.
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"compression" = "LZ4",
"enable_persistent_index" = "true",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
-- Single replica: suitable for a test cluster only.
"replication_num" = "1"
);
-- Sample rows matching the Flink job's printed output.
INSERT INTO mydatabase.table1(id, name, score) VALUES(3, 'Alice', 24);
INSERT INTO mydatabase.table1(id, name, score) VALUES(2, 'Rose', 23);
INSERT INTO mydatabase.table1(id, name, score) VALUES(4, 'Julia', 25);
INSERT INTO mydatabase.table1(id, name, score) VALUES(1, 'Lily', 23);