当前位置: 首页 > article >正文

【Flink-scala】DataSet编程模型介绍及数据源

DataStream 学习

1.DataStream编程模型总结

文章目录

  • DataStream 学习
  • 介绍
  • 一、DataSet编程模型
  • 二、数据源
    • 1.文件类数据源
    • 2.集合类数据源
    • 3.通用类数据源
    • 4第三方文件系统


介绍

Flink把批处理看成是一个流处理的特例,因此可以在底层统一的流处理引擎上,同时提供了STREAM API和SET API,经典的有限数据流处理方式有:
在这里插入图片描述
由于批处理的对象是有界数据集,因此批处理不需要时间和窗口机制

一、DataSet编程模型

link批处理程序的基本运行流程包括以下4个步骤:

  1. 创建执行环境;
  2. 创建数据源;
  3. 指定对数据进行的转换操作;
  4. 指定数据计算的输出结果方式。
    上面第1步中创建批处理执行环境的方式如下:
val env = ExecutionEnvironment.getExecutionEnvironment

此外,还需要在pom.xml文件中引入flink-scala_2.12依赖库,具体如下:

<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
</dependency>

编程模型如图:
在这里插入图片描述
数据的处理过程:
在这里插入图片描述
读取数据源-进行转换操作-获取结果数据。

批处理数据的基本流程:
在这里插入图片描述

二、数据源

1.文件类数据源

Flink提供了从文件中读取数据生成DataSet的多种方法,具体如下:

readTextFile(path):逐行读取文件并将文件内容转换成DataSet类型数据集;

readTextFileWithValue(path):读取文本文件内容,并将文件内容转换成DataSet[StringValue]类型数据集。
该方法与readTextFile(String)不同的是,其泛型是StringValue,是一种可变的String类型,通过StringValue存储文本数据可以有效降低String对象创建数量,减小垃圾回收的压力;

readCsvFile(path):解析以逗号(或其他字符)分隔字段的文件,返回元组或POJO对象;

readSequenceFile(Key, Value, path):读取SequenceFile,以Tuple2<Key, Value>类型返回。

以readTextFile(path)为例,可以使用如下语句读取文本文件内容:

val dataSet : DataSet[String] = env.readTextFile("file:///home/hadoop/word.txt")

假设有一个CSV格式文件sales.csv,内容如下:

transactionId,customerId,itemId,amountPaid
111,1,1,100.0
112,2,2,505.0
113,1,3,510.0
114,2,4,600.0
115,3,2,500.0

则可以使用如下程序读取该CSV文件:

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
 
object ReadCSVFile{
  def main(args: Array[String]): Unit = {
    val bEnv = ExecutionEnvironment.getExecutionEnvironment
    val filePath="file:///home/hadoop/sales.csv"
    val csv = bEnv.readCsvFile[SalesLog](filePath,ignoreFirstLine = true)//这里
    csv.print()
  }
  case class SalesLog(transactionId:String,customerId:String,itemId:String,amountPaid:Double)//这里定义的类型
}

结果如下:

SalesLog(111,1,1,100.0)
SalesLog(112,2,2,505.0)
SalesLog(113,1,3,510.0)
SalesLog(114,2,4,600.0)
SalesLog(115,3,2,500.0)

2.集合类数据源

Flink提供了fromCollection()、fromElements()和generateSequence()等方法,来构建集合类数据源,具体如下:

fromCollection():从集合中创建DataSet数据集,集合中的元素数据类型相同;

fromElements():从给定数据元素序列中创建DataSet数据集,且所有的数据对象类型必须一致;

generateSequence():指定一个范围区间,然后在区间内部生成数字序列数据集,由于是并行处理的,所以最终的顺序不能保证一致。

val myArray = Array("hello world","hadoop spark flink")
val collectionSet = env.fromCollection(myArray)//从集合中获取

val dataSet = env.fromElements("hadoop","spark","flink")//一个个元素获取

val numSet = env.generateSequence(1,10)//生成的数据 1 2 3 4 ... 10   包含10

3.通用类数据源

以Flink内置的JDBCInputFormat类为实例,介绍通用类数据源的用法。
假设已经在Linux系统中安装了MySQL数据库,在Linux终端中执行如下命令启动MySQL:
输入数据库登录密码以后,就可以启动MySQL了,然后,执行如下命令创建数据库,并添加数据:

$ create database flink
$ use flink
$ create table student(sno char(8),cno char(2),grade int);
$ insert into student values('95001','1',96);
$ insert into student values('95002','1',94);

新建代码文件InputFromMySQL.scala,内容如下:
i

mport org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
import org.apache.flink.api.scala._
 
