flink重温笔记(十七): flinkSQL 顶层 API ——SQLClient 及流批一体化
Flink学习笔记
前言:今天是学习 flink 的第 17 天啦!学习了 flinkSQL 的客户端工具 flinkSQL-client,主要是解决大数据领域数据计算避免频繁提交jar包,而是简单编写sql即可测试数据,文章中主要结合 hive,即编写 flinksql 可以操作 hive 中的数据表,以及流批一体化:kafak 将数据写入到 hive中,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!
Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!
喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"
文章目录
- Flink学习笔记
- 三、FlinkSQL Client
- 1. 启动 Hive 组件
- 2. Flink-Client 准备操作
- 3. Flink-Client 三种显示格式
- 4. 与 hive 结合的初始化
- 5. 两种方式创建 Hive 相关库表
- 5.1 SQL 方式
- 5.2 Table-API 方式
- 6. 基于 Catalog 的数据库操作
- 7. kafka 通过 FlinkSQL Client 写入 Hive
- 7.1 flink 准备 jar 包
- 7.2 案例演示
三、FlinkSQL Client
1. 启动 Hive 组件
###############################################################
# 记得先启动 hadoop 不然连接 beeline 时报错:
Error: Could not open client transport with JDBC Uri: jdbc:hive2://node1:10000: java.net.ConnectException: 拒绝连接 (Connection refused) (state=08S01,code=0)
##hive:##
nohup hive --service metastore & >> nohup.out
nohup hive --service hiveserver2 & >> hiveserver2.out
beeline
!connect jdbc:hive2://node1:10000
关闭:
ps -ef | grep hive
kill -9 pid
###############################################################
2. Flink-Client 准备操作
可以在建表或者建库的时候,可以直接编写 SQL 语句提交,web 页面也能显示 Running,省去打包操作!
- flink 的 lib 目录下 jar 包
基于 flink 1.3.1 ,hive 2.1.1,jar 包放在我的资源区了,大家有需要可以下载一下!
flink-connector-hive_2.11-1.13.1.jar
hive-exec-2.1.1.jar
antlr-runtime-3.4.jar
- 需要提前启动 flink 集群模式
./bin/start-cluster.sh
- 进入 Flink Client 操作
./bin/sql-client.sh embedded
- 退出操作
quit;
3. Flink-Client 三种显示格式
- 1- 表格模式
SET sql-client.execution.result-mode=table;
- 2- 变更日志模式
SET sql-client.execution.result-mode=changelog;
- 3- Tableau 模式
SET sql-client.execution.result-mode=tableau;
4. 与 hive 结合的初始化
创建 init.sql
# 显示打印错误日志信息
SET sql-client.verbose = true;
CREATE CATALOG myhive WITH (
'type' = 'hive',
'hive-conf-dir'='/export/server/flink-1.13.1/conf'
);
USE CATALOG myhive;
CREATE DATABASE if not exists itcast_flinksql;
USE itcast_flinksql;
初始化启动:
./sql-client.sh -i init.sql
5. 两种方式创建 Hive 相关库表
现将 hive 的 hive-site.xml 文件保存到 src/main/resource 目录下
5.1 SQL 方式
package cn.itcast.day01.catalog;
/**
* @author lql
* @time 2024-03-14 09:38:43
* @description TODO
*/
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.stream.Stream;
/**
* 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表
*/
public class SqlDDLDemo {
public static void main(String[] args) {
//todo 0)设置当前hadoop操作的用户名
System.setProperty("HADOOP_USER_NAME", "root");
//todo 1)初始化flink流处理的运行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bbSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bbSetting);
//todo 2)创建HiveCatalog
String catalogName = "myHive";
String databaseName = "itcast_flinksql";
HiveCatalog catalog = new HiveCatalog(
catalogName, //指定catalog的名字
"default", //默认数据库的名字
"src/main/resources", //指定hive-site.xml文件的路径
"2.1.1" //指定hive的版本
);
//todo 3)注册目录
System.out.println("===========注册目录==================");
tabEnv.registerCatalog(catalogName, catalog);
//todo 4)切换目录
System.out.println("===========切换目录==================");
tabEnv.useCatalog(catalogName);
//todo 5)创建数据库
System.out.println("===========创建数据库==================");
String createDBSql = "CREATE DATABASE IF NOT EXISTS "+catalogName+"."+databaseName;
tabEnv.executeSql(createDBSql);
//todo 6)切换数据库
System.out.println("===========切换数据库==================");
tabEnv.useDatabase(databaseName);
//todo 7)创建表
System.out.println("===========创建表==================");
String createTableSql = "CREATE TABLE IF NOT EXISTS mytable(name string, age int)";
tabEnv.executeSql(createTableSql);
//todo 8)查询所有的表
System.out.println("===========创建表==================");
tabEnv.listTables();
}
}
结果:成功注册了目录,数据库,表!
总结:
- 1- 需要设置 hadoop 权限
- 2- new 一个 HiveCatalog 对象
5.2 Table-API 方式
package cn.itcast.day01.catalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;
/**
* @author lql
* @time 2023-03-14 09:10:58
* @description TODO: 用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。
*/
public class JavaClientDemo {
public static void main(String[] args) throws DatabaseAlreadyExistException, TableAlreadyExistException, DatabaseNotExistException {
// 设置当前hadoop操作的用户名
System.setProperty("HADOOP_USER_NAME","root");
// 创建流处理环境,设置并行度为 5
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(5);
// 设置 table 环境,使用 blink planner,并开启流式查询模式
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
// 定义 Hive Catalog 名称、数据库名称、表格名称等信息
String catalogName = "myhive";
String databaseName = "itcast_flinksql";
String tableName = "test";
// 创建一个 HiveCatalog 实例,用于访问 Hive 中的表格和数据库
HiveCatalog catalog = new HiveCatalog(
catalogName, // Catalog 名称
"default", // 默认使用的数据库
"src/main/resources", // Hive 配置文件的目录路径(hive-site.xml)
"2.1.1" // Hive 版本号
);
// 注册 HiveCatalog
// 在注册之前需要保证在指定目录下有 metadata 目录,并且 metadata 目录下没有 myhive 目录,否则会失败
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
// 创建数据库
System.out.println("---------------创建数据库------------------------");
catalog.createDatabase(databaseName,
new CatalogDatabaseImpl(
new HashMap<>(),"my_comments"),true);
// 创建表格
System.out.println("--------------创建表table-----------------------------");
TableSchema schema = TableSchema.builder()
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
Map<String,String> properties = new HashMap<>();
/**
* flink 通用表将具有is_generic=true.
*/
properties.put("is_generic",String.valueOf(true));
catalog.createTable(
new ObjectPath(databaseName,tableName),
new CatalogTableImpl(
schema,
properties,
"my_comments"),
true);
System.out.println(tableName);
}
}
结果:成功注册了目录,数据库,表!
总结:
- 1- 建立数据库时,需要 new 一个 CatalogDatabaseImpl
- 2- 创建表格式,需要先定义数据结构:schema
- 3- 设置 hadoop 操作的用户名
6. 基于 Catalog 的数据库操作
例子:对 hive 数据库进行创建、删除、判断等操作的代码 java 实现
package cn.itcast.day01.catalog;
/**
* @author lql
* @time 2024-03-14 09:37:16
* @description TODO
*/
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.HashMap;
/**
* 数据库操作
*/
public class DatabaseOperater {
public static void main(String[] args) throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {
//设置当前hadoop操作的用户名
System.setProperty("HADOOP_USER_NAME", "root");
StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
streamEnv.setParallelism(5);
EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);
String catalogName = "myhive";
String databaseName = "itcast_flinksql";
String tableName = "test";
HiveCatalog catalog = new HiveCatalog(
catalogName, // catalog name
"default", // default database
"src/main/resources", // Hive config (hive-site.xml) directory
"2.1.1" // Hive version
);
//注册目录
tableEnv.registerCatalog(catalogName, catalog);
tableEnv.useCatalog(catalogName);
System.out.println("---------------创建数据库------------------------");
catalog.createDatabase(databaseName,
new CatalogDatabaseImpl(new HashMap<>(), "my comment"), true);
System.out.println("---------------删除数据库------------------------");
catalog.dropDatabase(databaseName, true, true);
System.out.println("---------------验证数据库是否存在------------------------");
boolean result = catalog.databaseExists(databaseName);
System.out.println("---------------"+result+"------------------------");
System.out.println("---------------在目录中列出数据库------------------------");
catalog.listDatabases();
}
}
总结:需要 new 一个 HashMap<>()
7. kafka 通过 FlinkSQL Client 写入 Hive
7.1 flink 准备 jar 包
flink-sql-connector-kafka_2.12-1.13.1.jar
flink-connector-kafka_2.12-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar
# 如果是集群模式,需要将 lib 包分发到各台机器
7.2 案例演示
- 准备工作:
# 启动 zookeeper
# 启动 hadoop
# 启动 hive
# 启动 flink cluster 模式
# 进入 flinksql client 客户端,注意要用前面的 init.sql 脚本初始化!
- 逻辑 sql:
CREATE TABLE IF NOT EXISTS `order`(
id INT,
category STRING,
areaName STRING,
money INT,
`timestamp` BIGINT,
eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印
) WITH (
'connector' = 'kafka',
'topic' = 'order', -- 指定消费的topic
'scan.startup.mode' = 'latest-offset', -- 指定起始offset位置
'properties.zookeeper.connect' = 'node1:2181',
'properties.bootstrap.servers' = 'node1:9092',
'properties.group.id' = 'order_01',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
- 启动kafka生产者的数据:
{"id":1,"timestamp":1588870980000,"category":"电脑","areaName":"石家庄","money":"1450"}
{"id":2,"timestamp":1588870860000,"category":"手机","areaName":"北京","money":"1450"}
{"id":3,"timestamp":1588870980000,"category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":1588885260000,"category":"电脑","areaName":"上海","money":"1513"}
{"id":5,"timestamp":1588870980000,"category":"家电","areaName":"北京","money":"1550"}
{"id":6,"timestamp":1588870860000,"category":"电脑","areaName":"深圳","money":"1550"}
结果:kafka 数据源源不断写入到 hive 表中
总结:
- 1- 在 hive 中可以看到表,但是不能查询数据(报错 Error),因为这个表是 flink 通用表;
- 2- 如果想要建 hive 兼容表,需要通用表将具有 is_generic = true,改为 is_generic = False。