当前位置: 首页 > article >正文

flink重温笔记(十七): flinkSQL 顶层 API ——SQLClient 及流批一体化

Flink学习笔记

前言:今天是学习 flink 的第 17 天啦!学习了 flinkSQL 的客户端工具 flinkSQL-client,主要是解决大数据领域数据计算避免频繁提交jar包,而是简单编写sql即可测试数据,文章中主要结合 hive,即编写 flinksql 可以操作 hive 中的数据表,以及流批一体化:kafak 将数据写入到 hive中,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:"分享是快乐的源泉💧,在我的博客里,不仅有知识的海洋🌊,还有满满的正能量加持💪,快来和我一起分享这份快乐吧😊!

喜欢我的博客的话,记得点个红心❤️和小关小注哦!您的支持是我创作的动力!"


文章目录

  • Flink学习笔记
    • 三、FlinkSQL Client
      • 1. 启动 Hive 组件
      • 2. Flink-Client 准备操作
      • 3. Flink-Client 三种显示格式
      • 4. 与 hive 结合的初始化
      • 5. 两种方式创建 Hive 相关库表
        • 5.1 SQL 方式
        • 5.2 Table-API 方式
      • 6. 基于 Catalog 的数据库操作
      • 7. kafka 通过 FlinkSQL Client 写入 Hive
        • 7.1 flink 准备 jar 包
        • 7.2 案例演示

三、FlinkSQL Client

1. 启动 Hive 组件

###############################################################	
# 记得先启动 hadoop 不然连接 beeline 时报错:
Error: Could not open client transport with JDBC Uri: jdbc:hive2://node1:10000: java.net.ConnectException: 拒绝连接 (Connection refused) (state=08S01,code=0)

##hive:##
	nohup hive --service metastore & >> nohup.out
	nohup hive --service hiveserver2 & >> hiveserver2.out
	beeline
		!connect jdbc:hive2://node1:10000
		
	关闭:
	ps -ef | grep hive
	kill -9 pid
###############################################################

2. Flink-Client 准备操作

可以在建表或者建库的时候,可以直接编写 SQL 语句提交,web 页面也能显示 Running,省去打包操作!

  • flink 的 lib 目录下 jar 包

基于 flink 1.3.1 ,hive 2.1.1,jar 包放在我的资源区了,大家有需要可以下载一下!

flink-connector-hive_2.11-1.13.1.jar
hive-exec-2.1.1.jar
antlr-runtime-3.4.jar
  • 需要提前启动 flink 集群模式
./bin/start-cluster.sh
  • 进入 Flink Client 操作
./bin/sql-client.sh embedded
  • 退出操作
quit;

3. Flink-Client 三种显示格式

  • 1- 表格模式
SET sql-client.execution.result-mode=table;
  • 2- 变更日志模式
SET sql-client.execution.result-mode=changelog;
  • 3- Tableau 模式
SET sql-client.execution.result-mode=tableau;

4. 与 hive 结合的初始化

创建 init.sql

# 显示打印错误日志信息
SET sql-client.verbose = true;
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'hive-conf-dir'='/export/server/flink-1.13.1/conf'
  );

USE CATALOG myhive;

CREATE DATABASE if not exists itcast_flinksql;

USE itcast_flinksql;

初始化启动:

./sql-client.sh -i init.sql

5. 两种方式创建 Hive 相关库表

现将 hive 的 hive-site.xml 文件保存到 src/main/resource 目录下

5.1 SQL 方式
package cn.itcast.day01.catalog;

/**
 * @author lql
 * @time 2024-03-14 09:38:43
 * @description TODO
 */

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.util.stream.Stream;

/**
 * 用户可以使用 DDL 通过 Table API 或者 SQL Client 在 Catalog 中创建表
 */
public class SqlDDLDemo {
    public static void main(String[] args) {
        //todo 0)设置当前hadoop操作的用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings bbSetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, bbSetting);

        //todo 2)创建HiveCatalog
        String catalogName = "myHive";
        String databaseName = "itcast_flinksql";
        HiveCatalog catalog = new HiveCatalog(
                catalogName, //指定catalog的名字
                "default", //默认数据库的名字
                "src/main/resources", //指定hive-site.xml文件的路径
                "2.1.1" //指定hive的版本
        );

        //todo 3)注册目录
        System.out.println("===========注册目录==================");
        tabEnv.registerCatalog(catalogName, catalog);

        //todo 4)切换目录
        System.out.println("===========切换目录==================");
        tabEnv.useCatalog(catalogName);

        //todo 5)创建数据库
        System.out.println("===========创建数据库==================");
        String createDBSql = "CREATE DATABASE IF NOT EXISTS "+catalogName+"."+databaseName;
        tabEnv.executeSql(createDBSql);

        //todo 6)切换数据库
        System.out.println("===========切换数据库==================");
        tabEnv.useDatabase(databaseName);

        //todo 7)创建表
        System.out.println("===========创建表==================");
        String createTableSql = "CREATE TABLE IF NOT EXISTS mytable(name string, age int)";
        tabEnv.executeSql(createTableSql);

        //todo 8)查询所有的表
        System.out.println("===========创建表==================");
        tabEnv.listTables();

    }
}

