ShuffleMapStage

ShuffleMapStage (shuffle map stage or simply map stage) is one of the two types of stages in a physical execution DAG (the other being a ResultStage).
NOTE: The logical DAG or logical execution plan is the RDD lineage.
ShuffleMapStage corresponds to (and is associated with) a <<shuffleDep, ShuffleDependency>>.
ShuffleMapStage is created when DAGScheduler is requested to DAGScheduler.md#createShuffleMapStage[plan a ShuffleDependency for execution].
ShuffleMapStage can also be DAGScheduler.md#submitMapStage[submitted independently as a Spark job] for DAGScheduler.md#adaptive-query-planning[Adaptive Query Planning / Adaptive Scheduling].
ShuffleMapStage is an input to the following (downstream) stages in the DAG of stages and is also called a shuffle dependency's map side.
Creating Instance
ShuffleMapStage takes the following to be created:
- [[id]] Stage ID
- [[rdd]] RDD of the <<shuffleDep, ShuffleDependency>>
- [[numTasks]] Number of tasks
- [[parents]] Parent Stage.md[stages]
- [[firstJobId]] ID of the ActiveJob that created it
- [[callSite]] CallSite
- [[shuffleDep]] ShuffleDependency
- [[mapOutputTrackerMaster]] MapOutputTrackerMaster
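The following is a sketch of a constructor matching the parameters above. It is not copied verbatim from the Spark source; modifiers and additional parameters (e.g. a resource profile ID in newer Spark versions) may differ across releases.

[source, scala]
// Sketch only: parameter list mirrors the bullet list above.
private[spark] class ShuffleMapStage(
    id: Int,
    rdd: RDD[_],
    numTasks: Int,
    parents: List[Stage],
    firstJobId: Int,
    callSite: CallSite,
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite)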
== [[_mapStageJobs]][[mapStageJobs]][[addActiveJob]][[removeActiveJob]] Jobs Registry
ShuffleMapStage keeps track of jobs that were submitted to execute it independently (if any).
The registry is used when DAGScheduler is requested to DAGScheduler.md#markMapStageJobsAsFinished[markMapStageJobsAsFinished], i.e. when DAGSchedulerEventProcessLoop.md#handleTaskCompletion[DAGScheduler is notified that a ShuffleMapTask has finished successfully] and the completed task made the ShuffleMapStage available, so any map-stage jobs waiting on this stage are marked as finished.
A new job is registered (added) when DAGScheduler is DAGScheduler.md#handleMapStageSubmitted[notified that a ShuffleDependency was submitted for execution (as a MapStageSubmitted event)].
An active job is deregistered (removed) when DAGScheduler is requested to DAGScheduler.md#cleanupStateForJobAndIndependentStages[clean up after a job and independent stages].
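A minimal sketch of such a registry (not the actual Spark source; ActiveJob here is a stand-in type parameter for the real class) is an internal list with add and remove helpers:

[source, scala]
// Sketch: an internal list of active map-stage jobs with helpers
// to register (add) and deregister (remove) them.
class MapStageJobs[ActiveJob] {
  private var _mapStageJobs: List[ActiveJob] = Nil

  def mapStageJobs: Seq[ActiveJob] = _mapStageJobs

  def addActiveJob(job: ActiveJob): Unit =
    _mapStageJobs = job :: _mapStageJobs

  def removeActiveJob(job: ActiveJob): Unit =
    _mapStageJobs = _mapStageJobs.filter(_ != job)
}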
== [[isAvailable]][[numAvailableOutputs]] ShuffleMapStage is Available (Fully Computed)
When executed, a ShuffleMapStage saves map output files (for reduce tasks).
When all partitions have shuffle map outputs available, the ShuffleMapStage is considered available (fully computed).
ShuffleMapStage is asked about its availability when DAGScheduler is requested for DAGScheduler.md#getMissingParentStages[missing parent map stages for a stage], DAGScheduler.md#handleMapStageSubmitted[handleMapStageSubmitted], DAGScheduler.md#submitMissingTasks[submitMissingTasks], DAGScheduler.md#handleTaskCompletion[handleTaskCompletion], DAGScheduler.md#markMapStageJobsAsFinished[markMapStageJobsAsFinished], DAGScheduler.md#stageDependsOn[stageDependsOn].
ShuffleMapStage uses the <<mapOutputTrackerMaster, MapOutputTrackerMaster>> to determine the number of partitions with map outputs available (numAvailableOutputs) and, in turn, whether the stage is available.
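A sketch of the availability check inside ShuffleMapStage (member names as described above; assuming MapOutputTrackerMaster exposes the number of registered map outputs per shuffle):

[source, scala]
// Sketch: the stage is available once every partition has a registered map output.
def numAvailableOutputs: Int =
  mapOutputTrackerMaster.getNumAvailableOutputs(shuffleDep.shuffleId)

def isAvailable: Boolean =
  numAvailableOutputs == numPartitions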
== [[findMissingPartitions]] Finding Missing Partitions
[source, scala]
findMissingPartitions(): Seq[Int]
findMissingPartitions requests the <<mapOutputTrackerMaster, MapOutputTrackerMaster>> for the missing partitions of the <<shuffleDep, ShuffleDependency>> (by its shuffle ID).
If MapOutputTrackerMaster does not track the ShuffleDependency yet, findMissingPartitions simply returns all the Stage.md#numPartitions[partitions] as missing.
findMissingPartitions is part of the Stage.md#findMissingPartitions[Stage] abstraction.
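A sketch of the delegation described above (assuming MapOutputTrackerMaster.findMissingPartitions returns an Option with the IDs of the missing partitions of a shuffle):

[source, scala]
// Sketch: ask the MapOutputTrackerMaster for the missing partitions and
// fall back to "all partitions are missing" when the shuffle is not tracked yet.
override def findMissingPartitions(): Seq[Int] =
  mapOutputTrackerMaster
    .findMissingPartitions(shuffleDep.shuffleId)
    .getOrElse(0 until numPartitions)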
== [[stage-sharing]] ShuffleMapStage Sharing
A ShuffleMapStage can be shared across multiple jobs, if these jobs reuse the same RDDs.
.Skipped Stages are already-computed ShuffleMapStages
image::dagscheduler-webui-skipped-stages.png[align="center"]
[source, scala]
val rdd = sc.parallelize(0 to 5).map((_,1)).sortByKey() // <1>
rdd.count // <2>
rdd.count // <3>

<1> Shuffle at sortByKey()
<2> Submits a job with two stages, both of which are executed
<3> Intentionally repeats the last action, submitting a new job with two stages, one of which is shared as already computed