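The following introduction is mostly based on the scaladoc of org.apache.spark.scheduler.DAGScheduler. DAGScheduler is a private class, so it does not appear in the official API documentation. You are therefore strongly encouraged to read the source code first, and only then this page and the related pages.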
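DAGScheduler is the scheduling layer of Apache Spark. It implements **stage-oriented scheduling** using Jobs and Stages.
DAGScheduler transforms a **logical execution plan** (the RDD lineage of dependencies built using RDD transformations) into a **physical execution plan** (using stages).
After an action has been called on an RDD, SparkContext hands over a logical plan to DAGScheduler. DAGScheduler in turn translates it into a set of stages that are submitted as TaskSets for execution.
DAGScheduler runs only on the driver. It is created as part of SparkContext's initialization, right after TaskScheduler and SchedulerBackend are ready.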
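DAGScheduler does three things in Spark:
- Computes an **execution DAG** (a DAG of stages) for a job
- Determines the preferred locations to run each task on
- Handles failures due to **shuffle output files** being lost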
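DAGScheduler computes a directed acyclic graph (DAG) of stages for each job. It keeps track of which RDDs and stage outputs are materialized and finds a minimal schedule to run the jobs. It then submits stages to TaskScheduler.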
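The following toy model makes the shuffle boundary concrete. It is not Spark code: Rdd, NarrowDep, ShuffleDep and the helper functions are hypothetical. As the text describes, it assumes that narrow dependencies are pipelined into one stage, and that every distinct shuffle dependency adds one ShuffleMapStage in front of the final ResultStage.

```scala
object StageSplitSketch {
  // Hypothetical stand-ins for RDD and its dependencies (not Spark classes)
  sealed trait Dep { def parent: Rdd }
  final case class NarrowDep(parent: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, parent: Rdd) extends Dep
  final case class Rdd(id: Int, deps: List[Dep] = Nil)

  /** Ids of the RDDs pipelined into the same stage as `root` (the walk stops at shuffle boundaries). */
  def stageRdds(root: Rdd): Set[Int] =
    root.deps.collect { case NarrowDep(p) => stageRdds(p) }.foldLeft(Set(root.id))(_ ++ _)

  /** Shuffle ids reachable from `root`; each one becomes a ShuffleMapStage. */
  def shuffleIds(root: Rdd): Set[Int] =
    root.deps.foldLeft(Set.empty[Int]) {
      case (acc, NarrowDep(p))      => acc ++ shuffleIds(p)
      case (acc, ShuffleDep(id, p)) => (acc + id) ++ shuffleIds(p)
    }

  /** One ResultStage for the action plus one ShuffleMapStage per distinct shuffle. */
  def numStages(finalRdd: Rdd): Int = 1 + shuffleIds(finalRdd).size
}
```

For a lineage like `textFile -> map -> reduceByKey -> map`, this gives two stages. The first holds the RDDs before the shuffle. The second holds the RDDs after it.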
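Besides computing the execution DAG, DAGScheduler also determines the preferred locations to run each task on, based on the current cache status. It passes this information to TaskScheduler.
DAGScheduler tracks which RDDs are cached (or persisted) so that it does not recompute them, i.e. so that it does not redo the map side of a shuffle. DAGScheduler also remembers which ShuffleMapStages have already produced output files (stored in BlockManagers).
DAGScheduler is only interested in cache location coordinates, i.e. the host and executor ID of each RDD partition.
DAGScheduler also handles failures caused by lost shuffle output files, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by lost shuffle files are handled by TaskScheduler itself. TaskScheduler retries each task a small number of times before cancelling the whole stage.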
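DAGScheduler uses an event queue architecture. A thread can post DAGSchedulerEvent events (e.g. a new job or stage being submitted), and DAGScheduler reads and executes them sequentially. See the DAGScheduler Event Bus section.
DAGScheduler runs stages in topological order.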
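DAGScheduler uses SparkContext, TaskScheduler, LiveListenerBus, MapOutputTracker and BlockManager for its services. At the very minimum, however, DAGScheduler takes a SparkContext only (and requests SparkContext for the other services).
DAGScheduler schedules a job either because an action was executed on an RDD or because the SparkContext.runJob() method was called directly. In both cases it spawns parallel tasks to compute (partial) results per partition.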
Creating Instance
DAGScheduler is created when SparkContext is created.
Once created, DAGScheduler associates itself with the TaskScheduler and requests the DAGScheduler Event Bus to start accepting events.
DAGScheduler uses DAGSchedulerSource for performance metrics.
DAGScheduler Event Bus
DAGScheduler uses an event bus to process scheduling events on a separate thread (one by one and asynchronously).
DAGScheduler requests the event bus to start right when created and stops it when requested to stop.
DAGScheduler defines event-posting methods for posting DAGSchedulerEvent events to the event bus.
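The one-event-at-a-time processing described above can be sketched as a blocking queue drained by a single thread. This is a simplified stand-in, not Spark's DAGSchedulerEventProcessLoop. The event types, the thread name and the Stop poison pill are assumptions of the sketch.

```scala
import java.util.concurrent.LinkedBlockingDeque

object EventLoopSketch {
  // Hypothetical events; Spark's real ones extend DAGSchedulerEvent
  sealed trait Event
  final case class JobSubmitted(jobId: Int) extends Event
  case object Stop extends Event // poison pill used only by this sketch

  final class EventLoop(name: String)(onReceive: Event => Unit) {
    private val queue = new LinkedBlockingDeque[Event]()
    private val thread = new Thread(name) {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {              // blocks until an event is posted
            case Stop  => running = false
            case event => onReceive(event)  // one event at a time, in posting order
          }
        }
      }
    }
    thread.setDaemon(true)

    def start(): Unit = thread.start()
    def post(event: Event): Unit = queue.put(event) // callers only enqueue
    def stop(): Unit = { queue.put(Stop); thread.join() }
  }
}
```

Posting only enqueues, so callers never wait for the scheduling logic itself. Because a single thread drains the queue, the handlers need no locking among themselves.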
DAGScheduler is given a TaskScheduler when created.
The TaskScheduler is used for the following:
- Submitting missing tasks of a stage
- Handling task completion (CompletionEvent)
- Killing a task
- Failing a job and all other independent single-job stages
- Stopping itself
Running Job
runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit
runJob submits a job and waits until a result is available.
runJob prints out the following INFO message to the logs when the job has finished successfully:
Job [jobId] finished: [callSite], took [time] s
runJob prints out the following INFO message to the logs when the job has failed:
Job [jobId] failed: [callSite], took [time] s
runJob is used when:
- SparkContext is requested to run a job
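The following rough sketch shows how a job could be timed to produce the two INFO lines above. It is hypothetical: a by-name `job` argument stands in for waiting on the JobWaiter that submitJob returns. The sketch only builds the log line and does not propagate a failure to the caller.

```scala
import scala.util.{Failure, Success, Try}

object RunJobSketch {
  /** Runs `job` and returns the INFO line described above (the failure itself is swallowed here). */
  def runJob(jobId: Int, callSite: String)(job: => Unit): String = {
    val start = System.nanoTime()
    val outcome = Try(job)                         // stands in for awaiting the JobWaiter
    val took = (System.nanoTime() - start) / 1e9   // seconds, as in "took [time] s"
    outcome match {
      case Success(_) => f"Job $jobId finished: $callSite, took $took%f s"
      case Failure(_) => f"Job $jobId failed: $callSite, took $took%f s"
    }
  }
}
```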
Submitting Job
submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U]
submitJob increments the nextJobId internal counter.
submitJob creates a JobWaiter for the (number of) partitions and the given resultHandler function.
submitJob requests the DAGSchedulerEventProcessLoop to post a JobSubmitted.
In the end, submitJob returns the JobWaiter.
For empty partitions (no partitions to compute), submitJob requests the LiveListenerBus to post SparkListenerJobStart and SparkListenerJobEnd events (with the JobSucceeded result marker). It then returns a JobWaiter with no tasks to wait for.
submitJob throws an IllegalArgumentException when the partition indices are not among the partitions of the given RDD:
Attempting to access a non-existent partition: [p]. Total number of partitions: [maxPartitions]
submitJob is used when:
- SparkContext is requested to submit a job
- DAGScheduler is requested to run a job
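The partition check and the empty-job shortcut can be sketched as follows. The message text is copied from the exception text above. ToyJobWaiter is a hypothetical, much simplified stand-in for JobWaiter that only forwards results and counts finished tasks.

```scala
object SubmitJobSketch {
  /** Rejects partition indices outside [0, maxPartitions), with the message shown above. */
  def checkPartitions(partitions: Seq[Int], maxPartitions: Int): Unit =
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        s"Attempting to access a non-existent partition: $p. " +
          s"Total number of partitions: $maxPartitions")
    }

  /** Hypothetical JobWaiter stand-in: forwards each result and counts finished tasks. */
  final class ToyJobWaiter[U](totalTasks: Int, resultHandler: (Int, U) => Unit) {
    private var finishedTasks = 0
    def taskSucceeded(index: Int, result: U): Unit = synchronized {
      resultHandler(index, result)
      finishedTasks += 1
    }
    // A waiter created for zero partitions is complete from the start
    def isCompleted: Boolean = synchronized(finishedTasks == totalTasks)
  }
}
```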
Partition Placement Preferences
DAGScheduler keeps track of block locations per RDD and partition.
DAGScheduler uses TaskLocation, which includes a host name and an executor id on that host (as ExecutorCacheTaskLocation).
The keys are RDDs (their ids) and the values are arrays indexed by partition numbers.
Each entry is a set of block locations where an RDD partition is cached, i.e. the BlockManagers of the blocks.
The registry is initialized empty when DAGScheduler is created. It is used when DAGScheduler is requested for the locations of the cache blocks of an RDD.
DAGScheduler tracks ActiveJobs:
- Adds a new ActiveJob when requested to handle JobSubmitted or MapStageSubmitted events
- Removes an ActiveJob when requested to clean up after an ActiveJob and independent stages
- Removes all ActiveJobs when requested to doCancelAllJobs
DAGScheduler uses the ActiveJobs registry when requested to handle JobGroupCancelled or TaskCompletion events, to cleanUpAfterSchedulerStop, and to abort a stage.
The number of ActiveJobs is available through the job.activeJobs performance metric.
Creating ResultStage for RDD
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage
It is used when DAGScheduler is requested to handle a JobSubmitted event.
Creating ShuffleMapStage for ShuffleDependency
shuffleDep: ShuffleDependency[_, _, _],
jobId: Int): ShuffleMapStage
createShuffleMapStage creates a ShuffleMapStage for the given ShuffleDependency as follows:
- The stage ID is generated from the nextStageId internal counter
- The RDD is taken from the given ShuffleDependency
- The number of tasks is the number of partitions of the RDD
createShuffleMapStage registers the ShuffleMapStage in the stageIdToStage and shuffleIdToMapStage internal registries.
createShuffleMapStage requests the MapOutputTrackerMaster to check whether it contains the shuffle ID or not.
If not, createShuffleMapStage prints out the following INFO message to the logs and requests the MapOutputTrackerMaster to register the shuffle.
Registering RDD [id] ([creationSite]) as input to shuffle [shuffleId]
createShuffleMapStage is used when:
- DAGScheduler is requested to find or create a ShuffleMapStage for a given ShuffleDependency
Cleaning Up After Job and Independent Stages
job: ActiveJob): Unit
cleanupStateForJobAndIndependentStages cleans up the state for job and for any stages that are not part of any other job.
cleanupStateForJobAndIndependentStages looks the job up in the internal registry of stages per job.
If no stages are found, the following ERROR is printed out to the logs:
No stages registered for job [jobId]
Otherwise, cleanupStateForJobAndIndependentStages goes over the stages registered for the job.
For each stage, cleanupStateForJobAndIndependentStages reads the jobs the stage belongs to.
If the job does not belong to the jobs of the stage, the following ERROR is printed out to the logs:
Job [jobId] not registered for stage [stageId] even though that stage was registered for the job
If the job was the only job for the stage, the stage (and the stage id) gets cleaned up from the registries, including the registries of running, waiting and failed stages.
While the stage is removed from the running stages, the following message is printed out to the logs:
Removing running stage [stageId]
While the stage is removed from the waiting stages, the following message is printed out to the logs:
Removing stage [stageId] from waiting set.
While the stage is removed from the failed stages, the following message is printed out to the logs:
Removing stage [stageId] from failed set.
After all the cleaning for the job, you should see the following DEBUG message in the logs:
After removal of stage [stageId], remaining stages = [stageIdToStage.size]
The job is removed from the internal job registries.
The job is also removed from its final stage (a ResultStage or a ShuffleMapStage).
cleanupStateForJobAndIndependentStages is used in handleTaskCompletion (when a ResultTask has completed successfully), in failJobAndIndependentStages, and in markMapStageJobAsFinished.
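The bookkeeping above can be modelled with plain id registries. This is a hypothetical sketch, not Spark's code: the registries hold ids instead of Stage objects, and stageIdToJobIds doubles as the stage registry. The method returns the lines it would log so that the behaviour is easy to check.

```scala
import scala.collection.mutable

object CleanupSketch {
  /** Hypothetical registries keyed by ids (Spark's registries hold Stage objects). */
  final class Registries {
    val jobIdToStageIds = mutable.HashMap.empty[Int, mutable.HashSet[Int]]
    val stageIdToJobIds = mutable.HashMap.empty[Int, mutable.HashSet[Int]] // doubles as the stage registry
    val runningStages = mutable.HashSet.empty[Int]
    val waitingStages = mutable.HashSet.empty[Int]
    val failedStages = mutable.HashSet.empty[Int]

    /** Cleans up after `jobId` and returns the log lines the cleanup would print. */
    def cleanupStateForJob(jobId: Int): List[String] = {
      val log = mutable.ListBuffer.empty[String]
      val stageIds = jobIdToStageIds.getOrElse(jobId, mutable.HashSet.empty[Int])
      if (stageIds.isEmpty) log += s"No stages registered for job $jobId"
      for (stageId <- stageIds) {
        val jobs = stageIdToJobIds.getOrElse(stageId, mutable.HashSet.empty[Int])
        if (!jobs.contains(jobId)) {
          log += s"Job $jobId not registered for stage $stageId even though that stage was registered for the job"
        } else {
          jobs -= jobId
          if (jobs.isEmpty) { // the job was the stage's only job, so the stage is independent
            if (runningStages.remove(stageId)) log += s"Removing running stage $stageId"
            if (waitingStages.remove(stageId)) log += s"Removing stage $stageId from waiting set."
            if (failedStages.remove(stageId)) log += s"Removing stage $stageId from failed set."
            stageIdToJobIds -= stageId
          }
        }
      }
      jobIdToStageIds -= jobId
      log.toList
    }
  }
}
```

A stage shared with another job keeps its registrations. Only the finished job's id is dropped from it.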
Marking ShuffleMapStage Job Finished
job: ActiveJob,
stats: MapOutputStatistics): Unit
markMapStageJobAsFinished marks the active job finished and notifies Spark listeners.
Internally, markMapStageJobAsFinished marks the zeroth partition finished and increases the number of tasks finished in job.
The job listener is notified that the 0th task succeeded.
The state of the job and independent stages is cleaned up (using cleanupStateForJobAndIndependentStages).
Ultimately, a SparkListenerJobEnd is posted to LiveListenerBus for the job, with the current time (in millis) and the JobSucceeded job result.
markMapStageJobAsFinished is used in handleMapStageSubmitted and handleTaskCompletion.
Finding Or Creating Missing Direct Parent ShuffleMapStages (For ShuffleDependencies) of RDD
rdd: RDD[_],
firstJobId: Int): List[Stage]
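It finds the direct parent shuffle dependencies of the given RDD (using getShuffleDependencies). It then finds or creates a ShuffleMapStage for each of them (using getOrCreateShuffleMapStage).
It is used when DAGScheduler is requested to create a ShuffleMapStage or a ResultStage.
Marking Stage Finished
stage: Stage,
errorMessage: Option[String] = None,
willRetry: Boolean = false): Unit
It is used when...FIXME
Finding or Creating ShuffleMapStage for ShuffleDependency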
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage
getOrCreateShuffleMapStage looks up a ShuffleMapStage by the shuffleId of the given ShuffleDependency in the shuffleIdToMapStage internal registry and returns it if available.
If none is found, getOrCreateShuffleMapStage finds all the missing ancestor shuffle dependencies and creates the missing ShuffleMapStage stages (including one for the input ShuffleDependency).
getOrCreateShuffleMapStage is used when DAGScheduler is requested to:
- find or create missing direct parent ShuffleMapStages of an RDD
- find missing parent ShuffleMapStages for a stage
- handle a MapStageSubmitted event
- check out stage dependency on a stage
Finding Missing ShuffleDependencies For RDD
rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]]
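getMissingAncestorShuffleDependencies finds all missing shuffle dependencies for the given RDD by traversing its RDD lineage.
NOTE: A missing shuffle dependency of an RDD is a dependency that is not registered in the shuffleIdToMapStage internal registry.
Internally, getMissingAncestorShuffleDependencies finds the direct parent shuffle dependencies of the RDD (using getShuffleDependencies) and collects the ones that are not registered yet. It repeats the process for the RDDs of the collected shuffle dependencies.
getMissingAncestorShuffleDependencies is used when DAGScheduler is requested to find all ShuffleMapStage stages for a ShuffleDependency.
Finding Direct Parent Shuffle Dependencies of RDD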
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
getShuffleDependencies finds the direct parent shuffle dependencies of the given RDD.
Internally, getShuffleDependencies takes the direct shuffle dependencies of the input RDD. It also takes the direct shuffle dependencies of all the parent non-ShuffleDependencies in the dependency chain (aka RDD lineage).
getShuffleDependencies is used when DAGScheduler is requested to find or create missing direct parent ShuffleMapStages (for ShuffleDependencies of an RDD), and to find all missing shuffle dependencies for a given RDD.
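Both traversals can be sketched on a toy lineage: the direct parent shuffle dependencies, and the missing ancestors built on top of them. Rdd, NarrowDep, ShuffleDep and the `registered` set of shuffle ids are assumptions of the sketch. The `registered` set stands in for the shuffleIdToMapStage registry.

```scala
import scala.collection.mutable

object ShuffleDepsSketch {
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep
  final case class Rdd(id: Int, deps: List[Dep] = Nil)

  /** Direct parent shuffle deps: follow narrow deps, collect (but do not cross) shuffle deps. */
  def getShuffleDependencies(rdd: Rdd): Set[ShuffleDep] = {
    val parents = mutable.HashSet.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[Int]
    val waitingForVisit = mutable.ListBuffer(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (visited.add(toVisit.id)) toVisit.deps.foreach {
        case s: ShuffleDep => parents += s
        case n: NarrowDep  => waitingForVisit.prepend(n.rdd)
      }
    }
    parents.toSet
  }

  /** Shuffle deps in the lineage whose id is not in `registered`, ancestors first. */
  def getMissingAncestorShuffleDependencies(rdd: Rdd, registered: Set[Int]): List[ShuffleDep] = {
    val ancestors = mutable.ListBuffer.empty[ShuffleDep]
    val visited = mutable.HashSet.empty[Int]
    val waitingForVisit = mutable.ListBuffer(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.remove(0)
      if (visited.add(toVisit.id)) {
        getShuffleDependencies(toVisit).foreach { dep =>
          if (!registered(dep.shuffleId)) { // a registered dep (and its ancestors) is skipped
            ancestors.prepend(dep)
            waitingForVisit.prepend(dep.rdd)
          }
        }
      }
    }
    ancestors.toList
  }
}
```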
Failing Job and Independent Single-Job Stages
job: ActiveJob,
failureReason: String,
exception: Option[Throwable] = None): Unit
failJobAndIndependentStages fails the input job and all the stages that are only used by the job.
Internally, failJobAndIndependentStages looks up the stages registered for the job.
If no stages could be found, you should see the following ERROR message in the logs:
No stages registered for job [id]
Otherwise, for every stage, failJobAndIndependentStages finds the job ids the stage belongs to.
If no stages could be found or the job is not referenced by the stages, you should see the following ERROR message in the logs:
Job [id] not registered for stage [id] even though that stage was registered for the job
Two conditions must both hold: exactly one job is registered for the stage, and the stage is in RUNNING state (in the runningStages internal registry). Only then is the TaskScheduler requested to cancel the stage's tasks, and the stage is marked as finished.
NOTE: failJobAndIndependentStages
uses <
It is used when...FIXME
Aborting Stage
failedStage: Stage,
reason: String,
exception: Option[Throwable]): Unit
abortStage is an internal method that finds all the active jobs that depend on the failedStage stage and fails them.
Internally, abortStage looks the failedStage stage up in the internal stageIdToStage registry and exits if the stage was not registered earlier.
If it was, abortStage finds all the active jobs (in the internal ActiveJobs registry) whose final stage depends on the failedStage stage (using stageDependsOn).
At this time, the completionTime property (of the failed stage's StageInfo) is set to the current time (millis).
All the active jobs that depend on the failed stage (as calculated above) are failed (using failJobAndIndependentStages, with the failure reason and the optional exception). So are the stages that do not belong to other jobs (aka independent stages).
If there are no jobs depending on the failed stage, you should see the following INFO message in the logs:
Ignoring failure of [failedStage] because all jobs depending on it are done
abortStage is used when DAGScheduler is requested to handle a TaskSetFailed event, submit a stage, submit missing tasks of a stage, or handle a TaskCompletion event.
Checking Out Stage Dependency on Given Stage
stage: Stage,
target: Stage): Boolean
stageDependsOn compares two stages and returns whether the stage depends on the target stage (i.e. true) or not (i.e. false).
NOTE: A stage A depends on stage B if B is among the ancestors of A.
Internally, stageDependsOn walks through the graph of RDDs of the input stage and handles every dependency (using RDD.dependencies) according to its type:
- For a NarrowDependency, stageDependsOn adds the parent RDD to a stack of RDDs to visit.
- For a ShuffleDependency, stageDependsOn finds or creates the ShuffleMapStage for the dependency (using getOrCreateShuffleMapStage with the stage's first job id). It adds the map stage's RDD to the stack of RDDs to visit only if the map stage is not available yet, i.e. not all of its partitions have shuffle outputs. Otherwise there is no need to follow the dependency back.
After all the RDDs of the input stage are visited, stageDependsOn checks whether the target's RDD is among the visited RDDs, i.e. whether the stage depends on target.
stageDependsOn is used when DAGScheduler is requested to abort a stage.
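Here is a sketch of the walk on a toy lineage. It assumes, as described above, that a shuffle dependency is followed back only when its map stage is not available yet. `availableShuffles` is a hypothetical stand-in for ShuffleMapStage availability, and each stage is identified by its last RDD.

```scala
import scala.collection.mutable

object StageDependsOnSketch {
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep
  final case class Rdd(id: Int, deps: List[Dep] = Nil)

  /** Does the stage whose last RDD is `stageRdd` depend on the stage whose last RDD is `targetRdd`? */
  def stageDependsOn(stageRdd: Rdd, targetRdd: Rdd, availableShuffles: Set[Int]): Boolean =
    if (stageRdd.id == targetRdd.id) true // a stage trivially "depends" on itself
    else {
      val visited = mutable.HashSet.empty[Int]
      val waitingForVisit = mutable.ListBuffer(stageRdd) // used as a stack
      while (waitingForVisit.nonEmpty) {
        val rdd = waitingForVisit.remove(0)
        if (visited.add(rdd.id)) rdd.deps.foreach {
          case NarrowDep(parent) => waitingForVisit.prepend(parent)
          case ShuffleDep(shuffleId, parent) =>
            // follow the shuffle back only while its map stage still has missing outputs
            if (!availableShuffles(shuffleId)) waitingForVisit.prepend(parent)
        }
      }
      visited.contains(targetRdd.id)
    }
}
```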
Submitting Waiting Child Stages for Execution
parent: Stage): Unit
submitWaitingChildStages submits for execution all waiting stages for which the input parent Stage is the direct parent.
NOTE: Waiting stages are the stages registered in the waitingStages internal registry.
When executed, you should see the following TRACE messages in the logs:
Checking if any dependencies of [parent] are now runnable
running: [runningStages]
waiting: [waitingStages]
failed: [failedStages]
submitWaitingChildStages finds the child stages of the input parent stage and removes them from the waitingStages internal registry. It then submits them one by one (using submitStage), sorted by the ids of their first jobs.
submitWaitingChildStages is used when DAGScheduler is requested to submit missing tasks for a stage and to handle a successful ShuffleMapTask completion.
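Here is a hypothetical sketch of picking the waiting children of a parent stage. Stage is a toy record here. Ordering the children by their first job id follows the text above and is otherwise an assumption of this sketch.

```scala
import scala.collection.mutable

object WaitingChildrenSketch {
  // Hypothetical stage summary: its id, the first job that needs it and its parent stage ids
  final case class Stage(id: Int, firstJobId: Int, parentIds: Set[Int])

  /** Removes the direct children of `parentId` from `waiting` and returns them in submission order. */
  def takeWaitingChildren(waiting: mutable.LinkedHashSet[Stage], parentId: Int): Seq[Stage] = {
    val children = waiting.filter(_.parentIds.contains(parentId)).toSeq
    waiting --= children
    children.sortBy(_.firstJobId) // stages of earlier jobs are submitted first
  }
}
```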
Submitting Stage (with Missing Parents) for Execution
stage: Stage): Unit
submitStage submits the input stage, or its missing parents if some stages before the input stage have not been computed yet.
NOTE: submitStage is also used to resubmit failed stages.
submitStage recursively submits any missing parents of the stage.
Internally, submitStage first finds the earliest-created job id that needs the stage.
NOTE: A stage itself tracks the jobs (their ids) it belongs to, using the internal jobIds registry.
The following steps depend on whether there is a job or not.
If there are no jobs that require the stage, submitStage aborts the stage with the reason:
No active job for stage [id]
If there is a job for the stage, you should see a DEBUG message in the logs.
submitStage checks the status of the stage. It continues only when the stage is not recorded in the waiting, running or failed internal registries, and simply exits otherwise.
With the stage ready for submission, submitStage calculates the missing parent stages of the stage (using getMissingParentStages), sorted by their ids. You should see the following DEBUG message in the logs:
missing: [missing]
When the stage has no parent stages missing, you should see the following INFO message in the logs:
Submitting [stage] ([stage.rdd]), which has no missing parents
submitStage then submits the missing tasks of the stage (using submitMissingTasks) for the earliest-created job id.
If there are missing parent stages for the stage, submitStage submits all the parent stages (recursively). The stage itself is recorded in the internal waitingStages registry.
submitStage is used recursively for missing parents of the given stage and when DAGScheduler is requested for the following:
- resubmitFailedStages (ResubmitFailedStages event)
- submitWaitingChildStages (CompletionEvent event)
- Handle JobSubmitted, MapStageSubmitted and TaskCompletion events
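The recursion can be sketched as follows. This is a simplified model, not Spark's submitStage:
- Stages know their parent stages directly, instead of discovering them through the RDD graph.
- The active-job lookup and the abort path are left out.
- The `submitted` buffer stands in for submitMissingTasks.

```scala
import scala.collection.mutable

object SubmitStageSketch {
  // Hypothetical stage: parents are the map stages it reads shuffle output from
  final class Stage(val id: Int, val parents: List[Stage]) { var isAvailable = false }

  final class Scheduler {
    val waitingStages = mutable.LinkedHashSet.empty[Int]
    val runningStages = mutable.LinkedHashSet.empty[Int]
    val failedStages = mutable.LinkedHashSet.empty[Int]
    val submitted = mutable.ListBuffer.empty[Int] // stands in for submitMissingTasks

    // Simplification: only direct parents are checked (Spark walks the RDD graph)
    private def missingParents(stage: Stage): List[Stage] =
      stage.parents.filterNot(_.isAvailable).sortBy(_.id)

    def submitStage(stage: Stage): Unit = {
      val id = stage.id
      if (!waitingStages(id) && !runningStages(id) && !failedStages(id)) {
        val missing = missingParents(stage)
        if (missing.isEmpty) {
          runningStages += id // submitMissingTasks registers the stage as running
          submitted += id
        } else {
          missing.foreach(submitStage) // parents first...
          waitingStages += id          // ...while this stage waits for them
        }
      }
    }
  }
}
```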
Stage Attempts
A single stage can be re-executed in multiple attempts due to fault recovery. The number of attempts is configured (FIXME).
TaskScheduler may report that a task failed because a map output file from a previous stage was lost. In that case, DAGScheduler resubmits the lost stage. This is detected through a CompletionEvent with FetchFailed, or through an ExecutorLost event. DAGScheduler waits a small amount of time to see whether other nodes or tasks fail. It then resubmits TaskSets for any lost stage(s) that compute the missing tasks.
Please note that tasks from the old attempts of a stage could still be running.
A stage object tracks multiple StageInfo objects to pass to Spark listeners or the web UI.
The latest StageInfo for the most recent attempt of a stage is accessible through latestInfo.
Preferred Locations
DAGScheduler computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or on the location of cached or shuffle data.
Adaptive Query Planning / Adaptive Scheduling
See SPARK-9850 Adaptive execution in Spark for the design document. The work is currently in progress.
DAGScheduler.submitMapStage method is used for adaptive query planning, to run map stages and look at statistics about their outputs before submitting downstream stages.
ScheduledExecutorService daemon services
DAGScheduler uses the following ScheduledThreadPoolExecutors (with the policy of removing cancelled tasks from the work queue at the time of cancellation):
- a daemon thread pool using j.u.c.ScheduledThreadPoolExecutor with core pool size 1. It is used to post a ResubmitFailedStages event when FetchFailed is reported.
They are created using the ThreadUtils.newDaemonSingleThreadScheduledExecutor method, which uses Guava DSL to instantiate a ThreadFactory.
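A single-threaded daemon scheduler with the remove-on-cancel policy can be built from plain JDK classes. This sketch does not use Spark's ThreadUtils or Guava. `newDaemonSingleThreadScheduledExecutor` here is a hypothetical stand-in, and `scheduleResubmit` is an illustrative helper that posts an event after a delay.

```scala
import java.util.concurrent.{ScheduledFuture, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit}

object ResubmitTimerSketch {
  /** One daemon thread; cancelled tasks are dropped from the work queue immediately. */
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledThreadPoolExecutor = {
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r, threadName)
      t.setDaemon(true) // does not keep the JVM alive
      t
    }
    val executor = new ScheduledThreadPoolExecutor(1, factory)
    executor.setRemoveOnCancelPolicy(true)
    executor
  }

  /** Hypothetical helper: runs `post` once after `delayMs` (e.g. posting ResubmitFailedStages). */
  def scheduleResubmit(executor: ScheduledThreadPoolExecutor, delayMs: Long)(post: () => Unit): ScheduledFuture[_] =
    executor.schedule(new Runnable { override def run(): Unit = post() }, delayMs, TimeUnit.MILLISECONDS)
}
```

The remove-on-cancel policy matters for long delays. Without it, a cancelled task would stay in the queue until its delay expired.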
Finding Missing Parent ShuffleMapStages For Stage
stage: Stage): List[Stage]
getMissingParentStages finds missing parent ShuffleMapStages in the dependency graph of the input stage. It walks the graph depth-first, using a stack of RDDs to visit.
Internally, getMissingParentStages starts with the stage's RDD and walks up the tree of all parent RDDs to find uncached partitions.
NOTE: A Stage tracks the associated RDD using the rdd property.
NOTE: An uncached partition of an RDD is a partition that has Nil in the internal registry of cached block locations, i.e. a partition with no cached block locations.
getMissingParentStages traverses the parent dependencies of the RDD and acts according to their type, i.e. ShuffleDependency or NarrowDependency.
NOTE: ShuffleDependency and NarrowDependency are the main top-level Dependencies.
For each NarrowDependency, getMissingParentStages simply marks the corresponding RDD to visit. It then moves on to the next dependency of an RDD or works on another unvisited parent RDD.
NOTE: NarrowDependency is an RDD dependency that allows for pipelined execution.
getMissingParentStages focuses on ShuffleDependency dependencies.
NOTE: ShuffleDependency is an RDD dependency that represents a dependency on the output of a ShuffleMapStage, i.e. shuffle map stage.
For each ShuffleDependency, getMissingParentStages finds the ShuffleMapStage for it (using getOrCreateShuffleMapStage). If the ShuffleMapStage is not available, it is added to the set of missing (map) stages.
NOTE: A ShuffleMapStage is available when all its partitions are computed, i.e. results are available (as blocks).
CAUTION: FIXME...IMAGE with ShuffleDependencies queried
getMissingParentStages is used when DAGScheduler is requested to submit a stage and to handle JobSubmitted and MapStageSubmitted events.
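Here is a sketch of the traversal on a toy lineage. `fullyCached` and `available` are hypothetical predicates. They stand in for the cache-location check and for ShuffleMapStage availability. Missing map stages are reported by shuffle id.

```scala
import scala.collection.mutable

object MissingParentsSketch {
  sealed trait Dep { def rdd: Rdd }
  final case class NarrowDep(rdd: Rdd) extends Dep
  final case class ShuffleDep(shuffleId: Int, rdd: Rdd) extends Dep
  final case class Rdd(id: Int, deps: List[Dep] = Nil)

  /** Shuffle ids of the missing parent map stages of the stage whose last RDD is `stageRdd`.
    * fullyCached(rddId): no partition of the RDD has empty (Nil) cache locations.
    * available(shuffleId): the map stage of that shuffle already has all its outputs. */
  def getMissingParentStages(stageRdd: Rdd, fullyCached: Int => Boolean, available: Int => Boolean): Set[Int] = {
    val missing = mutable.HashSet.empty[Int]
    val visited = mutable.HashSet.empty[Int]
    val waitingForVisit = mutable.ListBuffer(stageRdd) // used as a stack (depth-first)
    while (waitingForVisit.nonEmpty) {
      val rdd = waitingForVisit.remove(0)
      // a fully cached RDD needs none of its parents, so the walk stops there
      if (visited.add(rdd.id) && !fullyCached(rdd.id)) rdd.deps.foreach {
        case ShuffleDep(shuffleId, _) => if (!available(shuffleId)) missing += shuffleId
        case NarrowDep(parent)        => waitingForVisit.prepend(parent)
      }
    }
    missing.toSet
  }
}
```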
Submitting Missing Tasks of Stage
stage: Stage,
jobId: Int): Unit
submitMissingTasks prints out a DEBUG message to the logs.
submitMissingTasks requests the given Stage for the missing partitions (partitions that need to be computed).
submitMissingTasks adds the stage to the runningStages internal registry.
submitMissingTasks notifies the OutputCommitCoordinator that stage execution started.
submitMissingTasks determines the preferred locations (task locality preferences) of the missing partitions.
submitMissingTasks requests the stage for a new stage attempt.
submitMissingTasks requests the LiveListenerBus to post a SparkListenerStageSubmitted event.
submitMissingTasks uses the closure Serializer to serialize the stage and create a so-called task binary. It serializes the RDD (of the stage) together with either the ShuffleDependency (for a ShuffleMapStage) or the compute function (for a ResultStage).
submitMissingTasks creates a broadcast variable for the task binary.
This shows how important broadcast variables are for Spark itself: they are how Spark distributes data among the executors of an application most efficiently.
submitMissingTasks creates tasks for every missing partition:
- ShuffleMapTasks for a ShuffleMapStage
- ResultTasks for a ResultStage
If there are tasks to submit for execution (i.e. there are missing partitions in the stage), submitMissingTasks prints out the following INFO message to the logs:
Submitting [size] missing tasks from [stage] ([rdd]) (first 15 tasks are for partitions [partitionIds])
submitMissingTasks requests the TaskScheduler to submit the tasks (as a new TaskSet).
With no tasks to submit for execution, submitMissingTasks prints out one of the following DEBUG messages, depending on the type of the stage:
Stage [stage] is actually done; (available: [isAvailable],available outputs: [numAvailableOutputs],partitions: [numPartitions])
Stage [stage] is actually done; (partitions: [numPartitions])
The first message is for a ShuffleMapStage and the second for a ResultStage.
In the end, with no tasks to submit for execution, submitMissingTasks submits the waiting child stages for execution (using submitWaitingChildStages) and exits.
submitMissingTasks is used when DAGScheduler is requested to submit a stage for execution.
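The task-creation step and the INFO line can be sketched with toy task types. ShuffleMapTask and ResultTask below are hypothetical case classes that only share names with Spark's classes. The way the partition ids are rendered in the message is also an assumption.

```scala
object SubmitMissingTasksSketch {
  // Hypothetical task types; the real ShuffleMapTask/ResultTask carry much more state
  sealed trait ToyTask { def stageId: Int; def partitionId: Int }
  final case class ShuffleMapTask(stageId: Int, partitionId: Int) extends ToyTask
  final case class ResultTask(stageId: Int, partitionId: Int) extends ToyTask

  /** One task per missing partition; the task type follows the stage type. */
  def createTasks(stageId: Int, isShuffleMapStage: Boolean, missing: Seq[Int]): Seq[ToyTask] =
    missing.map { p =>
      if (isShuffleMapStage) ShuffleMapTask(stageId, p) else ResultTask(stageId, p)
    }

  /** Builds the INFO line; only the first 15 partition ids are listed. */
  def submitMessage(stageName: String, rddName: String, tasks: Seq[ToyTask]): String =
    s"Submitting ${tasks.size} missing tasks from $stageName ($rddName) " +
      s"(first 15 tasks are for partitions ${tasks.take(15).map(_.partitionId).mkString("[", ", ", "]")})"
}
```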
Finding Preferred Locations for Missing Partitions
rdd: RDD[_],
partition: Int): Seq[TaskLocation]
It is simply an alias for the internal (recursive) getPreferredLocsInternal.
It is used when...FIXME
Finding BlockManagers (Executors) for Cached RDD Partitions (aka Block Location Discovery)
rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]]
getCacheLocs gives TaskLocations (block locations) for the partitions of the input rdd. getCacheLocs caches the lookup results in the internal registry of partition locations per RDD.
NOTE: The size of the collection from getCacheLocs is exactly the number of partitions in rdd.
NOTE: The size of every TaskLocation collection (i.e. every entry in the result of getCacheLocs) is the number of BlockManagers (on executors) that hold the block of the corresponding partition.
Internally, getCacheLocs finds rdd in the internal registry of partition locations per RDD.
If rdd is not in the registry, getCacheLocs branches per its storage level.
For the NONE storage level (i.e. no caching), the result is empty locations (i.e. no location preference).
For other non-NONE storage levels, getCacheLocs requests BlockManagerMaster for block locations. These are then mapped to TaskLocations with the hostname of the BlockManager that owns the block (of a partition) and the executor id.
NOTE: getCacheLocs
uses <
getCacheLocs records the computed block locations per partition (as TaskLocation) in the internal registry.
NOTE: getCacheLocs requests locations from BlockManagerMaster using RDDBlockId with the RDD id and the partition indices. This implies that the order of the partitions matters for requesting the proper blocks.
NOTE: DAGScheduler uses TaskLocations (with host and executor), while BlockManagerMaster uses BlockManagerId to track similar information (i.e. block locations).
getCacheLocs is used when DAGScheduler is requested to find missing parent MapStages, and by getPreferredLocsInternal.
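Here is a sketch of the caching behaviour described above. It assumes a simple function stands in for BlockManagerMaster. TaskLoc, ToyRdd and CacheLocs are hypothetical, and the `cached` flag models a storage level other than NONE.

```scala
import scala.collection.mutable

object CacheLocsSketch {
  // Hypothetical stand-ins: TaskLoc for TaskLocation, `cached` for a non-NONE storage level
  final case class TaskLoc(host: String, executorId: String)
  final case class ToyRdd(id: Int, numPartitions: Int, cached: Boolean)

  /** `lookupBlock(rddId, partition)` plays the role of asking BlockManagerMaster. */
  final class CacheLocs(lookupBlock: (Int, Int) => Seq[TaskLoc]) {
    private val cacheLocs = mutable.HashMap.empty[Int, IndexedSeq[Seq[TaskLoc]]]
    var masterQueries = 0 // registry misses that had to ask the (simulated) master

    def getCacheLocs(rdd: ToyRdd): IndexedSeq[Seq[TaskLoc]] = synchronized {
      cacheLocs.getOrElseUpdate(rdd.id, {
        if (!rdd.cached) IndexedSeq.fill(rdd.numPartitions)(Nil) // no caching: no preference
        else {
          masterQueries += 1
          // one entry per partition, in partition order (as with RDDBlockId(rdd.id, index))
          (0 until rdd.numPartitions).map(p => lookupBlock(rdd.id, p))
        }
      })
    }

    def clearCacheLocs(): Unit = synchronized(cacheLocs.clear())
  }
}
```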
Finding Placement Preferences for RDD Partition (recursively)
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation]
getPreferredLocsInternal first finds the TaskLocations of the partition of the rdd among the cached block locations (using getCacheLocs) and returns them.
If none are found, getPreferredLocsInternal requests rdd for the preferred locations of partition and returns them.
NOTE: Preferred locations of the partitions of an RDD are also called placement preferences or locality preferences.
If there are still none, getPreferredLocsInternal goes over the parent NarrowDependencies. It (recursively) finds the TaskLocations of their parent partitions and returns the first non-empty result.
If all the attempts fail to yield any non-empty result, getPreferredLocsInternal returns an empty collection of TaskLocations.
getPreferredLocsInternal is used when DAGScheduler is requested for the preferred locations for missing partitions.
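The lookup order can be sketched on a toy model: cache locations first, then the RDD's own preferences, then narrow parents recursively. Hosts are plain strings instead of TaskLocations. Rdd, NarrowDep and getPreferredLocs are hypothetical names.

```scala
import scala.collection.mutable

object PreferredLocsSketch {
  // Hypothetical model: a narrow dependency maps a child partition to its parent partitions
  final case class NarrowDep(parent: Rdd, getParents: Int => Seq[Int])
  final case class Rdd(
      id: Int,
      cachedLocs: Map[Int, Seq[String]] = Map.empty, // what getCacheLocs would return
      preferred: Map[Int, Seq[String]] = Map.empty,  // the RDD's own preferred locations
      narrowDeps: List[NarrowDep] = Nil)

  def getPreferredLocs(
      rdd: Rdd,
      partition: Int,
      visited: mutable.HashSet[(Int, Int)] = mutable.HashSet.empty[(Int, Int)]): Seq[String] =
    if (!visited.add((rdd.id, partition))) Nil // this (RDD, partition) pair was already explored
    else {
      val cached = rdd.cachedLocs.getOrElse(partition, Nil)
      val prefs = rdd.preferred.getOrElse(partition, Nil)
      if (cached.nonEmpty) cached
      else if (prefs.nonEmpty) prefs
      else
        rdd.narrowDeps.iterator // lazily try narrow parents in order...
          .flatMap(dep => dep.getParents(partition).iterator.map(p => getPreferredLocs(dep.parent, p, visited)))
          .find(_.nonEmpty)     // ...and stop at the first non-empty answer
          .getOrElse(Nil)
    }
}
```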
Stopping DAGScheduler
stop(): Unit
stop stops the internal dag-scheduler-message thread pool, the dag-scheduler-event-loop, and the TaskScheduler.
stop is used when SparkContext is requested to stop.
rdd: RDD[_]): Unit
checkBarrierStageWithNumSlots is used when DAGScheduler is requested to create <
Killing Task
taskId: Long,
interruptThread: Boolean,
reason: String): Boolean
It requests the TaskScheduler to kill a task.
It is used when SparkContext is requested to kill a task.
cleanUpAfterSchedulerStop(): Unit
cleanUpAfterSchedulerStop is used when DAGSchedulerEventProcessLoop is requested to onStop.
execId: String,
fileLost: Boolean,
hostToUnregisterOutputs: Option[String],
maybeEpoch: Option[Long] = None): Unit
removeExecutorAndUnregisterOutputs is used when DAGScheduler is requested to handle <
shuffleStage: ShuffleMapStage): Unit
It is used when DAGScheduler is requested to submit missing tasks (of a ShuffleMapStage that has just been computed) and to handle a task completion (of a ShuffleMapStage).
jobId: Int,
stage: Stage): Unit
It is used when DAGScheduler is requested to create ShuffleMapStage and ResultStage stages.
execId: String,
// (taskId, stageId, stageAttemptId, accumUpdates)
accumUpdates: Array[(Long, Int, Int, Seq[AccumulableInfo])],
blockManagerId: BlockManagerId,
// (stageId, stageAttemptId) -> metrics
executorUpdates: mutable.Map[(Int, Int), ExecutorMetrics]): Boolean
It posts a SparkListenerExecutorMetricsUpdate (to listenerBus). It also informs BlockManagerMaster that the blockManagerId block manager is alive (by posting BlockManagerHeartbeat).
It is used when TaskSchedulerImpl is requested to handle an executor heartbeat.
Event Handlers
AllJobsCancelled Event Handler
doCancelAllJobs(): Unit
doCancelAllJobs is used when DAGSchedulerEventProcessLoop is requested to handle an AllJobsCancelled event and onError.
BeginEvent Event Handler
task: Task[_],
taskInfo: TaskInfo): Unit
It is used when DAGSchedulerEventProcessLoop is requested to handle a BeginEvent event.
Handling Task Completion Event
event: CompletionEvent): Unit
handleTaskCompletion handles a CompletionEvent.
handleTaskCompletion notifies the OutputCommitCoordinator that a task completed.
handleTaskCompletion finds the stage in the stageIdToStage registry. If the stage is not found, handleTaskCompletion posts a task end event (postTaskEnd) and quits.
handleTaskCompletion announces task completion application-wide.
handleTaskCompletion branches off per TaskEndReason (as event.reason).

| TaskEndReason | Description |
|---|---|
| Success | Acts according to the type of the task that completed, i.e. ShuffleMapTask and ResultTask |
| Resubmitted | |
| others | |
Handling Successful Task Completion
When a task has finished successfully (i.e. Success end reason), handleTaskCompletion marks the partition as no longer pending. That is, the partition the task worked on is removed from pendingPartitions of the stage.
NOTE: A Stage tracks its own pending partitions using the pendingPartitions property.
handleTaskCompletion branches off given the type of the task that completed, i.e. ShuffleMapTask or ResultTask.
Handling Successful ResultTask Completion
For a ResultTask, the stage is assumed to be a ResultStage.
handleTaskCompletion finds the ActiveJob associated with the ResultStage.
NOTE: ResultStage tracks the optional ActiveJob as the activeJob property. There can only be one active job for a ResultStage.
If there is no job for the ResultStage, you should see the following INFO message in the logs:
Ignoring result from [task] because its job has finished
Otherwise, when the ResultStage has an ActiveJob, handleTaskCompletion checks the status of the partition output for the partition the ResultTask ran for.
NOTE: ActiveJob tracks task completions in the finished property, with a flag for every partition in a stage. When the flag for a partition is enabled (i.e. true), the partition is assumed to have been computed. Any further ResultTask results for it are not expected and are simply ignored.
CAUTION: FIXME Describe why a partition could have more than one ResultTask.
handleTaskCompletion ignores the CompletionEvent when the partition has already been marked as completed for the stage, and simply exits.
handleTaskCompletion updates accumulators.
The partition for the ActiveJob (of the ResultStage) is marked as computed, and the number of partitions calculated is increased.
NOTE: ActiveJob tracks which partitions have already been computed and how many there are.
If the ActiveJob has finished (i.e. the number of partitions computed equals the number of partitions in the stage), handleTaskCompletion does the following (in order):
- Marks the ResultStage computed
- Cleans up after the ActiveJob and independent stages
- Announces the job completion application-wide (by posting a SparkListenerJobEnd to LiveListenerBus)
In the end, handleTaskCompletion notifies the JobListener of the ActiveJob that the task succeeded.
NOTE: A task succeeded notification holds the output index and the result.
The notification runs user code, so it may throw an exception. If it does, handleTaskCompletion notifies the JobListener about the failure, wrapping it inside a SparkDriverExecutionException exception.
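Here is a sketch of the per-partition bookkeeping for a successful ResultTask. It assumes an ActiveJob-like object with a finished flag per output and a counter. ToyActiveJob and the two callbacks are hypothetical: `onJobFinished` stands for the three steps listed above, and `onTaskSucceeded` for the JobListener notification.

```scala
object ResultTaskSketch {
  /** Hypothetical ActiveJob stand-in: one finished flag per output (partition) and a counter. */
  final class ToyActiveJob(val numPartitions: Int) {
    val finished: Array[Boolean] = Array.fill(numPartitions)(false)
    var numFinished = 0
  }

  def handleResultTaskSuccess(job: ToyActiveJob, outputId: Int)(
      onJobFinished: () => Unit,
      onTaskSucceeded: Int => Unit): Unit =
    if (!job.finished(outputId)) { // a second result for an already finished partition is ignored
      job.finished(outputId) = true
      job.numFinished += 1
      if (job.numFinished == job.numPartitions) onJobFinished() // mark stage finished, clean up, post job end
      onTaskSucceeded(outputId)                                 // finally notify the JobListener
    }
}
```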
Handling Successful ShuffleMapTask Completion
For a ShuffleMapTask, the stage is assumed to be a ShuffleMapStage.
handleTaskCompletion updates accumulators.
The task's result is assumed to be a MapStatus that knows the executor where the task has finished.
You should see the following DEBUG message in the logs:
ShuffleMapTask finished on [execId]
The executor may be registered in the failedEpoch internal registry, with the epoch of the completed task not greater than the executor's epoch there. In that case, you should see the following INFO message in the logs:
Ignoring possibly bogus [task] completion from executor [executorId]
Otherwise, handleTaskCompletion registers the MapStatus result for the partition with the stage (of the completed task).
handleTaskCompletion does more processing only when both of these hold:
- the ShuffleMapStage is still registered as running (in the runningStages internal registry)
- the ShuffleMapStage has no pending partitions to compute
The ShuffleMapStage is marked as finished.
You should see the following INFO messages in the logs:
looking for newly runnable stages
running: [runningStages]
waiting: [waitingStages]
failed: [failedStages]
handleTaskCompletion registers the shuffle map outputs of the ShuffleDependency with MapOutputTrackerMaster (with the epoch incremented). It then clears the internal cache of the stage's RDD block locations.
NOTE: MapOutputTrackerMaster is given when DAGScheduler is created.
If the ShuffleMapStage is ready, all the active jobs of the stage (aka map-stage jobs) are marked as finished. The MapOutputStatistics for the ShuffleDependency come from MapOutputTrackerMaster.
NOTE: A ShuffleMapStage is ready (aka available) when all partitions have shuffle outputs, i.e. when their tasks have completed.
Eventually, handleTaskCompletion submits the waiting child stages (of the ready ShuffleMapStage).
If, however, the ShuffleMapStage is not ready, you should see the following INFO message in the logs:
Resubmitting [shuffleStage] ([shuffleStage.name]) because some of its tasks had failed: [missingPartitions]
In the end, handleTaskCompletion submits the ShuffleMapStage for execution.
TaskEndReason: Resubmitted
For the Resubmitted case, you should see the following INFO message in the logs:
Resubmitted [task], so marking it as still running
The task's partition (task.partitionId) is added to the collection of pending partitions of the stage (stage.pendingPartitions).
TIP: A stage knows how many partitions are yet to be calculated. A task knows the partition id for which it was launched.
Task Failed with FetchFailed Exception
bmAddress: BlockManagerId,
shuffleId: Int,
mapId: Int,
reduceId: Int,
message: String)
extends TaskFailedReason
When FetchFailed
happens, stageIdToStage
is used to access the failed stage (using task.stageId
and the task
is available in event
in handleTaskCompletion(event: CompletionEvent)
). shuffleToMapStage
is used to access the map stage (using shuffleId
If failedStage.latestInfo.attemptId != task.stageAttemptId
, you should see the following INFO in the logs:
Ignoring fetch failure from [task] as it's from [failedStage] attempt [task.stageAttemptId] and there is a more recent attempt for that stage (attempt ID [failedStage.latestInfo.attemptId]) running
CAUTION: FIXME What does failedStage.latestInfo.attemptId != task.stageAttemptId
And the case finishes. Otherwise, the case continues.
If the failed stage is in runningStages
, the following INFO message shows in the logs:
Marking [failedStage] ([failedStage.name]) as failed due to a fetch failure from [mapStage] ([mapStage.name])
markStageAsFinished(failedStage, Some(failureMessage))
is called.
CAUTION: FIXME What does markStageAsFinished
If the failed stage is not in runningStages, the following DEBUG message shows in the logs:
Received fetch failure from [task], but its from [failedStage] which is no longer running
When disallowStageRetryForTest is set, abortStage(failedStage, "Fetch failure will not retry stage due to testing config", None) is called.
CAUTION: FIXME Describe disallowStageRetryForTest and abortStage
If the number of fetch-failed attempts for the stage reaches the allowed maximum, the failed stage is aborted with the following reason:
[failedStage] ([name]) has failed the maximum allowable number of times: 4. Most recent failure reason: [failureMessage]
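One way to picture the abort decision is a per-stage set of stage attempt IDs that failed on fetch, so that several failing tasks of the same attempt count only once. The Scala sketch below assumes exactly that, with a limit of 4 taken from the message above and a reset after a successful attempt (the reset is an assumption of this sketch). FetchFailureTracker is a hypothetical name; this is not Spark's Stage implementation.

```scala
import scala.collection.mutable

// Hypothetical per-stage bookkeeping of fetch failures (not Spark's Stage class).
final class FetchFailureTracker(maxConsecutiveFailures: Int = 4) {
  // Distinct stage attempt IDs that failed with a fetch failure.
  private val fetchFailedAttemptIds = mutable.HashSet.empty[Int]

  // Records a fetch failure of the given attempt and tells whether to abort the stage.
  def failedOnFetchAndShouldAbort(stageAttemptId: Int): Boolean = {
    fetchFailedAttemptIds += stageAttemptId
    fetchFailedAttemptIds.size >= maxConsecutiveFailures
  }

  // Assumed reset after a successful attempt, so only consecutive failures count.
  def clearFailures(): Unit = fetchFailedAttemptIds.clear()
}
```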
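If there are no failed stages reported (DAGScheduler.failedStages is empty), the following INFO message shows in the logs:
Resubmitting [mapStage] ([mapStage.name]) and [failedStage] ([failedStage.name]) due to fetch failure
And the following code is executed:
messageScheduler.schedule(
  new Runnable {
    override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
  },
  DAGScheduler.RESUBMIT_TIMEOUT,
  TimeUnit.MILLISECONDS)
The code schedules a ResubmitFailedStages event to be posted to the DAGScheduler's event loop after a short delay (DAGScheduler.RESUBMIT_TIMEOUT milliseconds). It is only scheduled when no failed stages have been reported yet, because otherwise such an event is already pending.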
In both cases (whether or not failed stages were already reported), the failed stage and the map stage are added to the internal registry of failed stages.
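The resubmission bookkeeping can be modelled in Scala under a few assumptions: a java.util.concurrent ScheduledExecutorService runs a Runnable after a delay, the delay value is arbitrary, and ResubmitScheduler is a hypothetical class name. The point it shows is that only the first reported failure schedules the delayed callback, while every failure adds both stages to the registry.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable

// Hypothetical model (not Spark's DAGScheduler): records failed stage IDs and schedules
// a single delayed "resubmit" callback when the first failure is recorded.
final class ResubmitScheduler(delayMs: Long, postResubmit: () => Unit) {
  private val timer = Executors.newSingleThreadScheduledExecutor()

  // IDs of the failed stages and map stages waiting to be resubmitted.
  // Clearing them is the job of the resubmit handler, which this sketch leaves out.
  val failedStages: mutable.Set[Int] = mutable.HashSet.empty[Int]

  def onFetchFailure(failedStageId: Int, mapStageId: Int): Unit = {
    // Only the first failure schedules the callback; later ones find it already pending.
    if (failedStages.isEmpty) {
      timer.schedule(new Runnable {
        override def run(): Unit = postResubmit()
      }, delayMs, TimeUnit.MILLISECONDS)
    }
    // In both cases the two stages are remembered as failed.
    failedStages += failedStageId
    failedStages += mapStageId
  }

  def shutdown(): Unit = timer.shutdown()
}
```

Delaying the resubmission gives other fetch failures from the same incident a chance to be recorded first, so they can be handled by one resubmission instead of several.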
If mapId (in the FetchFailed object) is provided, the map stage's output is cleaned up (as it is broken) using the mapStage.removeOutputLoc(mapId, bmAddress) and MapOutputTrackerMaster.unregisterMapOutput(shuffleId, mapId, bmAddress) methods.
CAUTION: FIXME What does mapStage.removeOutputLoc do?
If BlockManagerId (as bmAddress in the FetchFailed object) is defined, handleTaskCompletion notifies DAGScheduler that the executor was lost (handleExecutorLost, with filesLost enabled and maybeEpoch from the Task that completed).
handleTaskCompletion is used when:
- DAGSchedulerEventProcessLoop is requested to handle a CompletionEvent event.
ExecutorAdded Event Handler
handleExecutorAdded(
  execId: String,
  host: String): Unit
handleExecutorAdded is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorAdded event.
ExecutorLost Event Handler
handleExecutorLost(
  execId: String,
  workerLost: Boolean): Unit
handleExecutorLost checks whether the input optional maybeEpoch is defined and, if not, requests the current epoch from MapOutputTrackerMaster.
NOTE: MapOutputTrackerMaster is passed in (as mapOutputTracker) when DAGScheduler is created.
CAUTION: FIXME When is maybeEpoch passed in?
Figure: DAGScheduler.handleExecutorLost
Recurring ExecutorLost events lead to the following repeating DEBUG message in the logs:
DEBUG Additional executor lost message for [execId] (epoch [currentEpoch])
NOTE: The handleExecutorLost handler uses DAGScheduler's failedEpoch and FIXME internal registries.
Otherwise, when the executor execId is not yet in the list of lost executors (failedEpoch) or the epoch recorded for its earlier failure is smaller than the input maybeEpoch, the executor's loss is recorded in the failedEpoch internal registry.
CAUTION: FIXME Describe the case above in simpler non-technical words. Perhaps change the order, too.
You should see the following INFO message in the logs:
INFO Executor lost: [execId] (epoch [epoch])
BlockManagerMaster is then requested to remove the lost executor execId.
CAUTION: FIXME Review what filesLost is
handleExecutorLost exits unless the ExecutorLost event was for a map output fetch operation (and the input filesLost is true) or the external shuffle service is not used.
In such a case, you should see the following INFO message in the logs:
Shuffle files lost for executor: [execId] (epoch [epoch])
handleExecutorLost then walks over all ShuffleMapStages in DAGScheduler's shuffleToMapStage internal registry and does the following (in order):
- ShuffleMapStage.removeOutputsOnExecutor(execId) is called.
- MapOutputTrackerMaster.registerMapOutputs(shuffleId, stage.outputLocInMapOutputTrackerFormat(), changeEpoch = true) is called.
In case DAGScheduler's shuffleToMapStage internal registry has no shuffles registered, MapOutputTrackerMaster is requested to increment the epoch.
Ultimately, DAGScheduler clears the internal cache of RDD partition locations.
handleExecutorLost is used when DAGSchedulerEventProcessLoop is requested to handle an ExecutorLost event.
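The epoch comparison that decides whether an ExecutorLost event is new or a repeat can be sketched in Scala as follows. LostExecutorRegistry is hypothetical, currentEpoch stands in for asking MapOutputTrackerMaster for its epoch, and the sketch leaves out everything that happens after the loss is recorded (removing the executor, unregistering map outputs, clearing cached locations).

```scala
import scala.collection.mutable

// Hypothetical sketch of epoch-based deduplication of executor-lost events.
final class LostExecutorRegistry(currentEpoch: () => Long) {
  // Executor ID -> epoch at which its loss was recorded (the role of failedEpoch).
  val failedEpoch: mutable.Map[String, Long] = mutable.HashMap.empty[String, Long]

  // Returns true when the loss is new (or newer) and was recorded, false for a repeat.
  def onExecutorLost(execId: String, maybeEpoch: Option[Long] = None): Boolean = {
    val epoch = maybeEpoch.getOrElse(currentEpoch())
    if (!failedEpoch.contains(execId) || failedEpoch(execId) < epoch) {
      failedEpoch(execId) = epoch
      true
    } else {
      false // the "Additional executor lost message" case
    }
  }
}
```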
GettingResultEvent Event Handler
handleGetTaskResult(
  taskInfo: TaskInfo): Unit
handleGetTaskResult is used when DAGSchedulerEventProcessLoop is requested to handle a GettingResultEvent event.
JobCancelled Event Handler
handleJobCancellation(
  jobId: Int,
  reason: Option[String]): Unit
handleJobCancellation looks up the active job for the input job ID (in the jobIdToActiveJob internal registry) and fails it and all associated independent stages with the following failure reason:
Job [jobId] cancelled [reason]
When the input job ID is not found, handleJobCancellation prints out the following DEBUG message to the logs:
Trying to cancel unregistered job [jobId]
handleJobCancellation is used when DAGScheduler is requested to handle a JobCancelled event, doCancelAllJobs, handleJobGroupCancelled, and handleStageCancellation.
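A minimal Scala model of the lookup-or-log behaviour of handleJobCancellation follows. It assumes a plain mutable map in place of the jobIdToActiveJob registry and a message buffer in place of failing the job and writing the DEBUG log; CancellableJobs is a hypothetical name.

```scala
import scala.collection.mutable

// Hypothetical model of cancelling a job by ID (not Spark's handleJobCancellation).
final class CancellableJobs {
  // Job ID -> a description of the active job (a stand-in for ActiveJob).
  val jobIdToActiveJob: mutable.Map[Int, String] = mutable.HashMap.empty[Int, String]
  // Collected messages, standing in for failure reasons and DEBUG log lines.
  val messages: mutable.Buffer[String] = mutable.ArrayBuffer.empty[String]

  def cancelJob(jobId: Int, reason: Option[String]): Unit =
    jobIdToActiveJob.remove(jobId) match {
      case Some(_) =>
        // The sketch simply forgets the job; Spark also fails its independent stages.
        messages += s"Job $jobId cancelled ${reason.getOrElse("")}".trim
      case None =>
        messages += s"Trying to cancel unregistered job $jobId"
    }
}
```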
JobGroupCancelled Event Handler
handleJobGroupCancelled(
  groupId: String): Unit
handleJobGroupCancelled finds the active jobs in a group and cancels them.
Internally, handleJobGroupCancelled computes all the active jobs (registered in the internal collection of active jobs) that have the spark.jobGroup.id scheduling property set to groupId, then cancels every active job in the group one by one with the following cancellation reason:
part of cancelled job group [groupId]
handleJobGroupCancelled is used when DAGScheduler is requested to handle a JobGroupCancelled event.
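The selection step of handleJobGroupCancelled amounts to filtering the active jobs by their spark.jobGroup.id property. The Scala sketch below assumes each job carries a java.util.Properties object (possibly without the key); SketchJob and JobGroups are hypothetical names.

```scala
import java.util.Properties

// Hypothetical ActiveJob stand-in carrying the job's scheduling properties.
final case class SketchJob(jobId: Int, properties: Properties)

object JobGroups {
  // Property key for the job group ID, as named in the text above.
  val JobGroupIdKey = "spark.jobGroup.id"

  // Selects the active jobs whose job group ID equals groupId
  // (jobs without properties or without the key are skipped).
  def jobsInGroup(activeJobs: Seq[SketchJob], groupId: String): Seq[SketchJob] =
    activeJobs.filter { job =>
      job.properties != null && job.properties.getProperty(JobGroupIdKey) == groupId
    }

  def cancellationReason(groupId: String): String = s"part of cancelled job group $groupId"
}
```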
Handling JobSubmitted Event
handleJobSubmitted(
  jobId: Int,
  finalRDD: RDD[_],
  func: (TaskContext, Iterator[_]) => _,
  partitions: Array[Int],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
handleJobSubmitted creates a ResultStage (as finalStage in the picture below) for the given RDD, func, partitions, jobId and callSite.
handleJobSubmitted then creates an ActiveJob for the ResultStage.
handleJobSubmitted clears the internal cache of RDD partition locations.
FIXME Why is this clearing here so important?
handleJobSubmitted prints out the following INFO messages to the logs (with missingParentStages):
Got job [id] ([callSite]) with [number] output partitions
Final stage: [stage] ([name])
Parents of final stage: [parents]
Missing parents: [missingParentStages]
handleJobSubmitted registers the new ActiveJob in the jobIdToActiveJob and activeJobs internal registries.
handleJobSubmitted requests the ResultStage to associate itself with the ActiveJob.
handleJobSubmitted uses the jobIdToStageIds internal registry to find all registered stages for the given jobId, and the stageIdToStage internal registry to request those Stages for their latestInfo.
In the end, handleJobSubmitted posts a SparkListenerJobStart message to the LiveListenerBus and submits the ResultStage.
handleJobSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a JobSubmitted event.
MapStageSubmitted Event Handler
handleMapStageSubmitted(
  jobId: Int,
  dependency: ShuffleDependency[_, _, _],
  callSite: CallSite,
  listener: JobListener,
  properties: Properties): Unit
handleMapStageSubmitted event processing is very similar to JobSubmitted event processing.
handleMapStageSubmitted finds or creates a new ShuffleMapStage for the input ShuffleDependency and jobId.
handleMapStageSubmitted then creates an ActiveJob.
handleMapStageSubmitted clears the internal cache of RDD partition locations.
FIXME Why is this clearing here so important?
handleMapStageSubmitted prints out the following INFO messages to the logs:
Got map stage job [id] ([callSite]) with [number] output partitions
Final stage: [stage] ([name])
Parents of final stage: [parents]
Missing parents: [missingParentStages]
handleMapStageSubmitted registers the new job in the jobIdToActiveJob and activeJobs internal registries, and with the final ShuffleMapStage.
NOTE: A ShuffleMapStage can have multiple ActiveJobs registered.
handleMapStageSubmitted finds all the registered stages for the input jobId and collects their latest StageInfo.
In the end, handleMapStageSubmitted posts a SparkListenerJobStart message to the LiveListenerBus and submits the ShuffleMapStage.
When the ShuffleMapStage is available already, handleMapStageSubmitted marks the job finished.
When handleMapStageSubmitted could not find or create a ShuffleMapStage, it prints out the following WARN message to the logs:
Creating new stage failed due to exception - job: [id]
handleMapStageSubmitted then notifies the listener about the job failure and exits.
handleMapStageSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a MapStageSubmitted event.
ResubmitFailedStages Event Handler
resubmitFailedStages(): Unit
resubmitFailedStages iterates over the internal collection of failed stages and submits them.
resubmitFailedStages does nothing when there are no failed stages reported.
Otherwise, resubmitFailedStages prints out the following INFO message to the logs:
Resubmitting failed stages
resubmitFailedStages clears the internal cache of RDD partition locations, makes a copy of the collection of failed stages and clears the collection, so that failed stages are tracked afresh. At this point, DAGScheduler has no failed stages reported.
The previously-reported failed stages are sorted in ascending order of the ID of the job they were first created for (firstJobId) and resubmitted.
resubmitFailedStages is used when DAGSchedulerEventProcessLoop is requested to handle a ResubmitFailedStages event.
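The copy-clear-sort-resubmit sequence can be sketched in Scala as follows. FailedStage and FailedStagesRegistry are hypothetical; the sketch assumes each stage records the ID of the first job that needed it (firstJobId) and that submitting a stage is an arbitrary callback.

```scala
import scala.collection.mutable

// Hypothetical stage stand-in: its ID and the ID of the first job that needed it.
final case class FailedStage(id: Int, firstJobId: Int)

final class FailedStagesRegistry(submitStage: FailedStage => Unit) {
  // Stages reported as failed since the last resubmission.
  val failedStages: mutable.Set[FailedStage] = mutable.HashSet.empty[FailedStage]

  def resubmitFailedStages(): Unit = {
    // Nothing to do when no failed stages were reported.
    if (failedStages.nonEmpty) {
      // Copy, then clear, so failures reported from now on are tracked afresh.
      val failedStagesCopy = failedStages.toArray
      failedStages.clear()
      // Resubmit in ascending job ID order, so stages of earlier jobs go first.
      failedStagesCopy.sortBy(_.firstJobId).foreach(submitStage)
    }
  }
}
```

Copying before submitting means that stages failing again during resubmission land in a fresh, empty registry instead of the one being iterated.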
SpeculativeTaskSubmitted Event Handler
handleSpeculativeTaskSubmitted(): Unit
handleSpeculativeTaskSubmitted is used when DAGSchedulerEventProcessLoop is requested to handle a SpeculativeTaskSubmitted event.
StageCancelled Event Handler
handleStageCancellation(): Unit
handleStageCancellation is used when DAGSchedulerEventProcessLoop is requested to handle a StageCancelled event.
TaskSetFailed Event Handler
handleTaskSetFailed(): Unit
handleTaskSetFailed is used when DAGSchedulerEventProcessLoop is requested to handle a TaskSetFailed event.
WorkerRemoved Event Handler
handleWorkerRemoved(
  workerId: String,
  host: String,
  message: String): Unit
handleWorkerRemoved is used when DAGSchedulerEventProcessLoop is requested to handle a WorkerRemoved event.
Internal Properties
failedEpoch: The lookup table of lost executors and the epoch of the event.
failedStages: Stages that failed due to fetch failures (when a task fails with a FetchFailed exception).
jobIdToActiveJob: The lookup table of ActiveJobs per job ID.
jobIdToStageIds: The lookup table of all stages per ActiveJob.
nextJobId Counter
nextJobId: AtomicInteger
nextJobId is a Java AtomicInteger for job IDs that starts at 0.
Used when DAGScheduler is requested for numTotalJobs, to submitJob, runApproximateJob and submitMapStage.
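Job IDs may be requested from several threads at once (jobs can be submitted concurrently), which is why an AtomicInteger fits. A minimal Scala sketch of such a counter follows; JobIdCounter is a hypothetical name and its methods only mirror the roles described above.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter mirroring the role of nextJobId: thread-safe and starting at 0.
final class JobIdCounter {
  private val nextJobId = new AtomicInteger(0)

  // Returns the current value and advances the counter in one atomic step.
  def newJobId(): Int = nextJobId.getAndIncrement()

  // How many job IDs have been handed out so far.
  def numTotalJobs: Int = nextJobId.get()
}
```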
nextStageId: The next stage ID, counting from 0.
Used when DAGScheduler creates a shuffle map stage or a result stage.
runningStages: The set of stages that are currently "running".
A stage is added when DAGScheduler submits the missing tasks of the stage.
shuffleToMapStage: A lookup table of ShuffleMapStages by the shuffle ID of a ShuffleDependency.
stageIdToStage: A lookup table of stages by stage ID.
Used when DAGScheduler creates a shuffle map stage, creates a result stage, cleans up job state and independent stages, is informed that a task is started, a taskset has failed, a job is submitted (to compute a ResultStage), a map stage was submitted, a task has completed or a stage was cancelled, updates accumulators, aborts a stage and fails a job and independent stages.
waitingStages: Stages with parents to be computed.
Event Posting Methods
Posting AllJobsCancelled
Posts an AllJobsCancelled event.
Used when SparkContext is requested to cancel all running or scheduled Spark jobs.
Posting JobCancelled
Posts a JobCancelled event.
Used when SparkContext or JobWaiter is requested to cancel a Spark job.
Posting JobGroupCancelled
Posts a JobGroupCancelled event.
Used when SparkContext is requested to cancel a job group.
Posting StageCancelled
Posts a StageCancelled event.
Used when SparkContext is requested to cancel a stage.
Posting ExecutorAdded
Posts an ExecutorAdded event.
Used when TaskSchedulerImpl is requested to handle resource offers (and a new executor is found in the resource offers).
Posting ExecutorLost
Posts an ExecutorLost event.
Used when TaskSchedulerImpl is requested to handle a task status update (and a task gets lost, which indicates that the executor is broken and should be considered lost) or executorLost.
Posting JobSubmitted
Posts a JobSubmitted event.
Used when SparkContext is requested to run an approximate job.
Posting SpeculativeTaskSubmitted
Posts a SpeculativeTaskSubmitted event.
Used when TaskSetManager is requested to checkAndSubmitSpeculatableTask.
Posting MapStageSubmitted
Posts a MapStageSubmitted event.
Used when SparkContext is requested to submit a MapStage for execution.
Posting CompletionEvent
Posts a CompletionEvent.
Used when TaskSetManager is requested to handleSuccessfulTask, handleFailedTask, and executorLost.
Posting GettingResultEvent
Posts a GettingResultEvent.
Used when TaskSetManager is requested to handle a task fetching result.
Posting TaskSetFailed
Posts a TaskSetFailed event.
Used when TaskSetManager is requested to abort.
Posting BeginEvent
Posts a BeginEvent.
Used when TaskSetManager is requested to start a task.
Posting WorkerRemoved
Posts a WorkerRemoved event.
Used when TaskSchedulerImpl is requested to handle a removed worker event.
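The posting methods above only enqueue an event; the events are then handled one by one on a separate thread, as described for the DAGScheduler Event Bus. Below is a minimal Scala model of that pattern, assuming a LinkedBlockingQueue and a single consumer thread. The event types and SketchEventLoop are hypothetical and much simpler than Spark's DAGSchedulerEventProcessLoop.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical events standing in for DAGSchedulerEvent subclasses.
sealed trait SketchEvent
final case class JobCancelledEvent(jobId: Int) extends SketchEvent
case object AllJobsCancelledEvent extends SketchEvent
case object StopEvent extends SketchEvent

// Minimal event loop: any thread may post; a single thread handles events in FIFO order.
final class SketchEventLoop(handle: SketchEvent => Unit) {
  private val queue = new LinkedBlockingQueue[SketchEvent]()

  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        // take() blocks until an event is available.
        queue.take() match {
          case StopEvent => running = false
          case event     => handle(event)
        }
      }
    }
  }, "sketch-event-loop")

  def start(): Unit = thread.start()

  // Posting only enqueues the event and returns immediately.
  def post(event: SketchEvent): Unit = queue.put(event)

  // Enqueues a stop marker after the pending events and waits for the loop thread to end.
  def stop(): Unit = {
    queue.put(StopEvent)
    thread.join()
  }
}
```

In this model, having a single consumer thread is what lets the handlers update shared registries without extra locking, since no two handlers ever run at the same time.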
Updating Accumulators of Completed Tasks
updateAccumulators(
  event: CompletionEvent): Unit
updateAccumulators merges the partial values of accumulators from a completed task (based on the given CompletionEvent) into their "source" accumulators on the driver.
For every AccumulatorV2 update (in the given CompletionEvent), updateAccumulators finds the corresponding accumulator on the driver and requests the AccumulatorV2 to merge the updates.
For named accumulators whose update value is non-zero (i.e. not Accumulable.zero):
- stage.latestInfo.accumulables for the AccumulableInfo.id is set
- CompletionEvent.taskInfo.accumulables has a new AccumulableInfo added
CAUTION: FIXME Where are Stage.latestInfo.accumulables and CompletionEvent.taskInfo.accumulables used?
updateAccumulators is used when DAGScheduler is requested to handle a task completion.
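A simplified Scala model of the merge step follows. Partial values from a completed task are added into driver-side accumulators looked up by ID, and only named accumulators with a non-zero update are reported. SumAccumulator and DriverAccumulators are hypothetical. Spark's AccumulatorV2 is generic and merges whole accumulator instances rather than raw Long values.

```scala
import scala.collection.mutable

// Hypothetical, simplified accumulator (not Spark's AccumulatorV2): a named Long sum.
final class SumAccumulator(val id: Long, val name: Option[String]) {
  private var sum = 0L
  def value: Long = sum
  // Merges a task's partial value into this driver-side accumulator.
  def merge(partial: Long): Unit = sum += partial
}

object DriverAccumulators {
  // Driver-side "source" accumulators by ID.
  val registry: mutable.Map[Long, SumAccumulator] = mutable.HashMap.empty[Long, SumAccumulator]

  // Merges each (accumulator ID, partial value) update from a completed task and returns
  // the IDs of named accumulators whose update was non-zero (the ones that get reported).
  def updateAccumulators(updates: Seq[(Long, Long)]): Seq[Long] =
    updates.flatMap { case (id, partial) =>
      registry.get(id).toSeq.flatMap { acc =>
        acc.merge(partial)
        if (acc.name.isDefined && partial != 0L) Seq(id) else Seq.empty[Long]
      }
    }
}
```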
Posting SparkListenerTaskEnd (at Task Completion)
postTaskEnd(
  event: CompletionEvent): Unit
postTaskEnd reconstructs task metrics (from the accumulator updates in the CompletionEvent).
In the end, postTaskEnd creates a SparkListenerTaskEnd and requests the LiveListenerBus to post it.
postTaskEnd is used when:
- DAGScheduler is requested to handle a task completion
Enable ALL logging level for the org.apache.spark.scheduler.DAGScheduler logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.DAGScheduler=ALL
Refer to Logging.