Asynchronous barrier snapshotting (ABS) is a lightweight snapshot technique that can back up the state of DAG (directed acyclic graph) or DCG (directed cyclic graph) computation jobs at low cost, which allows a job to take snapshots frequently without a noticeable performance impact. Its core idea is to use barrier messages to mark the point in time at which a snapshot is triggered and the data that belongs to it. This decouples the data stream from the snapshot time so that the snapshot can be taken asynchronously, and it also greatly reduces the dependence on in-flight channel data (for DAG jobs there is no such dependence at all), which in turn keeps the snapshots small. The following sections analyze the principle of asynchronous barrier snapshotting step by step, together with its source-code implementation.

A barrier in Flink is a special internal message that cuts the data stream into segments along the time axis; each segment belongs to one snapshot in a series of consecutive snapshots.

Barriers are emitted by the SourceStreamTask and broadcast downstream. When a downstream operator has received the barrier with the same snapshot id from all of its input channels, it broadcasts the barrier to its own downstream operators and takes its snapshot; every downstream operator repeats this process. Eventually the barriers reach the sinks of the job, and once a sink has received all barriers it sends an ack back to the JobManager; at that point the whole snapshot is complete.
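The acknowledgement bookkeeping on the JobManager side can be pictured with a toy sketch. The class below is invented purely for illustration (Flink's real PendingCheckpoint tracks far more than this); it only shows the idea that a checkpoint stays pending until the last expected task has acked it:

import java.util.HashSet;
import java.util.Set;

// Toy model of the JobManager-side bookkeeping: a checkpoint is "pending" until
// every task it waits for has sent its ack, at which point it can be completed.
final class PendingCheckpointSketch {

    private final long checkpointId;
    private final Set<String> notYetAcknowledged;

    PendingCheckpointSketch(long checkpointId, Set<String> tasksToWaitFor) {
        this.checkpointId = checkpointId;
        this.notYetAcknowledged = new HashSet<>(tasksToWaitFor);
    }

    /** Returns true when the last outstanding task has acknowledged this checkpoint. */
    synchronized boolean acknowledgeTask(String taskId) {
        notYetAcknowledged.remove(taskId);
        return notYetAcknowledged.isEmpty();   // now it could be turned into a "completed" checkpoint
    }

    long getCheckpointId() {
        return checkpointId;
    }
}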
The figure below illustrates the individual stages of the checkpoint snapshot process:

a) The barrier is first injected into the data stream by the source. The black vertical bar represents the snapshot barrier: it divides the stream between two checkpoints and flows downstream together with the data.

b) count-1 has two upstream channels and has already received a barrier from both, so count-1 first broadcasts the barrier (black bar) to its downstream print-1 and then takes its own snapshot. count-2, on the other hand, has received only one barrier so far; with exactly-once semantics, the data that arrives after the barrier on that channel is temporarily buffered until the barrier from the other channel arrives, and only then processed. This is the alignment of the data streams.

c) count-2 now receives the barrier from src-2 as well, so it does the same as count-1 did in figure b: it first injects the barrier into its output stream and then takes its own state snapshot.

d) Once the barriers have reached the print tasks, an ack is returned to the JobManager to signal that this snapshot is complete, and the JobManager turns the PendingCheckpoint into a CompletedCheckpoint.
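On the user side this whole mechanism is switched on with a single call. A minimal sketch using the public DataStream API follows; the interval and other values are just examples, and the socket source is only there to make the job runnable:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnableCheckpointingExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // trigger a checkpoint every 10 seconds
        env.enableCheckpointing(10_000L);
        // barrier alignment as described above requires exactly-once mode (the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // leave at least 500 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // give up on a checkpoint that takes longer than one minute
        env.getCheckpointConfig().setCheckpointTimeout(60_000);

        env.socketTextStream("localhost", 9999)   // illustrative source
           .print();

        env.execute("checkpointing example");
    }
}

When the job is submitted, these settings travel with the JobGraph to the JobManager and end up in ExecutionGraph.enableCheckpointing, which builds the CheckpointCoordinator; that is where the source walkthrough below starts.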
public void enableCheckpointing(
        ......   // part of the code omitted

    // build the CheckpointCoordinator object
    checkpointCoordinator = new CheckpointCoordinator(
        jobInformation.getJobId(),
        interval,
        checkpointTimeout,
        minPauseBetweenCheckpoints,
        maxConcurrentCheckpoints,
        externalizeSettings,
        tasksToTrigger,
        tasksToWaitFor,
        tasksToCommitTo,
        checkpointIDCounter,
        checkpointStore,
        checkpointDir,
        ioExecutor);

    // register the master hooks on the checkpoint coordinator
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
        }
    }

    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max value
    if (interval != Long.MAX_VALUE) {
        // register the job status listener
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }
}

This job status listener is the object returned by checkpointCoordinator.createActivatorDeactivator:
// ------------------------------------------------------------------------
//  job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
public JobStatusListener createActivatorDeactivator() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        if (jobStatusListener == null) {
            jobStatusListener = new CheckpointCoordinatorDeActivator(this);
        }

        return jobStatusListener;
    }
}

All this does is create a new CheckpointCoordinatorDeActivator, which implements the JobStatusListener interface and reacts whenever the job status changes. CheckpointCoordinatorDeActivator essentially distinguishes two kinds of job status changes:
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}

With this, the JobStatusListener is registered. From now on it keeps watching the job and starts the checkpoint scheduler when the job status changes from CREATED to RUNNING. When does that transition happen? Essentially in two scenarios: when a job is freshly submitted and scheduled, and when a job is restarted after a failure. In both cases, once the job reaches RUNNING the listener calls startCheckpointScheduler:
// --------------------------------------------------------------------------------------------
//  Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(), baseInterval, baseInterval, TimeUnit.MILLISECONDS);
    }
}

As the code above shows, this does two things: it first makes sure any previous checkpoint scheduler has been stopped, and then hands a ScheduledTrigger to the periodic-execution thread pool for fixed-rate scheduling. This also explains why we have to specify an interval when enabling checkpointing: Flink uses that interval to take checkpoint snapshots periodically on our behalf.
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        } catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}
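For intuition, the same fixed-rate trigger pattern can be reproduced with a plain ScheduledExecutorService. The following standalone sketch is illustrative only (the class name and the printed message are not Flink's):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public class PeriodicTriggerSketch {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        long baseInterval = 1_000L; // the "checkpoint interval" in milliseconds

        // schedule the trigger at a fixed rate, just like scheduleAtFixedRate above
        ScheduledFuture<?> currentPeriodicTrigger = timer.scheduleAtFixedRate(
                () -> System.out.println("trigger checkpoint at " + System.currentTimeMillis()),
                baseInterval, baseInterval, TimeUnit.MILLISECONDS);

        Thread.sleep(5_000);                    // let a few triggers fire
        currentPeriodicTrigger.cancel(false);   // this is what stopCheckpointScheduler boils down to
        timer.shutdown();
    }
}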
The CheckpointCoordinator.triggerCheckpoint method is rather long; roughly, it does the following, ending in the core code fragment shown afterwards:

a) Check the preconditions for triggering a checkpoint, e.g. whether periodic checkpoints are disabled or the minimum pause between checkpoints has not yet elapsed; if a precondition fails, return immediately;

b) Check that all tasks that need to be checkpointed, as well as all tasks whose checkpoint ACK is required (the ACK is part of the checkpoint's two-phase commit, covered later), are in the RUNNING state; otherwise return;

c) If everything checks out, execute checkpointID = checkpointIdCounter.getAndIncrement(); to generate a new id and create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet acknowledged; once all tasks have acknowledged it, it is converted into a CompletedCheckpoint. This is the complete life cycle of a checkpoint snapshot;

d) Register a timeout callback that cancels the checkpoint if it runs for too long without completing;

e) Trigger the MasterHooks, which let users attach extra operations to the checkpoint (for example preparing and cleaning up external resources);

f) Finally, the core logic:
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}

Each Execution notifies its TaskManager via an Akka message to trigger the checkpoint:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final SimpleSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // tell the TaskManager, via an Akka message, to take the checkpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}

At this point the work of triggering the checkpoint has been handed over to the TaskManager, which performs the actual snapshot. The TaskManager actor handles the message as follows:
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {

    actorMessage match {
      case message: TriggerCheckpoint =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp
        val checkpointOptions = message.getCheckpointOptions

        log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

        // look the task up in the map of running tasks
        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          // trigger the task checkpoint
          task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
        } else {
          log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
        }

      case message: NotifyCheckpointComplete =>
        val taskExecutionId = message.getTaskExecutionId
        val checkpointId = message.getCheckpointId
        val timestamp = message.getTimestamp

        log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

        val task = runningTasks.get(taskExecutionId)
        if (task != null) {
          task.notifyCheckpointComplete(checkpointId)
        } else {
          log.debug(
            s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
        }

      // unknown checkpoint message
      case _ => unhandled(actorMessage)
    }
  }
The TaskManager then calls Task.triggerCheckpointBarrier, whose core logic is to asynchronously invoke, on a single-threaded executor, the triggerCheckpoint method of the StatefulTask; the actual implementation lives in StreamTask.
public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        if (invokable instanceof StatefulTask) {
            // build a local closure
            final StatefulTask statefulTask = (StatefulTask) invokable;
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        // StatefulTask is an interface; the call actually goes to StreamTask.triggerCheckpoint
                        boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    } catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };

            // hand the runnable to a SingleThreadExecutor for asynchronous execution
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }
        ......   // part of the code omitted
    }
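The "hand the trigger off to a dedicated thread" pattern used above can also be shown in isolation. The sketch below is only an illustration of that pattern; the class and method names are made up and it is not Flink code:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncTriggerSketch {

    // a single-threaded executor, analogous to the one executeAsyncCallRunnable uses
    private final ExecutorService asyncCallDispatcher = Executors.newSingleThreadExecutor();

    /** Trigger the (pretend) checkpoint without blocking the calling actor thread. */
    public void triggerCheckpointAsync(long checkpointId) {
        asyncCallDispatcher.submit(() ->
                System.out.println("performing checkpoint " + checkpointId +
                        " on " + Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        AsyncTriggerSketch sketch = new AsyncTriggerSketch();
        sketch.triggerCheckpointAsync(42L);
        sketch.asyncCallDispatcher.shutdown();
    }
}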
Note: only a Source triggers its checkpoint directly in the way shown below; intermediate operators and sinks have their checkpoint snapshots triggered by the arrival of barriers.

StreamTask.triggerCheckpoint calls its performCheckpoint method, which has two core steps:

1) broadcast the barrier to all downstream operators;

2) trigger the state snapshot of this task;

Note that the barrier is broadcast to the downstream operators before this task saves its own state. Once a downstream operator has received all barriers, it likewise first broadcasts the barrier and then takes its own checkpoint, and so on down the chain. This is also the point where barriers first enter the execution path (emitted initially by the source).
synchronized (lock) {
    if (isRunning) {
        // we can do a checkpoint

        // Since both state checkpointing and downstream barrier emission occurs in this
        // lock scope, they are an atomic operation regardless of the order in which they occur.
        // Given this, we immediately emit the checkpoint barriers, so the downstream operators
        // can start their checkpoint work as soon as possible

        // broadcast the barrier to all downstream operators
        operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

        // trigger the state snapshot of this task
        checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        return true;
    }
}

Internally, the StreamTask then calls its executeCheckpointing method, where every StreamOperator in the chain gets checkpointed:
public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    boolean failed = true;
    try {
        for (StreamOperator<?> op : allOperators) {
            // iterate over all StreamOperators and checkpoint each one
            checkpointStreamOperator(op);
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // send the ack to the JobManager
        runAsyncCheckpointingAndAcknowledge();
        failed = false;
        ......   // part of the code omitted
}
StreamTask.checkpointStreamOperator is where the snapshot is actually persisted to the file system:
private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
    if (null != op) {
        // this call only takes a snapshot when the user function implements the Checkpointed interface;
        // Checkpointed is deprecated and has been replaced by CheckpointedFunction, so in practice
        // this branch is rarely exercised
        nonPartitionedStates.add(op.snapshotLegacyOperatorState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions));

        // the low-level logic that actually writes to the file system lives in here
        // (snapshots of CheckpointedFunction state are taken inside this call)
        OperatorSnapshotResult snapshotInProgress = op.snapshotState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions);

        snapshotInProgressList.add(snapshotInProgress);
    } else {
        nonPartitionedStates.add(null);
        OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
        snapshotInProgressList.add(emptySnapshotInProgress);
    }
}
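As the comments note, the deprecated Checkpointed interface has been superseded by CheckpointedFunction. For context, this is roughly what a user function hooking into this snapshot path looks like; the class, its buffering behaviour and the state name below are illustrative, not taken from Flink:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private transient ListState<String> checkpointedState;
    private final List<String> bufferedElements = new ArrayList<>();

    @Override
    public void invoke(String value) throws Exception {
        bufferedElements.add(value);   // pretend these get flushed somewhere in batches
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // invoked from the operator's snapshotState(...) seen above
        checkpointedState.clear();
        for (String element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor =
                new ListStateDescriptor<>("buffered-elements", String.class);
        checkpointedState = context.getOperatorStateStore().getListState(descriptor);

        if (context.isRestored()) {
            // the restore path discussed at the end of this article ends up here
            for (String element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
        }
    }
}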
Next, let's look at snapshotState in AbstractStreamOperator, where every kind of state snapshot is taken:

1) the state snapshot of the rich function;

2) the state snapshot of the operator state;

3) the snapshot of the keyed state;

Note: if an exception occurs while any of these state snapshots is being written, it is propagated upwards; when the TaskManager sees the failed snapshot it cancels all tasks and the job's restart strategy kicks in.
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);

try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
        checkpointId,
        timestamp,
        factory,
        keyGroupRange,
        getContainingTask().getCancelables())) {

    // take the state snapshot of the rich function
    snapshotState(snapshotContext);

    snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
            // take the operator state snapshot
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }

    if (null != keyedStateBackend) {
        snapshotInProgress.setKeyedStateManagedFuture(
            // take the keyed state snapshot
            keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
} catch (Exception snapshotException) {
    try {
        snapshotInProgress.cancel();
    } catch (Exception e) {
        snapshotException.addSuppressed(e);
    }

    // if an exception is thrown while taking the snapshots above, it is propagated upwards;
    // on receiving it the TaskManager cancels all tasks and the whole job is restarted
    throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
        getOperatorName() + '.', snapshotException);
}
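The failure note above ends in the job's restart strategy. From the user's point of view that strategy is configured on the execution environment, for example as follows (the concrete numbers are illustrative):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.concurrent.TimeUnit;

public class RestartStrategyExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // after a failed snapshot (or any other failure) retry the job at most 3 times,
        // waiting 10 seconds between consecutive attempts
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));
    }
}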
Now let's look at how the barrier handling is implemented. Before reading this part of the source it helps to understand the InputGate; see http://www.cnblogs.com/fxjwind/p/7641347.html for details. The barrier is first emitted by the source; a downstream operator catches it while processing its input data, which corresponds to the StreamInputProcessor.processInput() method. Its core logic is:
// this is executed for every element; if the next item is a buffer, control leaves the outer while loop
// and enters the user-data processing logic. The barrier handling happens quietly inside this call.
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

if (bufferOrEvent != null) {
    if (bufferOrEvent.isBuffer()) {
        currentChannel = bufferOrEvent.getChannelIndex();
        currentRecordDeserializer = recordDeserializers[currentChannel];
        currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    } else {
        // Event received
        final AbstractEvent event = bufferOrEvent.getEvent();
        if (event.getClass() != EndOfPartitionEvent.class) {
            throw new IOException("Unexpected event: " + event);
        }
    }
} else {
    isFinished = true;
    if (!barrierHandler.isEmpty()) {
        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
    }
    return false;
}
}
BarrierBuffer implements the CheckpointBarrierHandler interface; its two most important methods are getNextNonBlocked and processBarrier.
public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        BufferOrEvent next;
        if (currentBuffered == null) {
            // fetch the BufferOrEvent directly from the inputGate
            next = inputGate.getNextBufferOrEvent();
        } else {
            // if currentBuffered holds BufferOrEvents, take them from there first
            next = currentBuffered.getNext();
            if (next == null) {
                completeBufferedSequence();
                return getNextNonBlocked();
            }
        }

        if (next != null) {
            if (isBlocked(next.getChannelIndex())) {
                // the channel's block switch is on: its barrier has already arrived while barriers from
                // other channels are still missing
                // if the channel is blocked, we just store the BufferOrEvent
                // (reaching this branch means this channel has delivered all data belonging to the current
                // checkpoint, so data belonging to the next checkpoint is buffered for later)
                bufferSpiller.add(next);
                checkSizeLimit();
            } else if (next.isBuffer()) {
                // the channel is not blocked, so process this record (its data still belongs to the current snapshot)
                return next;
            } else if (next.getEvent().getClass() == CheckpointBarrier.class) {
                // the channel's block switch is off and the event is a CheckpointBarrier
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    // this is the core barrier-handling logic (first barrier of this checkpoint id on this channel)
                    processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                }
            } else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
            } else {
                if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
                    processEndOfPartition();
                }
                return next;
            }
        } else if (!endOfStream) {
            // end of input stream. stream continues with the buffered data
            endOfStream = true;

            // release resources and switch off the channel block switches
            releaseBlocksAndResetBarriers();
            return getNextNonBlocked();
        } else {
            // final end of both input and buffered data
            return null;
        }
    }
}

The code above shows how a downstream operator treats the data stream before and after it has received a barrier.
Next, let's look at processBarrier:
private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // -- general code path for multiple input channels --

    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already progress and was not canceled

        if (barrierId == currentCheckpointId) {
            // regular case: onBarrier flips the block switch for this channel so that subsequent buffers are cached
            onBarrier(channelIndex);
        } else if (barrierId > currentCheckpointId) {
            // reaching this branch means the current checkpoint has been superseded; the code below mainly
            // releases resources and starts the new checkpoint
            // we did not complete the current checkpoint, another started before
            LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.", barrierId, currentCheckpointId);

            // let the task know we are not completing this
            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

            // abort the current checkpoint
            releaseBlocksAndResetBarriers();

            // begin a the new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        } else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    } else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        beginNewAlignment(barrierId, channelIndex);
    } else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // if this condition holds, the barriers from all channels have arrived and the checkpoint is actually triggered
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received all barriers, triggering checkpoint {} at {}",
                    receivedBarrier.getId(), receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();

        // notify to trigger the checkpoint
        notifyCheckpoint(receivedBarrier);
    }
}

Notifying the checkpoint trigger: BarrierBuffer.notifyCheckpoint simply calls StreamTask.triggerCheckpointOnBarrier:
private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        CheckpointMetaData checkpointMetaData =
                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());

        long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;

        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                .setBytesBufferedInAlignment(bytesBuffered)
                .setAlignmentDurationNanos(latestAlignmentDurationNanos);

        // trigger this task's checkpoint; from here on the flow mirrors the source's checkpoint procedure
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
            checkpointMetaData,
            checkpointBarrier.getCheckpointOptions(),
            checkpointMetrics);
    }
}

StreamTask.triggerCheckpointOnBarrier in turn directly calls performCheckpoint, so from this point on the logic is exactly the same as the source-triggered checkpoint described above.
public void triggerCheckpointOnBarrier(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    try {
        performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    } catch (CancelTaskException e) {
        LOG.info("Operator {} was cancelled while performing checkpoint {}.",
                getName(), checkpointMetaData.getCheckpointId());
        throw e;
    } catch (Exception e) {
        throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() +
            " for operator " + getName() + '.', e);
    }
}
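Before turning to recovery, the alignment bookkeeping that is spread across getNextNonBlocked and processBarrier can be condensed into a small, self-contained sketch. It is illustrative only: the class and method names are not Flink's, and it omits spilling blocked data to disk, cancellation markers, closed channels and end-of-partition handling:

import java.util.Arrays;

/** Toy model of exactly-once barrier alignment for one operator with N input channels. */
final class AlignmentSketch {

    private final boolean[] blocked;          // channels whose barrier for the current checkpoint already arrived
    private long currentCheckpointId = -1L;
    private int barriersReceived;

    AlignmentSketch(int numberOfInputChannels) {
        this.blocked = new boolean[numberOfInputChannels];
    }

    /** Data arriving on a blocked channel must be buffered instead of processed. */
    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    /** Returns true when the barrier from the last open channel has arrived, i.e. the snapshot can be taken. */
    boolean onBarrier(long checkpointId, int channel) {
        if (checkpointId > currentCheckpointId) {
            // first barrier of a new checkpoint: start a fresh alignment
            currentCheckpointId = checkpointId;
            Arrays.fill(blocked, false);
            barriersReceived = 0;
        }
        if (checkpointId == currentCheckpointId && !blocked[channel]) {
            blocked[channel] = true;      // from now on, buffer everything arriving on this channel
            barriersReceived++;
        }
        if (barriersReceived == blocked.length) {
            Arrays.fill(blocked, false);  // alignment finished: release the blocked channels
            return true;                  // ...and trigger the local checkpoint (notifyCheckpoint above)
        }
        return false;
    }
}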
Both when a job is submitted and when a job is restarted, Flink recovers from the most recently completed checkpoint. This logic also lives in the CheckpointCoordinator class; let's look at the method that restores the state:
public boolean restoreLatestCheckpointedState(
        Map<JobVertexID, ExecutionJobVertex> tasks,
        boolean errorIfNoCheckpoint,
        boolean allowNonRestoredState) throws Exception {

    ......

    // Recover the checkpoints
    completedCheckpointStore.recover(sharedStateRegistry);

    // get the most recent completed checkpoint
    CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

    ......

    LOG.info("Restoring from latest valid checkpoint: {}.", latest);

    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);

    // restore the state
    stateAssignmentOperation.assignStates();

    ......

    return true;
}
StateAssignmentOperation.assignStates
public boolean assignStates() throws Exception {
    Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);
    // get all tasks
    Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;

    checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);

    for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
        final ExecutionJobVertex executionJobVertex = task.getValue();

        // find the states of all operators belonging to this task
        List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
        List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
        List<OperatorState> operatorStates = new ArrayList<>();
        boolean statelessTask = true;
        for (int x = 0; x < operatorIDs.size(); x++) {
            OperatorID operatorID = altOperatorIDs.get(x) == null
                ? operatorIDs.get(x)
                : altOperatorIDs.get(x);

            // remove the OperatorState from the map and return it
            OperatorState operatorState = localOperators.remove(operatorID);
            if (operatorState == null) {
                operatorState = new OperatorState(
                    operatorID,
                    executionJobVertex.getParallelism(),
                    executionJobVertex.getMaxParallelism());
            } else {
                statelessTask = false;
            }
            // add it to the operatorStates collection
            operatorStates.add(operatorState);
        }
        if (statelessTask) {
            // skip tasks where no operator has any state
            continue;
        }

        // initialize the state for each task
        assignAttemptState(task.getValue(), operatorStates);
    }

    return true;
}

StateAssignmentOperation.assignAttemptState then does the actual state initialization:
......

private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {

    // check if a stateless task
    if (!allElementsAreNull(subNonPartitionableState) ||
            !allElementsAreNull(subManagedOperatorState) ||
            !allElementsAreNull(subRawOperatorState) ||
            subKeyedState != null) {

        // build the TaskStateHandles
        TaskStateHandles taskStateHandles = new TaskStateHandles(
            new ChainedStateHandle<>(subNonPartitionableState),
            subManagedOperatorState,
            subRawOperatorState,
            subKeyedState != null ? subKeyedState.f0 : null,
            subKeyedState != null ? subKeyedState.f1 : null);

        // set the initial state on the current execution
        currentExecutionAttempt.setInitialState(taskStateHandles);
}
Execution.setInitialState simply assigns the checkpointStateHandles to taskState:
public void setInitialState(TaskStateHandles checkpointStateHandles) {
    checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
    this.taskState = checkpointStateHandles;
}

So recovery at this point only assigns the state of the checkpoint fetched from ZooKeeper to the operator state.
deployToSlot then submits this initial state to the TaskManager:
public void deployToSlot(final SimpleSlot slot) throws JobException {
    ......

    // wrap the taskState into the deployment descriptor
    final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
        attemptId,
        slot,
        taskState,
        attemptNumber);

    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

    // submit the deployment to the TaskManager
    final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

    submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
        @Override
        public Void apply(Throwable failure) {
            if (failure instanceof TimeoutException) {
                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                markFailed(new Exception(
                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                        + ") not responding after a timeout of " + timeout, failure));
            } else {
                markFailed(failure);
            }
            return null;
        }
    }, executor);
}
// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint
if (null != taskStateHandles) {
    if (invokable instanceof StatefulTask) {
        StatefulTask op = (StatefulTask) invokable;
        op.setInitialState(taskStateHandles);   // note: this assigns the restoreStateHandles field of the StreamTask
    } else {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }

    // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
    // we clear the reference to the state handle
    //noinspection UnusedAssignment
    taskStateHandles = null;
}

The following line in the Task class then calls the StreamTask's invoke() method (the invokable instance itself was created via reflection):
// run the invokable
invokable.invoke();

Inside this method we only quote the small piece of code that is relevant to restoring state:
synchronized (lock) {
    // both the following operations are protected by the lock
    // so that we avoid race conditions in the case that initializeState()
    // registers a timer, that fires before the open() is called.

    // this is where the state gets initialized
    initializeState();
    openAllOperators();
}

Next, let's see what StreamTask.initializeState does:
private void initializeState() throws Exception {

    boolean restored = null != restoreStateHandles;

    if (restored) {
        // this branch is taken on a restart when there is state to restore
        checkRestorePreconditions(operatorChain.getChainLength());
        // initialize the operator states
        initializeOperators(true);
        restoreStateHandles = null; // free for GC
    } else {
        initializeOperators(false);
    }
}

What follows is the actual initialization and restoration of the keyed states and operator states:
public final void initializeState(OperatorStateHandles stateHandles) throws Exception {

    Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesBackend = null;

    boolean restoring = null != stateHandles;

    initKeyedState(); //TODO we should move the actual initialization of this from StreamTask to this class

    if (getKeyedStateBackend() != null && timeServiceManager == null) {
        timeServiceManager = new InternalTimeServiceManager<>(
            getKeyedStateBackend().getNumberOfKeyGroups(),
            getKeyedStateBackend().getKeyGroupRange(),
            this,
            getRuntimeContext().getProcessingTimeService());
    }

    if (restoring) {
        //pass directly
        operatorStateHandlesBackend = stateHandles.getManagedOperatorState();
        operatorStateHandlesRaw = stateHandles.getRawOperatorState();

        if (null != getKeyedStateBackend()) {
            //only use the keyed state if it is meant for us (aka head operator)
            keyedStateHandlesRaw = stateHandles.getRawKeyedState();
        }
    }

    checkpointStreamFactory = container.createCheckpointStreamFactory(this);

    initOperatorState(operatorStateHandlesBackend);

    StateInitializationContext initializationContext = new StateInitializationContextImpl(
            restoring, // information whether we restore or start for the first time
            operatorStateBackend, // access to operator state backend
            keyedStateStore, // access to keyed state backend
            keyedStateHandlesRaw, // access to keyed state stream
            operatorStateHandlesRaw, // access to operator state stream
            getContainingTask().getCancelables()); // access to register streams for canceling

    initializeState(initializationContext);

    if (restoring) {
        // finally restore the legacy state in case we are
        // migrating from a previous Flink version.
        restoreStreamCheckpointed(stateHandles);
    }
}