结果:成功注册了目录,数据库,表!

总结:

  • 1- 需要设置 hadoop 权限
  • 2- new 一个 HiveCatalog 对象

5.2 Table-API 方式
package cn.itcast.day01.catalog;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import java.util.HashMap;
import java.util.Map;


/**
 * @author lql
 * @time 2023-03-14 09:10:58
 * @description TODO: 用户可以用编程的方式使用Java 或者 Scala 来创建 Catalog 表。
 */
public class JavaClientDemo {
    public static void main(String[] args) throws DatabaseAlreadyExistException, TableAlreadyExistException, DatabaseNotExistException {
        // 设置当前hadoop操作的用户名
        System.setProperty("HADOOP_USER_NAME","root");

        // 创建流处理环境,设置并行度为 5
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(5);

        // 设置 table 环境,使用 blink planner,并开启流式查询模式
        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);

        // 定义 Hive Catalog 名称、数据库名称、表格名称等信息
        String catalogName = "myhive";
        String databaseName = "itcast_flinksql";
        String tableName = "test";

        // 创建一个 HiveCatalog 实例,用于访问 Hive 中的表格和数据库
        HiveCatalog catalog = new HiveCatalog(
                catalogName,                   // Catalog 名称
                "default",                // 默认使用的数据库
                "src/main/resources",  // Hive 配置文件的目录路径(hive-site.xml)
                "2.1.1"                   // Hive 版本号
        );

        // 注册 HiveCatalog
        // 在注册之前需要保证在指定目录下有 metadata 目录,并且 metadata 目录下没有 myhive 目录,否则会失败
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        // 创建数据库
        System.out.println("---------------创建数据库------------------------");
        catalog.createDatabase(databaseName,
                new CatalogDatabaseImpl(
                        new HashMap<>(),"my_comments"),true);

        // 创建表格
        System.out.println("--------------创建表table-----------------------------");
        TableSchema schema = TableSchema.builder()
                .field("name", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .build();

        Map<String,String> properties = new HashMap<>();

        /**
         * flink 通用表将具有is_generic=true.
		*/

        properties.put("is_generic",String.valueOf(true));

        catalog.createTable(
                new ObjectPath(databaseName,tableName),
                new CatalogTableImpl(
                        schema,
                        properties,
                        "my_comments"),
                true);

        System.out.println(tableName);
    }
}

结果:成功注册了目录,数据库,表!

总结:

  • 1- 建立数据库时,需要 new 一个 CatalogDatabaseImpl
  • 2- 创建表格式,需要先定义数据结构:schema
  • 3- 设置 hadoop 操作的用户名

6. 基于 Catalog 的数据库操作

例子:对 hive 数据库进行创建、删除、判断等操作的代码 java 实现

package cn.itcast.day01.catalog;

/**
 * @author lql
 * @time 2024-03-14 09:37:16
 * @description TODO
 */
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

import java.util.HashMap;

/**
 * 数据库操作
 */
public class DatabaseOperater {
    public static void main(String[] args) throws DatabaseAlreadyExistException, DatabaseNotEmptyException, DatabaseNotExistException {
        //设置当前hadoop操作的用户名
        System.setProperty("HADOOP_USER_NAME", "root");

        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        streamEnv.setParallelism(5);

        EnvironmentSettings tableEnvSettings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, tableEnvSettings);

        String catalogName = "myhive";
        String databaseName = "itcast_flinksql";
        String tableName = "test";
        HiveCatalog catalog = new HiveCatalog(
                catalogName,                   // catalog name
                "default",                // default database
                "src/main/resources",  // Hive config (hive-site.xml) directory
                "2.1.1"                   // Hive version
        );
        //注册目录
        tableEnv.registerCatalog(catalogName, catalog);
        tableEnv.useCatalog(catalogName);

