EN
/info/789357.html

Flink1.14.* 各种算子在StreamTask控制下如何调用的源码

2025-06-24 12:01:53 来源: 新华社
字号:默认超大|打印|

  • 前言:
  • 一、StreamTask执行算子的生命周期
  • 二、 Source的streamTask用的是SourceStreamTask
  • 三、基础转换操作,窗口用的是OneInputStreamTask
    • 1、初始化OneInputStreamTask
    • 2、StreamTask运行invoke调用的是StreamTask的processInput方法
    • 3、从缓冲区获取数据放入到内存中
    • 4、调用算子的processElement方法处理数据,
  • 四、sink的streamTask用的也是OneInputStreamTask
  • 五、OneInputStreamTask和SourceStreamTask类关系图

前言:

在 Apache Flink 中,StreamTask类是处理流数据的核心执行单元。
它负责管理算子的生命周期,并调用算子的处理方法。StreamTask 类的全路径(即完整的包名和类名)如下:
StreamTask类位于 flink-streaming-java模块中,具体的包结构为 org.apache.flink.streaming.runtime.tasks
全路径如下

flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java

一、StreamTask执行算子的生命周期

先看StreamTask大体执行流程(忽略实现类的细节)

publicabstractclassStreamTask<OUT,OP extendsStreamOperator<OUT>>implementsTaskInvokable,CheckpointableTask,CoordinatedTask,AsyncExceptionHandler,ContainingTaskDetails{protectedOPmainOperator;privatebooleanmailboxLoopRunning;//第一步构造函数,把processInput赋值给mailboxProcessorprotectedStreamTask(Environmentenvironment,@NullableTimerServicetimerService,UncaughtExceptionHandleruncaughtExceptionHandler,StreamTaskActionExecutoractionExecutor,TaskMailboxmailbox)throwsException{this.mailboxProcessor =newMailboxProcessor(this::processInput,mailbox,actionExecutor);//默认为truethis.mailboxLoopRunning =true;}//第三步StreamTask执行publicfinalvoidinvoke()throwsException{//省略代码this.runMailboxLoop();}//SourceStreamTask会重写这个方法,OneInputStreamTask不会重写protectedvoidprocessInput(Controllercontroller)throwsException{//删除干扰代码,}}publicclassMailboxProcessorimplementsCloseable{protectedfinalMailboxDefaultActionmailboxDefaultAction;//第二步 构造函数把processInput方法赋值给mailboxDefaultActionpublicMailboxProcessor(MailboxDefaultActionmailboxDefaultAction,TaskMailboxmailbox,StreamTaskActionExecutoractionExecutor){//这里mailboxDefaultAction传的是this::processInputthis.mailboxDefaultAction =(MailboxDefaultAction)Preconditions.checkNotNull(mailboxDefaultAction);}//第四步,publicvoidrunMailboxLoop()throwsException{//suspended默认是falsethis.suspended =!this.mailboxLoopRunning;//this.isNextLoopPossible默认是truewhile(this.isNextLoopPossible()){this.mailboxProcessor.runMailboxLoop();}}privatebooleanisNextLoopPossible(){return!this.suspended;}//第五步,调用processInputpublicvoidrunMailboxLoop()throwsException{//这个执行的是processInput方法this.mailboxDefaultAction.runDefaultAction(defaultActionContext);}}

不同的实现类都是按照上面的步骤初始化执行的

二、 Source的streamTask用的是SourceStreamTask

@InternalpublicclassSourceStreamTask<OUT,SRC extendsSourceFunction<OUT>,OP extendsStreamSource<OUT,SRC>>extendsStreamTask<OUT,OP>{privatefinalSourceStreamTask<OUT,SRC,OP>.LegacySourceFunctionThread sourceThread;protectedvoidinit(){//这个mainOperator是StreamTask的字段,,SourceFunction<?>source =(SourceFunction)((StreamSource)this.mainOperator).getUserFunction();}protectedvoidprocessInput(Controllercontroller)throwsException{//这里启动线程的run方法this.sourceThread.start();}privateclassLegacySourceFunctionThreadextendsThread{privatefinalCompletableFuture<Void>completionFuture =newCompletableFuture();LegacySourceFunctionThread(){}publicvoidrun(){try{if(!SourceStreamTask.this.operatorChain.isTaskDeployedAsFinished()){StreamTask.LOG.debug("Legacy source {} skip execution since the task is finished on restore",SourceStreamTask.this.getTaskNameWithSubtaskAndId());((StreamSource)SourceStreamTask.this.mainOperator).run(SourceStreamTask.this.lock,SourceStreamTask.this.operatorChain);}//删除干扰代码}catch(Throwablevar2){//删除干扰代码}}}}

第一点需要注意的是由于SourceStreamTask重写了streamTaskprocessInput方法,所以streamTaskinvoke方法执行的是子类的SourceStreamTaskprocessInput方法

第二点看一下init方法,这里(SourceFunction)((StreamSource)this.mainOperator).getUserFunction()就是获取的source算子,不清楚的可以看一下kafkaSource这篇文章Flink 1.14.*版本kafkaSource源码
由这里来触发SourceFunctionrun方法,即FlinkKafkaConsumerBaserun方法

三、基础转换操作,窗口用的是OneInputStreamTask

这种一般都是中间算子,或者最后一个算子(例如kafkaSink),所以主要涉及到从输入源获取数据,处理数据,并将结果写入输出中
如果连着看下面两篇文章,你就会知道为什么sink也是用的OneInputStreamTask
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码

1、初始化OneInputStreamTask

publicclassOneInputStreamTask<IN,OUT>extendsStreamTask<OUT,OneInputStreamOperator<IN,OUT>>{publicvoidinit()throwsException{//output是私有类StreamTaskNetworkOutput对象DataOutput<IN>output =this.createDataOutput(numRecordsIn);StreamTaskInput<IN>input =this.createTaskInput(inputGate);//这个inputProcessor字段是给父类StreamTask初始化的,这时候父类inputProcessor=StreamOneInputProcessorthis.inputProcessor =newStreamOneInputProcessor(input,output,this.operatorChain);}privateStreamTaskInput<IN>createTaskInput(CheckpointedInputGateinputGate){intnumberOfInputChannels =inputGate.getNumberOfInputChannels();StatusWatermarkValvestatusWatermarkValve =newStatusWatermarkValve(numberOfInputChannels);TypeSerializer<IN>inSerializer =this.configuration.getTypeSerializerIn1(this.getUserCodeClassLoader());returnStreamTaskNetworkInputFactory.create(inputGate,inSerializer,this.getEnvironment().getIOManager(),statusWatermarkValve,0,this.getEnvironment().getTaskStateManager().getInputRescalingDescriptor(),(gateIndex)->{return((StreamEdge)this.configuration.getInPhysicalEdges(this.getUserCodeClassLoader()).get(gateIndex)).getPartitioner();},this.getEnvironment().getTaskInfo());}//返回的是下面私有类StreamTaskNetworkOutput对象privateDataOutput<IN>createDataOutput(CounternumRecordsIn){returnnewOneInputStreamTask.StreamTaskNetworkOutput(this.operatorChain.getFinishedOnRestoreInputOrDefault((Input)this.mainOperator),this.inputWatermarkGauge,numRecordsIn);}//私有内部类,对应上面init中的outputprivatestaticclassStreamTaskNetworkOutput<IN>implementsDataOutput<IN>{privatefinalInput<IN>operator;publicvoidemitRecord(StreamRecord<IN>record)throwsException{//调用的算子的processElement方法this.operator.processElement(record);}}}

这里是调用init初始化,StreamOneInputProcessor一起初始化了

publicfinalclassStreamOneInputProcessor<IN>implementsStreamInputProcessor{privateStreamTaskInput<IN>input;privateDataOutput<IN>output;publicStreamOneInputProcessor(StreamTaskInput<IN>input,DataOutput<IN>output,BoundedMultiInputendOfInputAware){//此input就是StreamTaskNetworkInputthis.input =(StreamTaskInput)Preconditions.checkNotNull(input);//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象this.output =(DataOutput)Preconditions.checkNotNull(output);this.endOfInputAware =(BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);}publicDataInputStatusprocessInput()throwsException{DataInputStatusstatus =this.input.emitNext(this.output);//删除干扰代码returnstatus;}}

后面看到this.inputProcessor.processInput其实就是调用的上面类的processInput方法

下面简单介绍一下StreamTaskNetworkInputFactory的创建的两种不同的StreamTaskInput,也可以不用看

publicclassStreamTaskNetworkInputFactory{publicStreamTaskNetworkInputFactory(){}//这里只看返回StreamTaskNetworkInputpublicstatic<T>StreamTaskInput<T>create(CheckpointedInputGatecheckpointedInputGate,TypeSerializer<T>inputSerializer,IOManagerioManager,StatusWatermarkValvestatusWatermarkValve,intinputIndex,InflightDataRescalingDescriptorrescalingDescriptorinflightDataRescalingDescriptor,Function<Integer,StreamPartitioner<?>>gatePartitioners,TaskInfotaskInfo){return(StreamTaskInput)(rescalingDescriptorinflightDataRescalingDescriptor.equals(InflightDataRescalingDescriptor.NO_RESCALE)?newStreamTaskNetworkInput(checkpointedInputGate,inputSerializer,ioManager,statusWatermarkValve,inputIndex):newRescalingStreamTaskNetworkInput(checkpointedInputGate,inputSerializer,ioManager,statusWatermarkValve,inputIndex,rescalingDescriptorinflightDataRescalingDescriptor,gatePartitioners,taskInfo));}}

StreamTaskNetworkInputFlink中用于从网络接收数据并将其传递给任务处理的基本组件。它实现了 StreamInput接口,并负责从网络缓冲区中读取数据,将数据反序列化为 StreamRecord,然后传递给下游的处理逻辑。
主要功能:

  1. 从网络接收数据:读取来自上游任务通过网络发送的数据。
  2. 数据反序列化:将接收到的字节数据反序列化为 StreamRecord对象
  3. 调用下游处理逻辑:将反序列化后的 StreamRecord对象传递给下游的处理逻辑(如操作符的 processElement方法)。

RescalingStreamTaskNetworkInputStreamTaskNetworkInput的一个扩展,用于处理任务重新缩放(rescaling)场景下的数据接收。任务重新缩放是指在运行时动态调整任务并行度,以适应负载变化。RescalingStreamTaskNetworkInput主要用于确保在重新缩放过程中数据能够正确地重新分配和处理。
主要功能:

  1. 处理重新缩放场景:在任务重新缩放期间,确保数据能够正确地重新分配和处理。
  2. 数据重分配逻辑:在接收数据时,可能需要根据新的并行度进行数据重分配,以确保数据能够被正确处理。
  3. 继承自 StreamTaskNetworkInput:继承了 StreamTaskNetworkInput的基本功能,同时增加了处理重新缩放场景的逻辑。

这样初始化部分就完成了

2、StreamTask运行invoke调用的是StreamTask的processInput方法

通过上面第一章节介绍StreamTask的,知道StreamTaskinvoke方法最终执行的是processInput方法,因为OneInputStreamTask不像SourceStreamTask重写了processInput方法,所以调用的还是父类StreamTaskprocessInput方法

publicabstractclassStreamTask<OUT,OP extendsStreamOperator<OUT>>implementsTaskInvokable,CheckpointableTask,CoordinatedTask,AsyncExceptionHandler,ContainingTaskDetails{protectedvoidprocessInput(Controllercontroller)throwsException{DataInputStatusstatus =this.inputProcessor.processInput();}}

这时候this.inputProcessor=StreamOneInputProcessor,调用processInput即调用StreamOneInputProcessorprocessInput方法

//从OneInputStreamTask初始化章节粘贴过来的,方便publicfinalclassStreamOneInputProcessor<IN>implementsStreamInputProcessor{privateStreamTaskInput<IN>input;privateDataOutput<IN>output;publicStreamOneInputProcessor(StreamTaskInput<IN>input,DataOutput<IN>output,BoundedMultiInputendOfInputAware){//此input就是StreamTaskNetworkInputthis.input =(StreamTaskInput)Preconditions.checkNotNull(input);//此output就是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象this.output =(DataOutput)Preconditions.checkNotNull(output);this.endOfInputAware =(BoundedMultiInput)Preconditions.checkNotNull(endOfInputAware);}publicDataInputStatusprocessInput()throwsException{DataInputStatusstatus =this.input.emitNext(this.output);//删除干扰代码returnstatus;}}

StreamOneInputProcessor.processInput中会调this.input.emitNext(this.output),因为构造StreamOneInputProcessor对象时已经赋值

所以processInput方法中DataInputStatus status = this.input.emitNext(this.output)调用的是StreamTaskNetworkInputemitNext方法;

publicfinalclassStreamTaskNetworkInput<T>extendsAbstractStreamTaskNetworkInput<T,SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>>{}
publicabstractclassAbstractStreamTaskNetworkInput<T,RextendsRecordDeserializer<DeserializationDelegate<StreamElement>>>implementsStreamTaskInput<T>{//从缓冲区读取到当前内存中privateRcurrentRecordDeserializer =null;publicDataInputStatusemitNext(DataOutput<T>output)throwsException{while(true){//当前内存有缓冲区的数据if(this.currentRecordDeserializer !=null){DeserializationResultresult;try{//从deserializationDelegate尝试获取下一个记录result =this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);}catch(IOExceptionvar4){thrownewIOException(String.format("Can't get next record for channel %s",this.lastChannel),var4);}if(result.isFullRecord()){//处理该记录并返回this.processElement((StreamElement)this.deserializationDelegate.getInstance(),output);returnDataInputStatus.MORE_AVAILABLE;}}//通过pollNext()方法从checkpointedInputGate中获取下一个元素,并将其封装在Optional中。Optional<BufferOrEvent>bufferOrEvent =this.checkpointedInputGate.pollNext();//然后检查bufferOrEvent是否存在if(bufferOrEvent.isPresent()){//如果是缓冲区,则调用processBuffer方法进行处理if(((BufferOrEvent)bufferOrEvent.get()).isBuffer()){this.processBuffer((BufferOrEvent)bufferOrEvent.get());continue;}//如果是事件,则调用processEvent方法进行处理并返回结果returnthis.processEvent((BufferOrEvent)bufferOrEvent.get());}}}}

最终调的是父类AbstractStreamTaskNetworkInputemitNext方法

3、从缓冲区获取数据放入到内存中

通过上面emitNext实现,while循环中先判断当前内存区是否有缓冲区的数据,有则处理结束此次emitNext方法,如果没有则从缓冲区获取数据到当前内存区,再跳过本次循环,让下一个循环开始执行处理内存区数据的方法
this.checkpointedInputGate.pollNext()这个就不看了,你就知道从缓冲区返回数据就行了,

看一下processBuffer方法

protectedvoidprocessBuffer(BufferOrEventbufferOrEvent)throwsIOException{//获取缓存管道信息this.lastChannel =bufferOrEvent.getChannelInfo();Preconditions.checkState(this.lastChannel !=null);//可以理解为给currentRecordDeserializer初始化,选定类型this.currentRecordDeserializer =this.getActiveSerializer(bufferOrEvent.getChannelInfo());Preconditions.checkState(this.currentRecordDeserializer !=null,"currentRecordDeserializer has already been released");//把缓冲区的数据写入到当前内存区this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());}

4、调用算子的processElement方法处理数据,

通过StreamOneInputProcessor初始化知道,入参output实际上是OneInputStreamTask里的私有类StreamTaskNetworkOutput对象

privatevoidprocessElement(StreamElementrecordOrMark,DataOutput<T>output)throwsException{if(recordOrMark.isRecord()){//这里就调用了OneInputStreamTask里的私有类StreamTaskNetworkOutput中的emitRecord方法output.emitRecord(recordOrMark.asRecord());}}
privatestaticclassStreamTaskNetworkOutput<IN>implementsDataOutput<IN>{privatefinalInput<IN>operator;publicvoidemitRecord(StreamRecord<IN>record)throwsException{//调用的算子的processElement方法this.operator.processElement(record);}}

emitRecord方法就会调用算子的processElement方法,之后就可以看基础转换函数和窗口函数文章中,他们是被调用processElement触发的
如果不清楚可以看Flink 1.14.*中flatMap,filter等基本转换函数源码

四、sink的streamTask用的也是OneInputStreamTask

sink可以看成是一个像flatMapfilter、窗口一样的算子,通过OneInputStreamTask触发到sinkFuncitionprocessElement方法,执行流程都是一样的,

不懂的可以看下面两篇文章,比对一下,sink和基本转换、窗口算子触发方式是否一样
Flink 1.14.*中flatMap,filter等基本转换函数源码
Flink 1.14.* 版本kafkaSink源码

五、OneInputStreamTask和SourceStreamTask类关系图

在这里插入图片描述

在这里插入图片描述

比对两个关系图,SourceStreamTask多了SourceFunction接口和streamSource

【我要纠错】责任编辑:新华社