Flink1.14.* 各种算子在StreamTask控制下如何调用的源码
- 前言:
- 一、StreamTask执行算子的生命周期
- 二、 Source的streamTask用的是SourceStreamTask
- 三、基础转换操作,窗口用的是OneInputStreamTask
- 1、初始化OneInputStreamTask
- 2、StreamTask运行invoke调用的是StreamTask的processInput方法
- 3、从缓冲区获取数据放入到内存中
- 4、调用算子的processElement方法处理数据,
- 四、sink的streamTask用的也是OneInputStreamTask
- 五、OneInputStreamTask和SourceStreamTask类关系图
前言:
在 Apache Flink 中,StreamTask
类是处理流数据的核心执行单元。
它负责管理算子的生命周期,并调用算子的处理方法。StreamTask 类的全路径(即完整的包名和类名)如下:
StreamTask
类位于 flink-streaming-java
模块中,具体的包结构为 org.apache.flink.streaming.runtime.tasks
。
全路径如下
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
一、StreamTask执行算子的生命周期
先看StreamTask
大体执行流程(忽略实现类的细节)
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails {
protected OP mainOperator;
private boolean mailboxLoopRunning;
//第一步构造函数,把processInput赋值给mailboxProcessor
protected StreamTask(Environment environment, @Nullable TimerService timerService, UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor actionExecutor, TaskMailbox mailbox) throws Exception {
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
//默认为true
this.mailboxLoopRunning = true;
}
//第三步StreamTask执行
public final void invoke() throws Exception {
//省略代码
this.runMailboxLoop();
}
//SourceStreamTask会重写这个方法,OneInputStreamTask不会重写
protected void processInput(Controller controller) throws Exception {
//删除干扰代码,
}
}
public class MailboxProcessor implements Closeable {
protected final MailboxDefaultAction mailboxDefaultAction;
//第二步 构造函数把processInput方法赋值给mailboxDefaultAction
public MailboxProcessor(MailboxDefaultAction mailboxDefaultAction, TaskMailbox mailbox, StreamTaskActionExecutor actionExecutor) {
//这里mailboxDefaultAction传的是this::processInput
this.mailboxDefaultAction = (MailboxDefaultAction)Preconditions.checkNotNull(mailboxDefaultAction);
}
//第四步,
public void runMailboxLoop() throws Exception {
//suspended默认是false
this.suspended = !this.mailboxLoopRunning;
//this.isNextLoopPossible默认是true
while(this.isNextLoopPossible()) {
this.mailboxProcessor.runMailboxLoop();
}
}
private boolean isNextLoopPossible() {
return !this.suspended;
}
//第五步,调用processInput
public void runMailboxLoop() throws Exception {
//这个执行的是processInput方法
this.mailboxDefaultAction.runDefaultAction(defaultActionContext);
}
}
不同的实现类都是按照上面的步骤初始化执行的
二、 Source的streamTask用的是SourceStreamTask
@Internal
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> {
private final SourceStreamTask<OUT, SRC, OP>.LegacySourceFunctionThread sourceThread;
protected void init() {
//这个mainOperator是StreamTask的字段,,
SourceFunction<?> source = (SourceFunction)((StreamSource)this.mainOperator).getUserFunction();
}
protected void processInput(Controller controller) throws Exception {
//这里启动线程的run方法
this.sourceThread.start();
}
private class LegacySourceFunctionThread extends Thread {
private final CompletableFuture<Void> completionFuture = new CompletableFuture();
LegacySourceFunctionThread() {
}
public void run() {
try {
if (!SourceStreamTask.this.operatorChain.isTaskDeployedAsFinished()) {
StreamTask.LOG.debug("Legacy source {} skip execution since the task is finished on restore", SourceStreamTask.this.getTaskNameWithSubtaskAndId());
((StreamSource)SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.operatorChain);
}
//删除干扰代码
} catch (Throwable var2) {
//删除干扰代码
}
}
}
}
第一点需要注意的是由于SourceStreamTask
重写了streamTask
的processInput
方法,所以streamTask
的invoke
方法执行的是子类的SourceStreamTask
的processInput
方法
第二点看一下init
方法,这里(SourceFunction)((StreamSource)this.mainOperator).getUserFunction()
就是获取的source
算子,不清楚的可以看一下kafkaSource这篇文章Flink 1.14.*版本kafkaSource源码
由这里来触发SourceFunction
的run
方法,即FlinkKafkaConsumerBase
的run
方法
三、基础转换操作,窗口用的是OneInputStreamTask
这种一般都是中间算子,或者最后一个算子(例如
kafkaSink
),所以主要涉及到从输入源获取数据,处理数据,并将结果写入输出中
如果连着看下面两篇文章,你就会知道为什么sink也是用的OneInputStreamTask
了
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码
1、初始化OneInputStreamTask
public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamOperator<IN, OUT>> {
public void init() throws Exception {
//output是私有类StreamTaskNetworkOutput对象
DataOutput<IN> output = this.createDataOutput(numRecordsIn);
StreamTaskInput<IN> input = this.createTaskInput(inputGate);
//这个inputProcessor字段是给父类StreamTask初始化的,这时候父类inputProcessor=StreamOneInputProcessor
this.inputProcessor = new StreamOneInputProcessor(input, output, this.operatorChain);
}
private StreamTaskInput<IN> createTaskInput(CheckpointedInputGate inputGate) {
int numberOfInputChannels = inputGate.getNumberOfInputChannels();
StatusWatermarkValve statusWatermarkValve = new StatusWatermarkValve(numberOfInputChannels);
TypeSerializer<IN> inSerializer = this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());
return StreamTaskNetworkInputFactory.create(inputGate, inSerializer, this.getEnvironment().getIOManager(), statusWatermarkValve, 0, this.getEnvironment().getTaskStateManager().getInputRescalingDescriptor(), (gateIndex) -> {
return ((StreamEdge)this.configuration.getInPhysicalEdges(this.getUserCodeClassLoader()).get(gateIndex)).getPartitioner();
}, this.getEnvironment().getTaskInfo());
}
//返回的是下面私有类StreamTaskNetworkOutput对象
private DataOutput<IN> createDataOutput(Counter numRecordsIn) {
return new OneInputStreamTask.StreamTaskNetworkOutput(this.operatorChain.getFinishedOnRestoreInputOrDefault((Input)this.mainOperator), this.inputWatermarkGauge, numRecordsIn);
}
//私有内部类,对应上面init中的output
private static class StreamTaskNetworkOutput<IN> implements DataOutput<IN> {
private final Input<IN> operator;
public void emitRecord(StreamRecord<IN> record) throws Exception {
//调用的算子的processElement方法
this.operator.processElement(record);
}
}
}
这里是调用init
初始化,StreamOneInputProcessor
一起初始化了
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
private StreamTaskInput<IN> input;
private DataOutput<IN> output;
public StreamOneInputProcessor(StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {
//此input就是StreamTaskNetworkInput
this.input = (StreamTaskInput)Preconditions.checkNotNull(input);
//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象
this.output = (DataOutput)Preconditions.checkNotNull(output);
this.endOfInputAware = (BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);
}
public DataInputStatus processInput() throws Exception {
DataInputStatus status = this.input.emitNext(this.output);
//删除干扰代码
return status;
}
}
后面看到this.inputProcessor.processInput
其实就是调用的上面类的processInput
方法
下面简单介绍一下StreamTaskNetworkInputFactory
的创建的两种不同的StreamTaskInput
,也可以不用看
public class StreamTaskNetworkInputFactory {
public StreamTaskNetworkInputFactory() {
}
//这里只看返回StreamTaskNetworkInput
public static <T> StreamTaskInput<T> create(CheckpointedInputGate checkpointedInputGate, TypeSerializer<T> inputSerializer, IOManager ioManager, StatusWatermarkValve statusWatermarkValve, int inputIndex, InflightDataRescalingDescriptor rescalingDescriptorinflightDataRescalingDescriptor, Function<Integer, StreamPartitioner<?>> gatePartitioners, TaskInfo taskInfo) {
return (StreamTaskInput)(rescalingDescriptorinflightDataRescalingDescriptor.equals(InflightDataRescalingDescriptor.NO_RESCALE) ? new StreamTaskNetworkInput(checkpointedInputGate, inputSerializer, ioManager, statusWatermarkValve, inputIndex) : new RescalingStreamTaskNetworkInput(checkpointedInputGate, inputSerializer, ioManager, statusWatermarkValve, inputIndex, rescalingDescriptorinflightDataRescalingDescriptor, gatePartitioners, taskInfo));
}
}
StreamTaskNetworkInput
是 Flink
中用于从网络接收数据并将其传递给任务处理的基本组件。它实现了 StreamInput
接口,并负责从网络缓冲区中读取数据,将数据反序列化为 StreamRecord
,然后传递给下游的处理逻辑。
主要功能:
- 从网络接收数据:读取来自上游任务通过网络发送的数据。
- 数据反序列化:将接收到的字节数据反序列化为
StreamRecord
对象 - 调用下游处理逻辑:将反序列化后的
StreamRecord
对象传递给下游的处理逻辑(如操作符的processElement
方法)。
RescalingStreamTaskNetworkInput
是 StreamTaskNetworkInput
的一个扩展,用于处理任务重新缩放(rescaling
)场景下的数据接收。任务重新缩放是指在运行时动态调整任务并行度,以适应负载变化。RescalingStreamTaskNetworkInput
主要用于确保在重新缩放过程中数据能够正确地重新分配和处理。
主要功能:
- 处理重新缩放场景:在任务重新缩放期间,确保数据能够正确地重新分配和处理。
- 数据重分配逻辑:在接收数据时,可能需要根据新的并行度进行数据重分配,以确保数据能够被正确处理。
- 继承自
StreamTaskNetworkInput
:继承了StreamTaskNetworkInput
的基本功能,同时增加了处理重新缩放场景的逻辑。
这样初始化部分就完成了
2、StreamTask运行invoke调用的是StreamTask的processInput方法
通过上面第一章节介绍
StreamTask
的,知道StreamTask
的invoke
方法最终执行的是processInput
方法,因为OneInputStreamTask
不像SourceStreamTask
重写了processInput
方法,所以调用的还是父类StreamTask
的processInput
方法
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> implements TaskInvokable, CheckpointableTask, CoordinatedTask, AsyncExceptionHandler, ContainingTaskDetails {
protected void processInput(Controller controller) throws Exception {
DataInputStatus status = this.inputProcessor.processInput();
}
}
这时候this.inputProcessor=StreamOneInputProcessor
,调用processInput
即调用StreamOneInputProcessor
的processInput
方法
//从OneInputStreamTask初始化章节粘贴过来的,方便
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
private StreamTaskInput<IN> input;
private DataOutput<IN> output;
public StreamOneInputProcessor(StreamTaskInput<IN> input, DataOutput<IN> output, BoundedMultiInput endOfInputAware) {
//此input就是StreamTaskNetworkInput
this.input = (StreamTaskInput)Preconditions.checkNotNull(input);
//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象
this.output = (DataOutput)Preconditions.checkNotNull(output);
this.endOfInputAware = (BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);
}
public DataInputStatus processInput() throws Exception {
DataInputStatus status = this.input.emitNext(this.output);
//删除干扰代码
return status;
}
}
而StreamOneInputProcessor.processInput
中会调this.input.emitNext(this.output)
,因为构造StreamOneInputProcessor
对象时已经赋值
所以processInput
方法中DataInputStatus status = this.input.emitNext(this.output)
调用的是StreamTaskNetworkInput
的emitNext
方法;
public final class StreamTaskNetworkInput<T> extends AbstractStreamTaskNetworkInput<T, SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>> {
}
public abstract class AbstractStreamTaskNetworkInput<T, R extends RecordDeserializer<DeserializationDelegate<StreamElement>>> implements StreamTaskInput<T> {
//从缓冲区读取到当前内存中
private R currentRecordDeserializer = null;
public DataInputStatus emitNext(DataOutput<T> output) throws Exception {
while(true) {
//当前内存有缓冲区的数据
if (this.currentRecordDeserializer != null) {
DeserializationResult result;
try {
//从deserializationDelegate尝试获取下一个记录
result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
} catch (IOException var4) {
throw new IOException(String.format("Can't get next record for channel %s", this.lastChannel), var4);
}
if (result.isFullRecord()) {
//处理该记录并返回
this.processElement((StreamElement)this.deserializationDelegate.getInstance(), output);
return DataInputStatus.MORE_AVAILABLE;
}
}
//通过pollNext()方法从checkpointedInputGate中获取下一个元素,并将其封装在Optional中。
Optional<BufferOrEvent> bufferOrEvent = this.checkpointedInputGate.pollNext();
//然后检查bufferOrEvent是否存在
if (bufferOrEvent.isPresent()) {
//如果是缓冲区,则调用processBuffer方法进行处理
if (((BufferOrEvent)bufferOrEvent.get()).isBuffer()) {
this.processBuffer((BufferOrEvent)bufferOrEvent.get());
continue;
}
//如果是事件,则调用processEvent方法进行处理并返回结果
return this.processEvent((BufferOrEvent)bufferOrEvent.get());
}
}
}
}
最终调的是父类AbstractStreamTaskNetworkInput
的emitNext
方法
3、从缓冲区获取数据放入到内存中
通过上面
emitNext
实现,while
循环中先判断当前内存区是否有缓冲区的数据,有则处理结束此次emitNext
方法,如果没有则从缓冲区获取数据到当前内存区,再跳过本次循环,让下一个循环开始执行处理内存区数据的方法
this.checkpointedInputGate.pollNext()
这个就不看了,你就知道从缓冲区返回数据就行了,
看一下processBuffer
方法
protected void processBuffer(BufferOrEvent bufferOrEvent) throws IOException {
//获取缓存管道信息
this.lastChannel = bufferOrEvent.getChannelInfo();
Preconditions.checkState(this.lastChannel != null);
//可以理解为给currentRecordDeserializer初始化,选定类型
this.currentRecordDeserializer = this.getActiveSerializer(bufferOrEvent.getChannelInfo());
Preconditions.checkState(this.currentRecordDeserializer != null, "currentRecordDeserializer has already been released");
//把缓冲区的数据写入到当前内存区
this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
}
4、调用算子的processElement方法处理数据,
通过
StreamOneInputProcessor
初始化知道,入参output
实际上是OneInputStreamTask
里的私有类StreamTaskNetworkOutput
对象
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
if (recordOrMark.isRecord()) {
//这里就调用了OneInputStreamTask里的私有类StreamTaskNetworkOutput中的emitRecord方法
output.emitRecord(recordOrMark.asRecord());
}
}
private static class StreamTaskNetworkOutput<IN> implements DataOutput<IN> {
private final Input<IN> operator;
public void emitRecord(StreamRecord<IN> record) throws Exception {
//调用的算子的processElement方法
this.operator.processElement(record);
}
}
emitRecord
方法就会调用算子的processElement
方法,之后就可以看基础转换函数和窗口函数文章中,他们是被调用processElement
触发的
如果不清楚可以看Flink 1.14.*中flatMap,filter等基本转换函数源码
四、sink的streamTask用的也是OneInputStreamTask
sink
可以看成是一个像flatMap
,filter
、窗口一样的算子,通过OneInputStreamTask
触发到sinkFuncition
的processElement
方法,执行流程都是一样的,
不懂的可以看下面两篇文章,比对一下,sink
和基本转换、窗口算子触发方式是否一样
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码
五、OneInputStreamTask和SourceStreamTask类关系图
比对两个关系图,SourceStreamTask
多了SourceFunction
接口和streamSource
类