
Flink Offline (Batch) Computation

Table of Contents

      • Example 1: Read a CSV file and write a CSV file
      • Example 2: Read from StarRocks and write to StarRocks
      • Example 3: Process with DataSet and Table SQL, then write to StarRocks
      • Pitfalls encountered

Maven dependencies used by the examples (Flink 1.9.1 with Scala 2.11; StarRocks is accessed over the MySQL protocol, hence mysql-connector-java):

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>
        <!-- Table & SQL API support for the DataStream/DataSet APIs, using the Java programming language -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Table program planner and runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

Example 1: Read a CSV file and write a CSV file

  Reference: (3) Flink Learning - Table API & SQL Programming

import lombok.Data;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.sinks.CsvTableSink;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment; ExecutionEnvironment is the entry point for batch jobs
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);
//        DataSet<WC> input = env.fromElements(
//                WC.of("hello", 1),
//                WC.of("hqs", 1),
//                WC.of("world", 1),
//                WC.of("hello", 1)
//        );
        // Register the DataSet as a table (alternative to fromDataSet below)
//        bTableEnv.registerDataSet("WordCount", input, "word, frequency");


        // 2. Load the data source into a DataSet
        DataSet<Student> csv = env.readCsvFile("D:\\tmp\\data.csv").ignoreFirstLine().pojoType(Student.class, "name", "age");

        // 3. Convert the DataSet into a Table and register it
        Table students = bTableEnv.fromDataSet(csv);
        bTableEnv.registerTable("student", students);

        // 4. Query the registered student table
        Table result = bTableEnv.sqlQuery("select name,age from student");
        result.printSchema();
        DataSet<Student> dset = bTableEnv.toDataSet(result, Student.class);
        System.out.println("count-->" + dset.count());
        dset.print();

        // 5. Sink: write the result out as a CSV file
        CsvTableSink sink1 = new CsvTableSink("D:\\tmp\\result.csv", ",", 1, FileSystem.WriteMode.OVERWRITE);
        String[] fieldNames = {"name", "age"};
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
        bTableEnv.registerTableSink("CsvOutPutTable", fieldNames, fieldTypes, sink1);
        result.insertInto("CsvOutPutTable");

        env.execute("SQL-Batch");
    }

    @Data
    public static class Student {
        private String name;
        private int age;
    }
}

  Note that pojoType requires Student to follow Flink's POJO rules (public class, public no-argument constructor, accessible getters/setters); Lombok's @Data generates the getters and setters here.

  Prepare the test file data.csv:

name,age
zhangsan,23
lisi,43
wangwu,12

  After running the program, the file D:\tmp\result.csv is generated.
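
  For reference, the generated result.csv should contain something like the following (CsvTableSink writes no header row, and row order may differ):

zhangsan,23
lisi,43
wangwu,12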

Example 2: Read from StarRocks and write to StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();

        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Option 1: work with the raw DataSource
        DataSource s = env.createInput(jdbcInputFormat);

        s.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );
        
        // Option 2: type the source as DataSet<Row> (equivalent to option 1)
//        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
//
//        dataSource.output(JDBCOutputFormat.buildJDBCOutputFormat()
//                .setDrivername("com.mysql.jdbc.Driver")
//                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
//                .setUsername("root").setPassword("")
//                .setQuery("insert into student values(?, ?)")
//                .finish()
//        );

        env.execute("SQL-Batch");
    }
}
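
A note on throughput: JDBCOutputFormat buffers rows and flushes them to the database in batches. If the default batch size is not suitable, it can be tuned with setBatchInterval; a minimal sketch of the same sink (the value 100 is purely illustrative):

        s.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .setBatchInterval(100) // flush after every 100 buffered rows (illustrative value)
                .finish()
        );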

  Data preparation (run in StarRocks):

CREATE TABLE student (
    name STRING,
    age INT
) ENGINE=OLAP 
DUPLICATE KEY(`name`)
DISTRIBUTED BY RANDOM
PROPERTIES (
"compression" = "LZ4",
"fast_schema_evolution" = "false",
"replicated_storage" = "true",
"replication_num" = "1"
);

insert into student values('zhangsan', 23);

References:
• Flink reading from a MySQL source: JDBCInputFormat and custom data sources
• Notes on the connectors for the three data processing modes in Flink 1.10
• Two ways to read from and write to MySQL with Flink

Note: running java -cp flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar com.xiaoqiang.app.SQLWordCount may fail with: Exception in thread "main" com.typesafe.config.ConfigException$UnresolvedSubstitution: reference.conf @ jar:file:flink-app-1.0-SNAPSHOT-jar-with-dependencies.jar!/reference.conf: 875: Could not resolve substitution to a value: ${akka.stream.materializer}

  Solution (see: Flink Could not resolve substitution to a value: ${akka.stream.materializer}): have the maven-shade-plugin merge the reference.conf files with an AppendingTransformer, as in the build configuration below.

    <build>
        <plugins>
            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!--<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>flink.KafkaDemo1</mainClass>
                                </transformer>-->
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Example 3: Process with DataSet and Table SQL, then write to StarRocks

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;

public class SQLWordCount {
    public static void main(String[] args) throws Exception {
        TypeInformation[] fieldTypes = {Types.STRING, Types.INT};

        RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes);

        JDBCInputFormat jdbcInputFormat = JDBCInputFormat.buildJDBCInputFormat().setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("select * from student").setRowTypeInfo(rowTypeInfo).finish();

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment bTableEnv = BatchTableEnvironment.create(env);

        DataSet<Row> dataSource = env.createInput(jdbcInputFormat);
        dataSource.print();

        Table students = bTableEnv.fromDataSet(dataSource);
        bTableEnv.registerTable("student", students);

        Table result = bTableEnv.sqlQuery("select name, age from (select f0 as name, f1 as age from student) group by name, age");
        result.printSchema();

        DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);

        dset.output(JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://192.168.xx.xx:9030/dwd?characterEncoding=utf8")
                .setUsername("root").setPassword("")
                .setQuery("insert into student values(?, ?)")
                .finish()
        );

        env.execute("SQL-Batch");
    }
}
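
One detail worth noting: fromDataSet on a DataSet<Row> assigns the default field names f0, f1, ..., which is why the SQL above aliases them in a subquery. Alternatively, the fields can be named when the Table is created, using the field-expression overload of fromDataSet; a small sketch reusing the variables above:

        Table students = bTableEnv.fromDataSet(dataSource, "name, age");
        bTableEnv.registerTable("student", students);
        // The query can then reference name and age directly:
        Table result = bTableEnv.sqlQuery("select name, age from student group by name, age");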

Pitfalls encountered

  Pitfall 1: Bang equal '!=' is not allowed under the current SQL conformance level
  Fix: replace != with <> in the SQL.
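
  For example, with the default SQL conformance level the first query below throws at sqlQuery time, while the second works:

        Table bad = bTableEnv.sqlQuery("select name, age from student where age != 23"); // throws
        Table ok = bTableEnv.sqlQuery("select name, age from student where age <> 23");  // works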

  Pitfall 2: java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
  Explanation: by the time the final env.execute() runs, no new data sink has been defined. In Flink batch jobs an earlier line such as result.print() already triggers execution and produces the output, so calling env.execute() afterwards is redundant, hence the exception above.
  Fix: remove the trailing env.execute();.
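
  Put differently, a batch job must end either with an eager action or with a declared sink followed by env.execute(). A minimal sketch of both patterns, reusing names from the examples above (jdbcOutputFormat stands for a sink built as in example 2):

        // Pattern A: print() / count() / collect() are eager actions that
        // submit the job themselves; do NOT call env.execute() afterwards.
        DataSet<Row> dset = bTableEnv.toDataSet(result, Row.class);
        dset.print();

        // Pattern B: output() only declares a sink; the job runs when
        // env.execute() is called, which is therefore required here.
        dset.output(jdbcOutputFormat);
        env.execute("SQL-Batch");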