object InputFromMySQL{
  def main(args: Array[String]): Unit = {
 
    //创建执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
 
 //使用JDBC输入格式从关系数据库读取数据
    val inputMySQL = env.createInput(JDBCInputFormat.buildJDBCInputFormat()
      //数据库连接驱动名称
      .setDrivername("com.mysql.jdbc.Driver")
      //数据库连接驱动名称
      .setDBUrl("jdbc:mysql://localhost:3306/flink")
      //数据库连接用户名
      .setUsername("root")
      //数据库连接密码
      .setPassword("123456")
      //数据库连接查询SQL
      .setQuery("select sno,cno,grade from student")
      //字段类型、顺序和个数必须与SQL保持一致
      .setRowTypeInfo(new RowTypeInfo(
      BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.INT_TYPE_INFO))
      .finish()
    )
    inputMySQL.print()
  }
}

新建pom.xml文件,在里面添加与访问MySQL相关的依赖包,内容如下:

<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
<dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.40</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
    </dependencies>
 <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
 <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

使用Maven工具对程序进行编译打包,然后,提交到Flink中运行(请确认Flink已经启动)。运行结束以后,可以在屏幕上看到如下的输出结果:

95001,1,96
95002,1,94

4第三方文件系统

Flink通过FileSystem类来抽象自己的文件系统,这个抽象提供了各类文件系统实现的通用操作和最低保证。

每种数据源(比如HDFS、S3、Alluxio、XtreemFS、FTP等)可以继承和实现FileSystem类,将数据从各个系统读取到Flink中。

DataSet API中内置了HDFS数据源,这里给出一个读取HDFS文件系统的一个实例,代码如下:

import org.apache.flink.api.scala.ExecutionEnvironment
 
object ReadHDFS{
  def main(args: Array[String]): Unit = {
 
    //获取执行环境
    val env = ExecutionEnvironment.getExecutionEnvironment
 
    //创建数据源
val inputHDFS = env.readTextFile("hdfs://localhost:9000/word.txt")
 
    //打印输出
inputHDFS.print()
  }
}

获取数据源就1行代码 ,但是在pom中需要添加依赖。
在pom.xml文件中,需要添加与访问HDFS相关的依赖包,内容如下:

<project>
    <groupId>cn.edu.xmu.dblab</groupId>
    <artifactId>simple-project</artifactId>
    <modelVersion>4.0.0</modelVersion>
    <name>Simple Project</name>
    <packaging>jar</packaging>
    <version>1.0</version>
    <repositories>
        <repository>
            <id>alimaven</id>
            <name>aliyun maven</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.11.2</version>
        </dependency>
 <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.1.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
        </dependency>
    </dependencies>
 <build>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.4.6</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

使用Maven工具对程序进行编译打包。
为了让Flink能够顺利访问HDFS,需要修改环境变量
如果环境变量已经完成了修改,这里就不需要重复操作;如果还没有则修改,添加hadoop环境变量
修改如下。

$ vim ~/.bashrc
export HADOOP_HOME=/usr/local/hadoop
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
$ source ~/.bashrc

使用flink run命令把ReadHDFS程序提交到Flink中运行(请确认Flink和Hadoop已经启动),如果运行成功,就可以在屏幕上看到"hdfs://localhost:9000/word.txt"文件里面的内容了。



http://www.kler.cn/a/449273.html

相关文章:

  • 重塑数字文档处理:TX Text Control的2024年里程碑
  • C/C++圣诞树
  • Pytorch | 从零构建ParNet/Non-Deep Networks对CIFAR10进行分类
  • 「Mac畅玩鸿蒙与硬件47」UI互动应用篇24 - 虚拟音乐控制台
  • centos-stream9系统安装docker
  • 【Java基础面试题016】JavaObject类中有什么主要方法,作用是什么?
  • Pytorch | 从零构建ParNet/Non-Deep Networks对CIFAR10进行分类
  • 在FreeRTOS中动态创建任务,假如在最后一个参数写NULL,该任务有任务句柄吗
  • 安装管理docker
  • 重温设计模式--享元模式
  • 路由器做WPAD、VPN、透明代理中之间一个
  • CSS系列(24)-- 打印样式详解
  • 基于JAVA_JSP电子书下载系统的设计与实现【源码+文档+部署讲解】
  • 设计模式详解(十二):单例模式——Singleton
  • 如何注册和使用Facebook企业号
  • uniapp验证码
  • 数据库管理-第274期 Oracle Enterprise Manager 24ai新特性一览(20241223)
  • 使用frp进行内网穿透
  • 程控电阻箱应用中需要注意哪些安全事项?
  • Log4j简介
  • 在Excel中绘制ActiveX控件:解决文本编辑框定位问题
  • ubuntu装P104
  • 操作系统(22)外存的组织方式
  • 初识Go语言
  • docker部署微信小程序自动构建发布和更新
  • “电找车“ | 助力移动充电机器人快速落地