Flink--API 之 Source 使用解析
目录
一、Flink Data Sources 分类概览
(一)预定义 Source
(二)自定义 Source
二、代码实战演示
(一)预定义 Source 示例
基于本地集合
基于本地文件
基于网络套接字(socketTextStream)
(二)自定义 Source 示例
三、Kafka Source 应用
四、总结
在大数据处理领域,Apache Flink 作为一款强大的流式计算框架,既能应对流处理场景,也可处理批处理任务。而数据来源(Data Sources)作为整个计算流程的 “源头活水”,其多样性与合理运用至关重要。本文将深入剖析 Flink 中 Data Sources 的相关知识,并结合丰富代码示例,助力大家透彻理解与灵活运用。
一、Flink Data Sources 分类概览
Flink 在批 / 流处理中常见的 source 主要分为两大类:预定义 Source 和自定义 Source。
(一)预定义 Source
基于本地集合的 source(Collection-based-source)
通过env.fromElements()
可传入可变参数创建 DataStream,支持如 Tuple、自定义对象等复合形式,但要注意类型需一致,不一致时虽可用Object
接收但使用易报错,像env.fromElements("haha", 1)
这种就会有问题;env.fromCollection()
支持多种Collection
具体类型(如List
,Set
,Queue
)来构建 DataStream;env.fromSequence()
可基于开始和结束值创建 DataStream(曾有env.generateSequence()
方法创建基于 Sequence 的 DataStream,不过现已废弃),此类方式常应用于学习测试编造数据场景。
基于文件的 source(File-based-source)
能读取本地文件与 HDFS 路径文件,如env.readTextFile("datas/wc.txt")
可读取本地datas
目录下wc.txt
文件,env.readTextFile("hdfs://bigdata01:9820/home/a.txt")
能获取 HDFS 特定路径文件数据。操作时要留意相对路径转绝对路径问题,避免因路径差错引发异常。
基于网络套接字(socketTextStream)
以socketTextStream(String hostname, int port)
方法从指定 Socket 读取数据创建 DataStream,其为非并行 Source,有重载方法可指定行分隔符和最大重新连接次数,默认行分隔符是\n
,最大重新连接次数为 0。使用前需先启动 Socket 服务(Mac 或 Linux 可在命令行终端输入nc -lk 8888
,Windows 需安装netcat
命令后操作),且该方式获取的 DataStream 并行度固定为 1。
(二)自定义 Source
SourceFunction
非并行数据源(并行度只能 = 1),作为接口定义基础数据源规范,实现run
方法持续产生数据,cancel
方法用于停止数据源。
RichSourceFunction
多功能非并行数据源(并行度只能 = 1),是类形式,相比SourceFunction
,额外功能体现在实例化时有open
方法执行一次(多并行度会多次执行,因多实例)、销毁实例时close
方法执行一次,且能通过getRuntimeContext
获取当前Runtime
对象(底层 API)。
ParallelSourceFunction
并行数据源(并行度能够 >= 1),接口形式,允许创建并行处理的数据源,例如自定义类实现此接口,按设定并行度生成数据。
RichParallelSourceFunction
多功能并行数据源(并行度能够 >= 1),类形式且功能齐全,建议使用。继承它并重写相关方法,能充分利用并行特性高效产生数据,同时享有Rich
类的open
、close
等方法优势。
二、代码实战演示
(一)预定义 Source 示例
在flink最常见的创建DataStream方式有四种:
l 使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
注意:类型要一致,不一致可以用Object接收,但是使用会报错,比如:env.fromElements("haha", 1);源码注释中有写:
l 使用env.fromCollection(),这种方式支持多种Collection的具体类型,如List,Set,Queue
l 使用env.generateSequence()方法创建基于Sequence的DataStream --已经废弃了
l 使用env.fromSequence()方法创建基于开始和结束的DataStream
一般用于学习测试时编造数据时使用
1.env.fromElements(可变参数);
2.env.fromColletion(各种集合);
3.env.fromSequence(开始,结束);
基于本地集合
package com.bigdata.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class _01YuDingYiSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 各种获取数据的Source
DataStreamSource<String> dataStreamSource = env.fromElements("hello world txt", "hello nihao kongniqiwa");
dataStreamSource.print();
// 演示一个错误的
//DataStreamSource<Object> dataStreamSource2 = env.fromElements("hello", 1,3.0f);
//dataStreamSource2.print();
DataStreamSource<Tuple2<String, Integer>> elements = env.fromElements(
Tuple2.of("张三", 18),
Tuple2.of("lisi", 18),
Tuple2.of("wangwu", 18)
);
elements.print();
// 有一个方法,可以直接将数组变为集合 复习一下数组和集合以及一些非常常见的API
String[] arr = {"hello","world"};
System.out.println(arr.length);
System.out.println(Arrays.toString(arr));
List<String> list = Arrays.asList(arr);
System.out.println(list);
env.fromElements(
Arrays.asList(arr),
Arrays.asList(arr),
Arrays.asList(arr)
).print();
// 第二种加载数据的方式
// Collection 的子接口只有 Set 和 List
ArrayList<String> list1 = new ArrayList<>();
list1.add("python");
list1.add("scala");
list1.add("java");
DataStreamSource<String> ds1 = env.fromCollection(list1);
DataStreamSource<String> ds2 = env.fromCollection(Arrays.asList(arr));
// 第三种
DataStreamSource<Long> ds3 = env.fromSequence(1, 100);
ds3.print();
// execute 下面的代码不运行,所以,这句话要放在最后。
env.execute("获取预定义的Source");
}
}
可以在代码中指定并行度
l 指定全局并行度:
env.setParallelism(12);
l 获得全局并行度:
env.getParallelism();
指定算子设置并行度:
获取指定算子并行度:
eventSource.getParallelism();
基于本地文件
package com.bigdata.source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class _02YuDingYiSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获取并行度
System.out.println(env.getParallelism());
// 讲第二种Source File类型的
// 给了一个相对路径,说路径不对,老闫非要写,我咋办?
// 相对路径,转绝对路径
File file = new File("datas/wc.txt");
File file2 = new File("./");
System.out.println(file.getAbsoluteFile());
System.out.println(file2.getAbsoluteFile());
DataStreamSource<String> ds1 = env.readTextFile("datas/wc.txt");
ds1.print();
// 还可以获取hdfs路径上的数据
DataStreamSource<String> ds2 = env.readTextFile("hdfs://bigdata01:9820/home/a.txt");
ds2.print();
// execute 下面的代码不运行,所以,这句话要放在最后。
env.execute("获取预定义的Source");
}
}
基于网络套接字(socketTextStream)
socketTextStream(String hostname, int port) 方法是一个非并行的Source,该方法需要传入两个参数,第一个是指定的IP地址或主机名,第二个是端口号,即从指定的Socket读取数据创建DataStream。该方法还有多个重载的方法,其中一个是socketTextStream(String hostname, int port, String delimiter, long maxRetry),这个重载的方法可以指定行分隔符和最大重新连接次数。这两个参数,默认行分隔符是”\n”,最大重新连接次数为0。
提示:
如果使用socketTextStream读取数据,在启动Flink程序之前,必须先启动一个Socket服务,为了方便,Mac或Linux用户可以在命令行终端输入nc -lk 8888启动一个Socket服务并在命令行中向该Socket服务发送数据。Windows用户可以在百度中搜索windows安装netcat命令。
通过网盘分享的文件:netcat-win32-1.11.zip
如果是windows平台:nc -lp 8888
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class SourceDemo02_Socket {
public static void main(String[] args) throws Exception {
//TODO 1.env-准备环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
//TODO 2.source-加载数据
DataStream<String> socketDS = env.socketTextStream("bigdata01", 8889);
//TODO 3.transformation-数据转换处理
//3.1对每一行数据进行分割并压扁
DataStream<String> wordsDS = socketDS.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
String[] words = value.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
//3.2每个单词记为<单词,1>
DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
//3.3分组
KeyedStream<Tuple2<String, Integer>, String> keyedDS = wordAndOneDS.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//3.4聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.sum(1);
//TODO 4.sink-数据输出
result.print();
//TODO 5.execute-执行
env.execute();
}
}
(二)自定义 Source 示例
SourceFunction:非并行数据源(并行度只能=1) --接口
RichSourceFunction:多功能非并行数据源(并行度只能=1) --类
ParallelSourceFunction:并行数据源(并行度能够>=1) --接口
RichParallelSourceFunction:多功能并行数据源(并行度能够>=1) --类 【建议使用的】
简单自定义非并行 Source(实现 SourceFunction)
package com.bigdata.day02;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Random;
import java.util.UUID;
/**
* 需求: 每隔1秒随机生成一条订单信息(订单ID、用户ID、订单金额、时间戳)
* 要求:
* - 随机生成订单ID(UUID)
* - 随机生成用户ID(0-2)
* - 随机生成订单金额(0-100)
* - 时间戳为当前系统时间
*/
@Data // set get toString
@AllArgsConstructor
@NoArgsConstructor
class OrderInfo{
private String orderId;
private int uid;
private int money;
private long timeStamp;
}
// class MySource extends RichSourceFunction<OrderInfo> {
//class MySource extends RichParallelSourceFunction<OrderInfo> {
class MySource implements SourceFunction<OrderInfo> {
boolean flag = true;
@Override
public void run(SourceContext ctx) throws Exception {
// 源源不断的产生数据
Random random = new Random();
while(flag){
OrderInfo orderInfo = new OrderInfo();
orderInfo.setOrderId(UUID.randomUUID().toString());
orderInfo.setUid(random.nextInt(3));
orderInfo.setMoney(random.nextInt(101));
orderInfo.setTimeStamp(System.currentTimeMillis());
ctx.collect(orderInfo);
Thread.sleep(1000);// 间隔1s
}
}
// source 停止之前需要干点啥
@Override
public void cancel() {
flag = false;
}
}
public class CustomSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 将自定义的数据源放入到env中
DataStreamSource dataStreamSource = env.addSource(new MySource())/*.setParallelism(1)*/;
System.out.println(dataStreamSource.getParallelism());
dataStreamSource.print();
env.execute();
}
}
自定义并行 Source(实现 ParallelSourceFunction)
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import java.util.UUID;
/**
* 自定义多并行度Source
*/
public class CustomerSourceWithParallelDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
mySource.print();
env.execute();
}
public static class MySource implements ParallelSourceFunction<String> {
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect(UUID.randomUUID().toString());
/*
如果不设置无限循环可以看出,设置了多少并行度就打印出多少条数据
*/
}
@Override
public void cancel() {}
}
}
自定义多功能并行 Source(实现 RichParallelSourceFunction)
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.util.UUID;
/**
* 自定义一个RichParallelSourceFunction的实现
*/
public class CustomerRichSourceWithParallelDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> mySource = env.addSource(new MySource()).setParallelism(6);
mySource.print();
env.execute();
}
/*
Rich 类型的Source可以比非Rich的多出有:
- open方法,实例化的时候会执行一次,多个并行度会执行多次的哦(因为是多个实例了)
- close方法,销毁实例的时候会执行一次,多个并行度会执行多次的哦
- getRuntime方法可以获得当前的Runtime对象(底层API)
*/
public static class MySource extends RichParallelSourceFunction<String> {
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
System.out.println("open......");
}
@Override
public void close() throws Exception {
super.close();
System.out.println("close......");
}
@Override
public void run(SourceContext<String> ctx) throws Exception {
ctx.collect(UUID.randomUUID().toString());
}
@Override
public void cancel() {}
}
}
三、Kafka Source 应用
Kafka 作为常用消息队列,与 Flink 集成紧密。使用时需添加依赖
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
并配置相关属性,如下示例展示从 Kafka 主题读取数据并筛选含特定字样消息后打印。
创建一个topic1 这个主题:
cd /opt/installs/kafka3/
bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --create --partitions 1 --replication-factor 3 --topic topic1
通过控制台向topic1发送消息:
bin/kafka-console-producer.sh --bootstrap-server bigdata01:9092 --topic topic1
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaSource {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "bigdata01:9092");
properties.setProperty("group.id", "g1");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties);
DataStreamSource<String> dataStreamSource = env.addSource(kafkaSource);
dataStreamSource.filter(new FilterFunction<String>() {
@Override
public boolean filter(String word) throws Exception {
return word.contains("success");
}
}).print();
env.execute();
}
}
四、总结
掌握 Flink 普通 API 里 Source 的各类使用方式,无论是预定义 Source 快速搭建测试数据场景、灵活运用并行度设置优化资源,还是对接 Kafka 这类外部数据源,都是构建高效、稳定大数据处理管道的关键基石。后续可深入各部分细节实践,深挖性能调优等进阶玩法,让 Flink 在数据处理之旅中大放异彩。希望这篇文章能助大家在 Flink Source 使用上理清思路、顺利上手,开启大数据流式计算的精彩探索!