Apache Flink's Lightweight Asynchronous Snapshot Mechanism: Source Code Analysis
Published: 2019-06-26


Introduction

Asynchronous barrier snapshotting (ABS) is a lightweight snapshot technique that can back up the state of DAG (directed acyclic graph) or DCG (directed cyclic graph) jobs at low cost, which allows a job to take snapshots frequently without a noticeable impact on performance. The core idea of ABS is to use barrier messages to mark the point in time at which a snapshot is triggered and the data that belongs to it, decoupling the data stream from the snapshot time so that snapshots can be taken asynchronously. It also greatly reduces the dependence on in-flight (pipeline) data — for DAG jobs there is no such dependence at all — which keeps snapshots small. The rest of this article analyzes how asynchronous barrier snapshotting works and walks through its source code.

How the Snapshot Mechanism Works

A Flink barrier is a special internal message that divides the data stream in time into segments, each of which corresponds to one snapshot in a series of consecutive snapshots.

Barriers are emitted by the SourceStreamTask and broadcast downstream. When a downstream operator has received the barrier with the same snapshot id on all of its input channels, it broadcasts the barrier to its own downstream operators and takes a snapshot of its state. Each downstream operator repeats this process until the barriers finally reach the job's sinks; once a sink has received all barriers, it acknowledges the snapshot to the JobManager, and the whole snapshot is complete.

The following figure illustrates the stages of a checkpoint snapshot:

[Figure: barrier flow and channel alignment during a checkpoint]

a) A barrier is first injected into the data stream by the source. The black vertical line represents the snapshot barrier; it splits the stream into data belonging to two different checkpoints, and it flows downstream together with the data.

b) count-1 has two upstream channels and has already received a barrier on both of them, so it first broadcasts the barrier (black vertical line) to its downstream print-1 and then takes its own snapshot. count-2, on the other hand, has received a barrier on only one channel; in exactly-once mode, the data that arrives after the barrier on that channel is temporarily buffered until the barrier on the other channel arrives as well. This is stream alignment (a simplified sketch of this logic follows the figure description).

c) count-2 now receives the barrier from src-2 as well, so it does the same as count-1 did in figure b): it first inserts the barrier into its output stream and then takes its own state snapshot.

d) Once the barriers reach the print sinks, they acknowledge the snapshot to the JobManager, which then converts the PendingCheckpoint into a CompletedCheckpoint.
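The alignment logic of steps b) and c) can be summarized in a simplified, illustrative sketch. All names below are hypothetical and invented for illustration; Flink's real implementation lives in BarrierBuffer and StreamTask and is analyzed later in this article (buffering and replay of records from blocked channels is elided here):

// Simplified sketch of barrier alignment for an operator with several input channels.
class AlignmentSketch {

    private final int numChannels;
    private final boolean[] blocked;   // channels whose barrier for the current checkpoint has arrived
    private int barriersReceived = 0;

    AlignmentSketch(int numChannels) {
        this.numChannels = numChannels;
        this.blocked = new boolean[numChannels];
    }

    void onBarrier(int channel) {
        if (!blocked[channel]) {
            blocked[channel] = true;   // further records from this channel would be buffered
            barriersReceived++;
        }
        if (barriersReceived == numChannels) {
            broadcastBarrierDownstream();            // forward the barrier first ...
            snapshotOwnState();                      // ... then snapshot this operator's state
            java.util.Arrays.fill(blocked, false);   // unblock channels and replay buffered records
            barriersReceived = 0;
        }
    }

    void broadcastBarrierDownstream() { /* emit the barrier on all output channels */ }

    void snapshotOwnState() { /* hand the operator state to the state backend */ }
}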

Checkpoint Triggering and Storage: Source Code Analysis

When the JobManager submits a job it builds an ExecutionGraph and calls the ExecutionGraph's enableCheckpointing method, which does two important things:
1) initializes the core CheckpointCoordinator component;
2) registers a job status listener, jobStatusListener.
Let's look at the code:
public void enableCheckpointing(
    // ...... some code omitted ......

    // build the CheckpointCoordinator
    checkpointCoordinator = new CheckpointCoordinator(
        jobInformation.getJobId(),
        interval,
        checkpointTimeout,
        minPauseBetweenCheckpoints,
        maxConcurrentCheckpoints,
        externalizeSettings,
        tasksToTrigger,
        tasksToWaitFor,
        tasksToCommitTo,
        checkpointIDCounter,
        checkpointStore,
        checkpointDir,
        ioExecutor);

    // register the master hooks on the checkpoint coordinator
    for (MasterTriggerRestoreHook<?> hook : masterHooks) {
        if (!checkpointCoordinator.addMasterHook(hook)) {
            LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());
        }
    }

    checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);

    // interval of max long value indicates disable periodic checkpoint,
    // the CheckpointActivatorDeactivator should be created only if the interval is not max value
    if (interval != Long.MAX_VALUE) {
        // register the job status listener
        registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
    }
}
The job status listener is the one returned by checkpointCoordinator.createActivatorDeactivator:
// ------------------------------------------------------------------------
//  job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------
public JobStatusListener createActivatorDeactivator() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        if (jobStatusListener == null) {
            jobStatusListener = new CheckpointCoordinatorDeActivator(this);
        }

        return jobStatusListener;
    }
}
It simply creates a CheckpointCoordinatorDeActivator, which implements the JobStatusListener interface and reacts when the job status changes. CheckpointCoordinatorDeActivator listens for two kinds of job status changes:
1) the job status changes from CREATED to RUNNING: start the checkpoint scheduler;
2) the job status changes to anything else: stop the checkpoint scheduler.
/**
 * This actor listens to changes in the JobStatus and activates or deactivates the periodic
 * checkpoint scheduler.
 */
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

    private final CheckpointCoordinator coordinator;

    public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
        this.coordinator = checkNotNull(coordinator);
    }

    @Override
    public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
        if (newJobStatus == JobStatus.RUNNING) {
            // start the checkpoint scheduler
            coordinator.startCheckpointScheduler();
        } else {
            // anything else should stop the trigger for now
            coordinator.stopCheckpointScheduler();
        }
    }
}
At this point the JobStatusListener is registered. From now on it watches for job status changes and starts the checkpoint scheduler when the job status changes from CREATED to RUNNING. When does that happen? There are two scenarios:
1) the job is submitted for the first time;
2) the job is restarted.
Both scenarios start the checkpoint scheduler. Let's look at startCheckpointScheduler:
// --------------------------------------------------------------------------------------------
//  Periodic scheduling of checkpoints
// --------------------------------------------------------------------------------------------
public void startCheckpointScheduler() {
    synchronized (lock) {
        if (shutdown) {
            throw new IllegalArgumentException("Checkpoint coordinator is shut down");
        }

        // make sure all prior timers are cancelled
        stopCheckpointScheduler();

        periodicScheduling = true;
        currentPeriodicTrigger = timer.scheduleAtFixedRate(
                new ScheduledTrigger(),
                baseInterval, baseInterval, TimeUnit.MILLISECONDS);
    }
}
As the code shows, this method does two main things: it first makes sure any previous checkpoint scheduler has been stopped, and then it submits a ScheduledTrigger to a periodically executing thread pool. This is also why we have to specify an interval when enabling checkpointing: Flink then takes a checkpoint snapshot at that interval on our behalf.
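For reference, this is what the user-facing side of that configuration looks like; a minimal sketch using the standard DataStream API (the interval and the other values are arbitrary example numbers):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // take a checkpoint every 60 seconds; this interval is what the
        // CheckpointCoordinator's periodic ScheduledTrigger is scheduled with
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        // these map to the checkpointTimeout / minPauseBetweenCheckpoints /
        // maxConcurrentCheckpoints arguments passed to the CheckpointCoordinator shown above
        env.getCheckpointConfig().setCheckpointTimeout(120_000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000L);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // ... build and execute the actual job here ...
    }
}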

Next, let's focus on ScheduledTrigger. It simply implements the Runnable interface, and its run method triggers one checkpoint each time it is scheduled.
private final class ScheduledTrigger implements Runnable {

    @Override
    public void run() {
        try {
            triggerCheckpoint(System.currentTimeMillis(), true);
        }
        catch (Exception e) {
            LOG.error("Exception while triggering checkpoint.", e);
        }
    }
}

CheckpointCoordinator.triggerCheckpoint is fairly long, so here is an overview of what it does, followed by its core code:

a) Check the preconditions for triggering a checkpoint; for example, if periodic checkpoints are disabled or the minimum pause between checkpoints has not yet elapsed, it simply returns.

b) Check that all tasks that need to be checkpointed and all tasks that need to acknowledge the checkpoint (the ACK is part of the checkpoint's two-phase commit, discussed later) are in the RUNNING state; otherwise return.

c) If all conditions are met, execute checkpointID = checkpointIdCounter.getAndIncrement(); to generate a new id and create a PendingCheckpoint. A PendingCheckpoint is a checkpoint that has been started but not yet acknowledged; once every task has confirmed this checkpoint, it is converted into a CompletedCheckpoint. This is the full life cycle of a checkpoint snapshot.

d) Register a timeout callback that cancels the checkpoint if it runs for too long without completing.

e) Trigger the MasterHooks; users can define extra operations here to extend checkpointing (for example, preparing and cleaning up external resources).

f) Finally, the core logic:

// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
    execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
Each Execution notifies its TaskManager to trigger the checkpoint via an Akka message:
public void triggerCheckpoint(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
    final SimpleSlot slot = assignedResource;

    if (slot != null) {
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        // notify the TaskManager via an Akka message to perform the checkpoint
        taskManagerGateway.triggerCheckpoint(attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
    } else {
        LOG.debug("The execution has no slot assigned. This indicates that the execution is " +
            "no longer running.");
    }
}
At this point, triggering the checkpoint has been handed over to the TaskManager, which performs the actual snapshot work.

When the TaskManager receives the TriggerCheckpoint message, it triggers the checkpoint on the task:
private def handleCheckpointingMessage(actorMessage: AbstractCheckpointMessage): Unit = {
  actorMessage match {
    case message: TriggerCheckpoint =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp
      val checkpointOptions = message.getCheckpointOptions

      log.debug(s"Receiver TriggerCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      // look up the task in the map of running tasks
      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        // trigger the task's checkpoint
        task.triggerCheckpointBarrier(checkpointId, timestamp, checkpointOptions)
      } else {
        log.debug(s"TaskManager received a checkpoint request for unknown task $taskExecutionId.")
      }

    case message: NotifyCheckpointComplete =>
      val taskExecutionId = message.getTaskExecutionId
      val checkpointId = message.getCheckpointId
      val timestamp = message.getTimestamp

      log.debug(s"Receiver ConfirmCheckpoint $checkpointId@$timestamp for $taskExecutionId.")

      val task = runningTasks.get(taskExecutionId)
      if (task != null) {
        task.notifyCheckpointComplete(checkpointId)
      } else {
        log.debug(
          s"TaskManager received a checkpoint confirmation for unknown task $taskExecutionId.")
      }

    // unknown checkpoint message
    case _ => unhandled(actorMessage)
  }
}

The TaskManager then calls Task.triggerCheckpointBarrier. Its core logic is to asynchronously invoke the triggerCheckpoint method of a StatefulTask on a single-threaded executor; the actual implementation lives in StreamTask.

public void triggerCheckpointBarrier(
        final long checkpointID,
        long checkpointTimestamp,
        final CheckpointOptions checkpointOptions) {

    final AbstractInvokable invokable = this.invokable;
    final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointID, checkpointTimestamp);

    if (executionState == ExecutionState.RUNNING && invokable != null) {
        if (invokable instanceof StatefulTask) {
            // build a local closure
            final StatefulTask statefulTask = (StatefulTask) invokable;
            final String taskName = taskNameWithSubtask;
            final SafetyNetCloseableRegistry safetyNetCloseableRegistry =
                FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();

            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    // set safety net from the task's context for checkpointing thread
                    LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName());
                    FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);

                    try {
                        // StatefulTask is an interface; this actually calls StreamTask.triggerCheckpoint
                        boolean success = statefulTask.triggerCheckpoint(checkpointMetaData, checkpointOptions);
                        if (!success) {
                            checkpointResponder.declineCheckpoint(
                                    getJobID(), getExecutionId(), checkpointID,
                                    new CheckpointDeclineTaskNotReadyException(taskName));
                        }
                    }
                    catch (Throwable t) {
                        if (getExecutionState() == ExecutionState.RUNNING) {
                            failExternally(new Exception(
                                "Error while triggering checkpoint " + checkpointID + " for " +
                                    taskNameWithSubtask, t));
                        } else {
                            LOG.debug("Encountered error while triggering checkpoint {} for " +
                                "{} ({}) while being not in state running.", checkpointID,
                                taskNameWithSubtask, executionId, t);
                        }
                    } finally {
                        FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
                    }
                }
            };

            // hand the runnable to a single-threaded executor for asynchronous execution
            executeAsyncCallRunnable(runnable, String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));
        }

        // ...... some code omitted ......
}

Note: in what follows, only sources trigger a checkpoint directly; intermediate operators and sinks take their checkpoint snapshots only when triggered by a barrier.

StreamTask.triggerCheckpoint calls its performCheckpoint method, which has two core steps:

1) broadcast the barrier to all downstream operators;

2) trigger the state snapshot of this task.

Note that the barrier is broadcast to the downstream operators before this task's own state snapshot is triggered; once a downstream operator has received all barriers, it likewise broadcasts the barrier first and then takes its own checkpoint, and so on. This is also where barriers first appear in the execution chain (emitted by the source).

synchronized (lock) {
    if (isRunning) {
        // we can do a checkpoint

        // Since both state checkpointing and downstream barrier emission occurs in this
        // lock scope, they are an atomic operation regardless of the order in which they occur.
        // Given this, we immediately emit the checkpoint barriers, so the downstream operators
        // can start their checkpoint work as soon as possible

        // broadcast the barrier to all downstream operators
        operatorChain.broadcastCheckpointBarrier(
                checkpointMetaData.getCheckpointId(),
                checkpointMetaData.getTimestamp(),
                checkpointOptions);

        // trigger this task's state snapshot
        checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);
        return true;
    }
}
Internally, StreamTask goes on to call its executeCheckpointing method, which checkpoints every StreamOperator in the chain:

public void executeCheckpointing() throws Exception {
    startSyncPartNano = System.nanoTime();

    boolean failed = true;
    try {
        for (StreamOperator<?> op : allOperators) {
            // iterate over all StreamOperators and checkpoint each one
            checkpointStreamOperator(op);
        }

        startAsyncPartNano = System.nanoTime();

        checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano) / 1_000_000);

        // send the ack to the JobManager
        runAsyncCheckpointingAndAcknowledge();
        failed = false;

        // ...... some code omitted ......
}

StreamTask.checkpointStreamOperator is where the snapshot is actually persisted to the file system:

private void checkpointStreamOperator(StreamOperator<?> op) throws Exception {
    if (null != op) {
        // this snapshot is only taken when the user function implements the Checkpointed
        // interface, which is deprecated and has been replaced by CheckpointedFunction,
        // so this branch is rarely exercised
        nonPartitionedStates.add(op.snapshotLegacyOperatorState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions));

        // the low-level logic that actually writes to the file system lives in here
        // (snapshots of CheckpointedFunction state are taken here as well)
        OperatorSnapshotResult snapshotInProgress = op.snapshotState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions);

        snapshotInProgressList.add(snapshotInProgress);
    } else {
        nonPartitionedStates.add(null);
        OperatorSnapshotResult emptySnapshotInProgress = new OperatorSnapshotResult();
        snapshotInProgressList.add(emptySnapshotInProgress);
    }
}
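Where exactly the snapshot bytes end up is decided by the state backend the user configures. A minimal sketch of pointing snapshots at a file system (the HDFS URI below is just a placeholder):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // persist checkpoint snapshots to a file system; with this backend, the
        // op.snapshotState(...) call shown above ends up writing checkpoint files here
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));

        env.enableCheckpointing(60_000L);

        // ... build and execute the actual job here ...
    }
}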

Next, let's look at AbstractStreamOperator.snapshotState, where every kind of snapshot is taken:

1) snapshot the state of rich/user functions;

2) snapshot the operator state;

3) snapshot the keyed state.

Note: if an exception occurs while any of these snapshots is being stored, it is rethrown upwards; when the TaskManager sees that the snapshot failed, it cancels all tasks and the job's restart strategy kicks in.

KeyGroupRange keyGroupRange = null != keyedStateBackend ?
        keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;

OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();

CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);

try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
        checkpointId,
        timestamp,
        factory,
        keyGroupRange,
        getContainingTask().getCancelables())) {

    // snapshot the state of rich/user functions
    snapshotState(snapshotContext);

    snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
    snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

    if (null != operatorStateBackend) {
        snapshotInProgress.setOperatorStateManagedFuture(
            // snapshot the operator state
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }

    if (null != keyedStateBackend) {
        snapshotInProgress.setKeyedStateManagedFuture(
            // snapshot the keyed state
            keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
    }
} catch (Exception snapshotException) {
    try {
        snapshotInProgress.cancel();
    } catch (Exception e) {
        snapshotException.addSuppressed(e);
    }

    // if an exception occurs while taking the snapshot it is rethrown; the TaskManager
    // then cancels all tasks and the whole job is restarted
    throw new Exception("Could not complete snapshot " + checkpointId + " for operator " +
        getOperatorName() + '.', snapshotException);
}
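For context, snapshotState(snapshotContext) above is what ultimately drives user functions that implement CheckpointedFunction. A minimal, hedged sketch of such a function (the counter logic and names are made up for illustration):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

public class CountingMapper implements MapFunction<String, String>, CheckpointedFunction {

    private transient ListState<Long> checkpointedCount;  // operator state handle
    private long count;

    @Override
    public String map(String value) {
        count++;
        return value;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // called on every checkpoint, driven by the operator-level snapshotState(...) shown above
        checkpointedCount.clear();
        checkpointedCount.add(count);
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // called both on first start and when restoring from a checkpoint
        checkpointedCount = context.getOperatorStateStore()
            .getListState(new ListStateDescriptor<>("count", Long.class));
        if (context.isRestored()) {
            for (Long c : checkpointedCount.get()) {
                count += c;
            }
        }
    }
}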

Barrier: Source Code Analysis

Now let's look at how barriers are implemented.

Before reading the barrier-related code, it helps to understand the InputGate; see http://www.cnblogs.com/fxjwind/p/7641347.html for details. The barrier is first emitted by the source, and a downstream operator picks up barriers while processing its input data, which corresponds to the StreamInputProcessor.processInput() method. Here is its core logic:

// this logic runs for every element; if the next item is a buffer, the outer while loop
// goes on to process the user data, while barrier handling happens quietly inside this call
final BufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();

if (bufferOrEvent != null) {
    if (bufferOrEvent.isBuffer()) {
        currentChannel = bufferOrEvent.getChannelIndex();
        currentRecordDeserializer = recordDeserializers[currentChannel];
        currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
    }
    else {
        // Event received
        final AbstractEvent event = bufferOrEvent.getEvent();
        if (event.getClass() != EndOfPartitionEvent.class) {
            throw new IOException("Unexpected event: " + event);
        }
    }
}
else {
    isFinished = true;
    if (!barrierHandler.isEmpty()) {
        throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
    }
    return false;
}

BarrierBuffer implements the CheckpointBarrierHandler interface and has two key methods: getNextNonBlocked and processBarrier.

public BufferOrEvent getNextNonBlocked() throws Exception {
    while (true) {
        // process buffered BufferOrEvents before grabbing new ones
        BufferOrEvent next;
        if (currentBuffered == null) {
            // fetch the BufferOrEvent directly from the InputGate
            next = inputGate.getNextBufferOrEvent();
        }
        else {
            // if currentBuffered already holds BufferOrEvents, take them from there
            next = currentBuffered.getNext();
            if (next == null) {
                completeBufferedSequence();
                return getNextNonBlocked();
            }
        }

        if (next != null) {
            if (isBlocked(next.getChannelIndex())) {
                // the channel is blocked: its barrier has already arrived while barriers
                // from other channels are still missing, so its data already belongs to
                // the next checkpoint and is buffered until alignment finishes
                bufferSpiller.add(next);
                checkSizeLimit();
            }
            else if (next.isBuffer()) {
                // the channel is not blocked, so this record still belongs to the
                // current snapshot and is processed normally
                return next;
            }
            else if (next.getEvent().getClass() == CheckpointBarrier.class) {
                // the channel is not blocked and the event is a CheckpointBarrier
                if (!endOfStream) {
                    // process barriers only if there is a chance of the checkpoint completing
                    // (first barrier of this checkpoint seen on this channel); the core
                    // barrier-handling logic lives in processBarrier
                    processBarrier((CheckpointBarrier) next.getEvent(), next.getChannelIndex());
                }
            }
            else if (next.getEvent().getClass() == CancelCheckpointMarker.class) {
                processCancellationBarrier((CancelCheckpointMarker) next.getEvent());
            }
            else {
                if (next.getEvent().getClass() == EndOfPartitionEvent.class) {
                    processEndOfPartition();
                }
                return next;
            }
        }
        else if (!endOfStream) {
            // end of input stream. stream continues with the buffered data
            endOfStream = true;

            // release resources and unblock the channels
            releaseBlocksAndResetBarriers();
            return getNextNonBlocked();
        }
        else {
            // final end of both input and buffered data
            return null;
        }
    }
}
The code above shows how a downstream operator handles the data stream before and after it receives a barrier.

Next, let's look at processBarrier:

private void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) throws Exception {
    final long barrierId = receivedBarrier.getId();

    // fast path for single channel cases
    if (totalNumberOfInputChannels == 1) {
        if (barrierId > currentCheckpointId) {
            // new checkpoint
            currentCheckpointId = barrierId;
            notifyCheckpoint(receivedBarrier);
        }
        return;
    }

    // -- general code path for multiple input channels --

    if (numBarriersReceived > 0) {
        // this is only true if some alignment is already progress and was not canceled
        if (barrierId == currentCheckpointId) {
            // regular case: onBarrier blocks this channel so that subsequent buffers
            // from it are simply cached until alignment finishes
            onBarrier(channelIndex);
        }
        else if (barrierId > currentCheckpointId) {
            // the current checkpoint is obsolete: release its resources and start a new one
            // we did not complete the current checkpoint, another started before
            LOG.warn("Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. " +
                    "Skipping current checkpoint.", barrierId, currentCheckpointId);

            // let the task know we are not completing this
            notifyAbort(currentCheckpointId, new CheckpointDeclineSubsumedException(barrierId));

            // abort the current checkpoint
            releaseBlocksAndResetBarriers();

            // begin the new checkpoint
            beginNewAlignment(barrierId, channelIndex);
        }
        else {
            // ignore trailing barrier from an earlier checkpoint (obsolete now)
            return;
        }
    }
    else if (barrierId > currentCheckpointId) {
        // first barrier of a new checkpoint
        beginNewAlignment(barrierId, channelIndex);
    }
    else {
        // either the current checkpoint was canceled (numBarriers == 0) or
        // this barrier is from an old subsumed checkpoint
        return;
    }

    // check if we have all barriers - since canceled checkpoints always have zero barriers
    // this can only happen on a non canceled checkpoint
    if (numBarriersReceived + numClosedChannels == totalNumberOfInputChannels) {
        // all channels have delivered their barrier, so the checkpoint can actually be triggered
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received all barriers, triggering checkpoint {} at {}",
                    receivedBarrier.getId(), receivedBarrier.getTimestamp());
        }

        releaseBlocksAndResetBarriers();

        // notify the task to trigger the checkpoint
        notifyCheckpoint(receivedBarrier);
    }
}
Triggering the checkpoint: BarrierBuffer.notifyCheckpoint simply calls StreamTask.triggerCheckpointOnBarrier.
private void notifyCheckpoint(CheckpointBarrier checkpointBarrier) throws Exception {
    if (toNotifyOnCheckpoint != null) {
        CheckpointMetaData checkpointMetaData =
                new CheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());

        long bytesBuffered = currentBuffered != null ? currentBuffered.size() : 0L;

        CheckpointMetrics checkpointMetrics = new CheckpointMetrics()
                .setBytesBufferedInAlignment(bytesBuffered)
                .setAlignmentDurationNanos(latestAlignmentDurationNanos);

        // trigger this task's checkpoint; from here on the flow is the same as for a source
        toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
            checkpointMetaData,
            checkpointBarrier.getCheckpointOptions(),
            checkpointMetrics);
    }
}
StreamTask.triggerCheckpointOnBarrier in turn calls performCheckpoint directly; from this point on, the logic is exactly the same as the source-triggered checkpoint described above.
public void triggerCheckpointOnBarrier(
        CheckpointMetaData checkpointMetaData,
        CheckpointOptions checkpointOptions,
        CheckpointMetrics checkpointMetrics) throws Exception {

    try {
        performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
    }
    catch (CancelTaskException e) {
        LOG.info("Operator {} was cancelled while performing checkpoint {}.",
                getName(), checkpointMetaData.getCheckpointId());
        throw e;
    }
    catch (Exception e) {
        throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " +
            getName() + '.', e);
    }
}

State Restore Process

State restore is just as important: the checkpoint snapshots taken earlier exist precisely so that state can be restored after a job failure, guaranteeing exactly-once semantics.

Both when a job is submitted and when it is restarted, Flink restores from the last completed checkpoint. This logic also lives in the CheckpointCoordinator class; let's look at the restore method:

public boolean restoreLatestCheckpointedState(
        Map<JobVertexID, ExecutionJobVertex> tasks,
        boolean errorIfNoCheckpoint,
        boolean allowNonRestoredState) throws Exception {

    ......

    // Recover the checkpoints
    completedCheckpointStore.recover(sharedStateRegistry);

    // get the latest completed checkpoint
    CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

    ......

    LOG.info("Restoring from latest valid checkpoint: {}.", latest);

    // re-assign the task states
    final Map<OperatorID, OperatorState> operatorStates = latest.getOperatorStates();

    StateAssignmentOperation stateAssignmentOperation =
            new StateAssignmentOperation(tasks, operatorStates, allowNonRestoredState);

    // restore the state
    stateAssignmentOperation.assignStates();

    ......

    return true;
}

StateAssignmentOperation.assignStates

public boolean assignStates() throws Exception {
    Map<OperatorID, OperatorState> localOperators = new HashMap<>(operatorStates);

    // get all tasks
    Map<JobVertexID, ExecutionJobVertex> localTasks = this.tasks;

    checkStateMappingCompleteness(allowNonRestoredState, operatorStates, tasks);

    for (Map.Entry<JobVertexID, ExecutionJobVertex> task : localTasks.entrySet()) {
        final ExecutionJobVertex executionJobVertex = task.getValue();

        // find the states of all operators belonging to this task
        List<OperatorID> operatorIDs = executionJobVertex.getOperatorIDs();
        List<OperatorID> altOperatorIDs = executionJobVertex.getUserDefinedOperatorIDs();
        List<OperatorState> operatorStates = new ArrayList<>();
        boolean statelessTask = true;

        for (int x = 0; x < operatorIDs.size(); x++) {
            OperatorID operatorID = altOperatorIDs.get(x) == null
                ? operatorIDs.get(x)
                : altOperatorIDs.get(x);

            // remove the OperatorState from the map and return it
            OperatorState operatorState = localOperators.remove(operatorID);
            if (operatorState == null) {
                operatorState = new OperatorState(
                    operatorID,
                    executionJobVertex.getParallelism(),
                    executionJobVertex.getMaxParallelism());
            } else {
                statelessTask = false;
            }

            // add it to the operatorStates list
            operatorStates.add(operatorState);
        }

        if (statelessTask) {
            // skip tasks where no operator has any state
            continue;
        }

        // initialize the state for each task
        assignAttemptState(task.getValue(), operatorStates);
    }

    return true;
}
StateAssignmentOperation.assignAttemptState does the actual state assignment:
......

private void assignAttemptState(ExecutionJobVertex executionJobVertex, List<OperatorState> operatorStates) {

    ......

    // check if a stateless task
    if (!allElementsAreNull(subNonPartitionableState)
            || !allElementsAreNull(subManagedOperatorState)
            || !allElementsAreNull(subRawOperatorState)
            || subKeyedState != null) {

        // build the TaskStateHandles
        TaskStateHandles taskStateHandles = new TaskStateHandles(
            new ChainedStateHandle<>(subNonPartitionableState),
            subManagedOperatorState,
            subRawOperatorState,
            subKeyedState != null ? subKeyedState.f0 : null,
            subKeyedState != null ? subKeyedState.f1 : null);

        // set the initial state on the current execution attempt
        currentExecutionAttempt.setInitialState(taskStateHandles);
    }
}

Execution.setInitialState simply assigns checkpointStateHandles to taskState:

public void setInitialState(TaskStateHandles checkpointStateHandles) {
    checkState(state == CREATED, "Can only assign operator state when execution attempt is in CREATED");
    this.taskState = checkpointStateHandles;
}

So far, recovery has only taken the state from the checkpoint fetched from ZooKeeper and assigned it to the operator state.

deployToSlot then ships this initial state to the TaskManager:

public void deployToSlot(final SimpleSlot slot) throws JobException {
    ......

    // wrap the taskState into the deployment descriptor
    final TaskDeploymentDescriptor deployment = vertex.createDeploymentDescriptor(
        attemptId,
        slot,
        taskState,
        attemptNumber);

    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

    // submit the deployment to the TaskManager
    final Future<Acknowledge> submitResultFuture = taskManagerGateway.submitTask(deployment, timeout);

    submitResultFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
        @Override
        public Void apply(Throwable failure) {
            if (failure instanceof TimeoutException) {
                String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                markFailed(new Exception(
                    "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                        + ") not responding after a timeout of " + timeout, failure));
            } else {
                markFailed(failure);
            }
            return null;
        }
    }, executor);
}

deployToSlot mainly builds a TaskDeploymentDescriptor (TDD) and sends it to the TaskManager via an Akka message to schedule the task.
The TaskManager then creates a new Task from the TDD and starts it, executing its run method.
If this is a restart scenario and there is state to restore (i.e. taskStateHandles != null), the following code in the Task class is reached:
// the very last thing before the actual execution starts running is to inject
// the state into the task. the state is non-empty if this is an execution
// of a task that failed but had backuped state from a checkpoint
if (null != taskStateHandles) {
    if (invokable instanceof StatefulTask) {
        StatefulTask op = (StatefulTask) invokable;
        // note: this sets the restoreStateHandles field of the StreamTask
        op.setInitialState(taskStateHandles);
    } else {
        throw new IllegalStateException("Found operator state for a non-stateful task invokable");
    }

    // be memory and GC friendly - since the code stays in invoke() for a potentially long time,
    // we clear the reference to the state handle
    //noinspection UnusedAssignment
    taskStateHandles = null;
}
The following line in the Task class then calls the StreamTask's invoke method (the invokable instance itself was created via reflection):

// run the invokable
invokable.invoke();
From this method, here is just the small fragment related to restoring state:

synchronized (lock) {
    // both the following operations are protected by the lock
    // so that we avoid race conditions in the case that initializeState()
    // registers a timer, that fires before the open() is called.

    // initialize (and possibly restore) the state here
    initializeState();
    openAllOperators();
}
Next, let's see what StreamTask's initializeState method does:

private void initializeState() throws Exception {

    boolean restored = null != restoreStateHandles;

    if (restored) {
        // taken on a restart when there is state to restore
        checkRestorePreconditions(operatorChain.getChainLength());

        // initialize the operators' states
        initializeOperators(true);
        restoreStateHandles = null; // free for GC
    } else {
        initializeOperators(false);
    }
}
What follows is the actual initialization and restore of the keyed states and operator states:
public final void initializeState(OperatorStateHandles stateHandles) throws Exception {

    Collection<KeyedStateHandle> keyedStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
    Collection<OperatorStateHandle> operatorStateHandlesBackend = null;

    boolean restoring = null != stateHandles;

    initKeyedState(); //TODO we should move the actual initialization of this from StreamTask to this class

    if (getKeyedStateBackend() != null && timeServiceManager == null) {
        timeServiceManager = new InternalTimeServiceManager<>(
            getKeyedStateBackend().getNumberOfKeyGroups(),
            getKeyedStateBackend().getKeyGroupRange(),
            this,
            getRuntimeContext().getProcessingTimeService());
    }

    if (restoring) {
        //pass directly
        operatorStateHandlesBackend = stateHandles.getManagedOperatorState();
        operatorStateHandlesRaw = stateHandles.getRawOperatorState();

        if (null != getKeyedStateBackend()) {
            //only use the keyed state if it is meant for us (aka head operator)
            keyedStateHandlesRaw = stateHandles.getRawKeyedState();
        }
    }

    checkpointStreamFactory = container.createCheckpointStreamFactory(this);

    initOperatorState(operatorStateHandlesBackend);

    StateInitializationContext initializationContext = new StateInitializationContextImpl(
            restoring, // information whether we restore or start for the first time
            operatorStateBackend, // access to operator state backend
            keyedStateStore, // access to keyed state backend
            keyedStateHandlesRaw, // access to keyed state stream
            operatorStateHandlesRaw, // access to operator state stream
            getContainingTask().getCancelables()); // access to register streams for canceling

    initializeState(initializationContext);

    if (restoring) {
        // finally restore the legacy state in case we are
        // migrating from a previous Flink version.
        restoreStreamCheckpointed(stateHandles);
    }
}

Summary

      
Flink implements lightweight asynchronous snapshots with the ABS technique, greatly reducing the cost of snapshotting streaming jobs and allowing them to take snapshots more frequently. Moreover, for common DAG jobs, buffered in-flight data is not stored in the snapshot, which means the buffered data does not have to be recomputed during recovery and failure recovery time is greatly shortened. ABS is likely to gradually replace today's comparatively clumsy global job snapshots and be adopted as the snapshot algorithm by more compute engines.