        System.out.println("---------------创建数据库------------------------");
        catalog.createDatabase(databaseName,
                new CatalogDatabaseImpl(new HashMap<>(), "my comment"), true);

        System.out.println("---------------删除数据库------------------------");
        catalog.dropDatabase(databaseName, true, true);

        System.out.println("---------------验证数据库是否存在------------------------");
        boolean result = catalog.databaseExists(databaseName);
        System.out.println("---------------"+result+"------------------------");

        System.out.println("---------------在目录中列出数据库------------------------");
        catalog.listDatabases();
    }
}

总结:需要 new 一个 HashMap<>()


7. kafka 通过 FlinkSQL Client 写入 Hive

7.1 flink 准备 jar 包
flink-sql-connector-kafka_2.12-1.13.1.jar
flink-connector-kafka_2.12-1.13.1.jar
flink-shaded-hadoop-2-uber-2.7.5-10.0.jar

# 如果是集群模式,需要将 lib 包分发到各台机器
7.2 案例演示
  • 准备工作:
# 启动 zookeeper
# 启动 hadoop
# 启动 hive
# 启动 flink cluster 模式
# 进入 flinksql client 客户端,注意要用前面的 init.sql 脚本初始化!
  • 逻辑 sql:
CREATE TABLE IF NOT EXISTS `order`(
  id INT,
  category STRING,
  areaName STRING,
  money INT,
  `timestamp` BIGINT,
  eventTime AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp` / 1000, 'yyyy-MM-dd HH:mm:ss')), -- 事件时间
  WATERMARK FOR eventTime AS eventTime - INTERVAL '10' SECOND -- 水印
) WITH (
  'connector' = 'kafka',
  'topic' = 'order',                        -- 指定消费的topic
  'scan.startup.mode' = 'latest-offset',    -- 指定起始offset位置
  'properties.zookeeper.connect' = 'node1:2181',
  'properties.bootstrap.servers' = 'node1:9092',
  'properties.group.id' = 'order_01',
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);
  • 启动kafka生产者的数据:
{"id":1,"timestamp":1588870980000,"category":"电脑","areaName":"石家庄","money":"1450"}
{"id":2,"timestamp":1588870860000,"category":"手机","areaName":"北京","money":"1450"}
{"id":3,"timestamp":1588870980000,"category":"手机","areaName":"北京","money":"8412"}
{"id":4,"timestamp":1588885260000,"category":"电脑","areaName":"上海","money":"1513"}
{"id":5,"timestamp":1588870980000,"category":"家电","areaName":"北京","money":"1550"}
{"id":6,"timestamp":1588870860000,"category":"电脑","areaName":"深圳","money":"1550"}

结果:kafka 数据源源不断写入到 hive 表中

总结:

  • 1- 在 hive 中可以看到表,但是不能查询数据(报错 Error),因为这个表是 flink 通用表;
  • 2- 如果想要建 hive 兼容表,需要通用表将具有 is_generic = true,改为 is_generic = False。


http://www.kler.cn/a/273692.html

相关文章:

  • ubuntu20下编译linux1.0 (part1)
  • matlab编写分段Hermite插值多项式
  • vs2022开发.net窗体应用开发环境安装配置以及程序发布详细教程
  • Elixir语言的学习路线
  • Redis 笔记(二)-Redis 安装及测试
  • 使用python将多个Excel表合并成一个表
  • Excel xlsx file:not supported
  • 零基础学python:10、 函数的基础3
  • 鸿蒙Harmony应用开发—ArkTS声明式开发(绘制组件:Rect)
  • springboot+poi-tl根据模板导出word(含动态表格和图片),并将导出的文档压缩zip导出
  • k8s admin 用户生成token
  • JavaScript之继承
  • 【sql】初识 where EXISTS
  • MySQL---索引
  • 第十四届蓝桥杯省赛C++B组题解
  • 【Unity动画】Unity如何导入序列帧动画(GIF)
  • 护眼灯和白炽灯哪个更保护眼睛?四款必选的高口碑护眼台灯
  • 第二十二章 构建和配置 Nginx (UNIX® Linux macOS) - 示例:为特定路径上的所有流量启用 CSP 路由
  • idea2023 运行多 springboot 实例
  • 计算机网络:TCP篇
  • 外贸网站常用的wordpress模板
  • 学习笔记Day8:GEO数据挖掘-基因表达芯片
  • 用JDBC游标的方式导出mysql数据以及springboot打包成exe程序实践
  • 每天一个数据分析题(二百一十六)
  • SpringBoot2.7集成Swagger3
  • AcWing 1510:楼梯 ← 浮点数二分