# TaskSetManager
TaskSetManager is a spark-scheduler-Schedulable.md[Schedulable] that manages scheduling of the tasks of a single scheduler:TaskSet.md[TaskSet].

NOTE: A TaskSet.md[TaskSet] represents a set of Task.md[tasks] that correspond to missing spark-rdd-partitions.md[partitions] of a Stage.md[stage].

TaskSetManager is <<creating-instance, created>> exclusively when TaskSchedulerImpl is requested to TaskSchedulerImpl.md#createTaskSetManager[create one] (when submitting tasks for a given TaskSet).
.TaskSetManager and its Dependencies
image::TaskSetManager-TaskSchedulerImpl-TaskSet.png[align="center"]
When <<creating-instance, created>>, TaskSetManager <<addPendingTask, registers all the tasks as pending execution>>.

TaskSetManager is notified when a task (from the TaskSet it manages) finishes -- <<handleSuccessfulTask, successfully>> or due to a <<handleFailedTask, failure>> (in task execution or <<executorLost, an executor being lost>>).

TaskSetManager uses <<maxTaskFailures, maxTaskFailures>> to control how many times a <<handleFailedTask, single task can fail>> before the <<abort, entire TaskSet is aborted>>. maxTaskFailures can take the following values (see also the configuration sketch after this list):

* 1 for local/spark-local.md[local run mode]
* maxFailures in local/spark-local.md#local-with-retries[Spark local-with-retries] (i.e. local[N, maxFailures])
* configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] configuration property for local/spark-local.md[Spark local-cluster] and spark-cluster.md[Spark clustered] (using Spark Standalone, Mesos and YARN)
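A minimal, purely illustrative way of setting the last option when building a SparkContext (the master URL and the value 8 below are arbitrary examples, not recommendations):

[source, scala]
----
import org.apache.spark.{SparkConf, SparkContext}

// Example only: allow each task to fail up to 8 times on a clustered deployment
// before the whole TaskSet (and so the stage and job) is aborted.
val conf = new SparkConf()
  .setMaster("spark://master:7077")   // any non-local master honours spark.task.maxFailures
  .setAppName("task-max-failures-demo")
  .set("spark.task.maxFailures", "8")

val sc = new SparkContext(conf)
----

For local-with-retries, the equivalent would be `.setMaster("local[4, 8]")`, where the second value plays the role of maxFailures.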
The responsibilities of a TaskSetManager
include:
- <
> - <
> - <
>
[TIP]
====
Enable DEBUG logging level for org.apache.spark.scheduler.TaskSchedulerImpl (or org.apache.spark.scheduler.cluster.YarnScheduler for YARN) and org.apache.spark.scheduler.TaskSetManager and execute the following two-stage job to see their low-level inner workings.

A cluster manager is recommended since it gives more task localization choices (with YARN additionally supporting rack localization).
$ ./bin/spark-shell --master yarn --conf spark.ui.showConsoleProgress=false
// Keep # partitions low to keep # messages low
scala> sc.parallelize(0 to 9, 3).groupBy(_ % 3).count
INFO YarnScheduler: Adding task set 0.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 0.0: 0
DEBUG TaskSetManager: Valid locality levels for TaskSet 0.0: NO_PREF, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 10.0.2.87, executor 1, partition 0, PROCESS_LOCAL, 7541 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, 10.0.2.87, executor 2, partition 1, PROCESS_LOCAL, 7541 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, 10.0.2.87, executor 1, partition 2, PROCESS_LOCAL, 7598 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NO_PREF, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 518 ms on 10.0.2.87 (executor 1) (1/3)
INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 512 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_0.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 51 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool
INFO YarnScheduler: Adding task set 1.0 with 3 tasks
DEBUG TaskSetManager: Epoch for TaskSet 1.0: 1
DEBUG TaskSetManager: Valid locality levels for TaskSet 1.0: NODE_LOCAL, RACK_LOCAL, ANY
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 3, 10.0.2.87, executor 2, partition 0, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 4, 10.0.2.87, executor 1, partition 1, NODE_LOCAL, 7348 bytes)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 5, 10.0.2.87, executor 1, partition 2, NODE_LOCAL, 7348 bytes)
INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 4) in 130 ms on 10.0.2.87 (executor 1) (1/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 1
DEBUG TaskSetManager: No tasks for locality level NODE_LOCAL, so moving to locality level RACK_LOCAL
DEBUG TaskSetManager: No tasks for locality level RACK_LOCAL, so moving to locality level ANY
INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 3) in 133 ms on 10.0.2.87 (executor 2) (2/3)
DEBUG YarnScheduler: parentName: , name: TaskSet_1.0, runningTasks: 0
INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 5) in 21 ms on 10.0.2.87 (executor 1) (3/3)
INFO YarnScheduler: Removed TaskSet 1.0, whose tasks have all completed, from pool
res0: Long = 3
====
[[internal-registries]]
.TaskSetManager's Internal Properties (e.g. Registries, Counters and Flags)
[cols="1,3",options="header",width="100%"]
|===
| Name
| Description
| [[allPendingTasks]] allPendingTasks
| Indices of all the pending tasks to execute (regardless of their localization preferences).

Updated with a task index when TaskSetManager <<addPendingTask, registers a task as pending execution (per preferred locations)>>.

| [[calculatedTasks]] calculatedTasks
| The number of the tasks that have already completed execution.

Starts from 0 when a TaskSetManager is <<creating-instance, created>>.

| [[copiesRunning]] copiesRunning
| The number of task copies currently running per task (index in its task set).

The number of copies of a task is increased when TaskSetManager <<resourceOffer, finds the task for execution (given a resource offer)>> and decreased when the task <<handleFailedTask, fails>> or <<executorLost, its executor is lost>>.

| [[currentLocalityIndex]] currentLocalityIndex
| Index of the current task locality level in <<myLocalityLevels, myLocalityLevels>> (used for <<locality-aware-scheduling, delay scheduling>>).
| [[epoch]] epoch
| Current scheduler:MapOutputTracker.md#getEpoch[map output tracker epoch].
| [[failedExecutors]] failedExecutors
| Lookup table of the executors (and the time of the failure) per task index, for tasks that failed on those executors.

Used in <<handleFailedTask, handleFailedTask>>.

| [[isZombie]] isZombie
| Disabled, i.e. false, by default.

Read more in <<zombie-state, Zombie state>> in this document.

| [[lastLaunchTime]] lastLaunchTime
| Time a task was last launched (used for <<locality-aware-scheduling, delay scheduling>>).

| [[localityWaits]] localityWaits
| Locality wait times per locality level in <<myLocalityLevels, myLocalityLevels>> (as calculated by <<getLocalityWait, getLocalityWait>>).
| [[myLocalityLevels]] myLocalityLevels
| scheduler:TaskSchedulerImpl.md#TaskLocality[TaskLocality locality preferences] of the pending tasks in the <<taskSet, TaskSet>>, ranging from PROCESS_LOCAL through NODE_LOCAL, NO_PREF, and RACK_LOCAL to ANY.

NOTE: myLocalityLevels may contain only a few of all the available TaskLocality preferences with ANY as a mandatory task locality preference.

<<computeValidLocalityLevels, Computed>> when the TaskSetManager is <<creating-instance, created>>.

<<recomputeLocality, Recomputed>> on every change in the status of executors.

| [[name]] name
| TaskSet_[taskSet.stageId.toString] (part of the spark-scheduler-Schedulable.md#contract[Schedulable contract], see <<schedulable, TaskSetManager is Schedulable>>).

| [[numFailures]] numFailures
| Array of the number of task failures per <<tasks, task>>.

Incremented when TaskSetManager <<handleFailedTask, handles a task failure>>.

| [[numTasks]] numTasks
| Number of <<tasks, tasks>> to compute.
| [[pendingTasksForExecutor]] pendingTasksForExecutor
| Lookup table of the indices of tasks pending execution per executor.

Updated with a task index and executor when TaskSetManager <<addPendingTask, registers a task as pending execution (per preferred locations)>> (and the preferred location is an ExecutorCacheTaskLocation or HDFSCacheTaskLocation).

| [[pendingTasksForHost]] pendingTasksForHost
| Lookup table of the indices of tasks pending execution per host.

Updated with a task index and host when TaskSetManager <<addPendingTask, registers a task as pending execution (per preferred locations)>>.

| [[pendingTasksForRack]] pendingTasksForRack
| Lookup table of the indices of tasks pending execution per rack.

Updated with a task index and rack when TaskSetManager <<addPendingTask, registers a task as pending execution (per preferred locations)>>.

| [[pendingTasksWithNoPrefs]] pendingTasksWithNoPrefs
| Lookup table of the indices of tasks pending execution with no location preferences.

Updated with a task index when TaskSetManager <<addPendingTask, registers a task as pending execution (per preferred locations)>>.

| [[priority]] priority
| Priority of the owned scheduler:TaskSet.md[TaskSet] (part of the spark-scheduler-Schedulable.md#contract[Schedulable contract]).

| [[recentExceptions]] recentExceptions
| Registry of recently-reported exceptions per task (used with configuration-properties.md#spark.logging.exceptionPrintInterval[spark.logging.exceptionPrintInterval] to decide whether to print a duplicate exception in its entirety).
| [[runningTasksSet]] runningTasksSet
| Collection of running tasks that a TaskSetManager manages.

Used to implement runningTasks (that is simply the size of runningTasksSet but a required part of any spark-scheduler-Schedulable.md#contract[Schedulable]). runningTasksSet is expanded when <<addRunningTask, registering a running task>> and shrunk when <<removeRunningTask, unregistering a running task>>.

Used in scheduler:TaskSchedulerImpl.md#cancelTasks[TaskSchedulerImpl to cancel tasks].

| [[speculatableTasks]] speculatableTasks
| Collection of the indices of tasks marked as speculatable (see <<checkSpeculatableTasks, Checking Speculatable Tasks>>).

| [[stageId]] stageId
| The ID of the stage the TaskSetManager runs for.

Set when the TaskSetManager is <<creating-instance, created>>.

NOTE: stageId is part of the spark-scheduler-Schedulable.md#contract[Schedulable contract].

| [[successful]] successful
| Status of the tasks (whether a task has completed successfully, true, or not, false, per task).

All tasks start with their flags disabled, i.e. false, when the TaskSetManager is <<creating-instance, created>>.

The flag for a task is turned on, i.e. true, when a task finishes <<handleSuccessfulTask, successfully>> and also after a <<handleFailedTask-FetchFailed, FetchFailed failure>>.

A flag is explicitly turned off only for <<executorLost, ShuffleMapTask tasks when their executor is lost>>.
| [[taskAttempts]] taskAttempts
| Registry of TaskInfos per every task attempt per task.

| [[taskInfos]] taskInfos
| Registry of TaskInfos per task ID.

Updated with the task ID and the corresponding TaskInfo when TaskSetManager <<resourceOffer, finds a task for execution (given a resource offer)>>.

NOTE: It appears that the entries stay forever, i.e. are never removed (perhaps because the maintenance overhead is not needed given a TaskSetManager is a short-lived entity).

| [[tasks]] tasks
| Lookup table of scheduler:Task.md[Tasks] (per partition ID) to schedule execution of.

NOTE: The tasks all belong to a single <<taskSet, TaskSet>>.

| [[tasksSuccessful]] tasksSuccessful
| Counter of the tasks that have finished successfully (see <<handleSuccessfulTask, handleSuccessfulTask>>).

| [[totalResultSize]] totalResultSize
| The current total size of the results of all the tasks that have finished.

Starts from 0 when the TaskSetManager is <<creating-instance, created>>.

Only increased with the size of a task result whenever a TaskSetManager <<canFetchMoreResults, checks available memory for more task results>>.
|===
[[logging]]
[TIP]
====
Enable DEBUG logging level for org.apache.spark.scheduler.TaskSetManager logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.TaskSetManager=DEBUG

Refer to spark-logging.md[Logging].
====
## spark.driver.maxResultSize

TaskSetManager uses the spark.driver.maxResultSize configuration property to <<canFetchMoreResults, check available memory for more task results>>.
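For illustration, the property can be set when the Spark application is configured (the 2g value below is only an example):

[source, scala]
----
import org.apache.spark.SparkConf

// Example only: cap the total size of serialized task results collected at the driver at 2 GB.
// TaskSets whose accumulated result size exceeds this limit get aborted.
val conf = new SparkConf().set("spark.driver.maxResultSize", "2g")
----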
=== [[isTaskBlacklistedOnExecOrNode]] isTaskBlacklistedOnExecOrNode Internal Method

[source, scala]
----
isTaskBlacklistedOnExecOrNode(
  index: Int,
  execId: String,
  host: String): Boolean
----

isTaskBlacklistedOnExecOrNode...FIXME

NOTE: isTaskBlacklistedOnExecOrNode is used when TaskSetManager is requested to <<dequeueTaskFromList, dequeueTaskFromList>> and <<dequeueSpeculativeTask, dequeueSpeculativeTask>>.
=== [[getLocalityIndex]] getLocalityIndex Method

[source, scala]
----
getLocalityIndex(locality: TaskLocality.TaskLocality): Int
----

getLocalityIndex...FIXME

NOTE: getLocalityIndex is used when TaskSetManager is requested to <<resourceOffer, resourceOffer>> and <<recomputeLocality, recomputeLocality>>.
=== [[dequeueSpeculativeTask]] dequeueSpeculativeTask Internal Method

[source, scala]
----
dequeueSpeculativeTask(
  execId: String,
  host: String,
  locality: TaskLocality.Value): Option[(Int, TaskLocality.Value)]
----

dequeueSpeculativeTask...FIXME

NOTE: dequeueSpeculativeTask is used exclusively when TaskSetManager is requested to <<dequeueTask, dequeue a task for execution (given locality information)>>.
=== [[executorAdded]] executorAdded Method

[source, scala]
----
executorAdded(): Unit
----

executorAdded simply <<recomputeLocality, recomputes locality preferences>>.

NOTE: executorAdded is used exclusively when TaskSchedulerImpl is requested to scheduler:TaskSchedulerImpl.md#resourceOffers[resourceOffers].
=== [[abortIfCompletelyBlacklisted]] abortIfCompletelyBlacklisted Internal Method

[source, scala]
----
abortIfCompletelyBlacklisted(
  hostToExecutors: HashMap[String, HashSet[String]]): Unit
----

abortIfCompletelyBlacklisted...FIXME

NOTE: abortIfCompletelyBlacklisted is used exclusively when TaskSchedulerImpl is requested to scheduler:TaskSchedulerImpl.md#resourceOffers[resourceOffers].
=== [[schedulable]] TaskSetManager is Schedulable

TaskSetManager is a spark-scheduler-Schedulable.md[Schedulable] with the following implementation:

* name is TaskSet_[taskSet.stageId.toString]
* no parent is ever assigned, i.e. it is always null. It means that a TaskSetManager can only be a leaf in the tree of Schedulables (with spark-scheduler-Pool.md[Pools] being the nodes).
* schedulingMode always returns SchedulingMode.NONE (since there is nothing to schedule).
* weight is always 1.
* minShare is always 0.
* runningTasks is the number of running tasks in the internal runningTasksSet.
* priority is the priority of the owned scheduler:TaskSet.md[TaskSet] (using taskSet.priority).
* stageId is the stage ID of the owned scheduler:TaskSet.md[TaskSet] (using taskSet.stageId).
* schedulableQueue returns no queue, i.e. null.
* addSchedulable and removeSchedulable do nothing.
* getSchedulableByName always returns null.
* getSortedTaskSetQueue returns a one-element collection with the sole element being itself.
* <<executorLost, executorLost>>
* <<checkSpeculatableTasks, checkSpeculatableTasks>>
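A condensed, purely illustrative sketch of the Schedulable-related members just listed (this is not the actual Spark source; the constructor parameters are assumptions):

[source, scala]
----
import scala.collection.mutable

// Illustrative sketch only: the Schedulable members of a TaskSetManager as described above.
class TaskSetManagerSketch(stageId: Int, taskSetPriority: Int) {
  val runningTasksSet = mutable.HashSet.empty[Long]

  val name: String = s"TaskSet_$stageId"              // TaskSet_[taskSet.stageId.toString]
  val parent: AnyRef = null                            // never assigned => always a leaf
  def schedulingMode: String = "NONE"                  // nothing to schedule below this node
  val weight: Int = 1
  val minShare: Int = 0
  def runningTasks: Int = runningTasksSet.size
  def priority: Int = taskSetPriority
  def schedulableQueue: AnyRef = null                  // no children
  def addSchedulable(s: AnyRef): Unit = ()             // no-op
  def removeSchedulable(s: AnyRef): Unit = ()          // no-op
  def getSchedulableByName(n: String): AnyRef = null
  def getSortedTaskSetQueue: Seq[TaskSetManagerSketch] = Seq(this)
}
----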
=== [[handleTaskGettingResult]] Marking Task As Fetching Indirect Result -- handleTaskGettingResult Method

[source, scala]
----
handleTaskGettingResult(tid: Long): Unit
----

handleTaskGettingResult finds the TaskInfo of the tid task (in the <<taskInfos, taskInfos>> internal registry), marks it as fetching an indirect task result and scheduler:DAGScheduler.md#taskGettingResult[notifies DAGScheduler].

NOTE: handleTaskGettingResult is executed when scheduler:TaskSchedulerImpl.md#handleTaskGettingResult[TaskSchedulerImpl is notified about fetching an indirect task result].
=== [[addRunningTask]] Registering Running Task -- addRunningTask Method

[source, scala]
----
addRunningTask(tid: Long): Unit
----

addRunningTask adds tid to the <<runningTasksSet, runningTasksSet>> internal registry and spark-scheduler-Pool.md[requests the parent pool to increase the number of running tasks] (if defined).
=== [[removeRunningTask]] Unregistering Running Task -- removeRunningTask Method

[source, scala]
----
removeRunningTask(tid: Long): Unit
----

removeRunningTask removes tid from the <<runningTasksSet, runningTasksSet>> internal registry and spark-scheduler-Pool.md[requests the parent pool to decrease the number of running tasks] (if defined).
=== [[checkSpeculatableTasks]] Checking Speculatable Tasks -- checkSpeculatableTasks Method

[source, scala]
----
checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean
----

NOTE: checkSpeculatableTasks is part of the spark-scheduler-Schedulable.md#contract[Schedulable Contract].

checkSpeculatableTasks checks whether there are speculatable tasks in the TaskSet.

NOTE: checkSpeculatableTasks is called as part of speculative-execution-of-tasks.md[Speculative Execution of Tasks].

If the TaskSetManager is a <<zombie-state, zombie>> or the TaskSet has a single task only, checkSpeculatableTasks assumes there is nothing to speculate about and returns false.

The method goes on with the assumption of no speculatable tasks by default.

It computes the minimum number of finished tasks for speculation (as configuration-properties.md#spark.speculation.quantile[spark.speculation.quantile] of all the <<numTasks, tasks>>).
You should see the DEBUG message in the logs:
DEBUG Checking for speculative tasks: minFinished = [minFinishedForSpeculation]
It then checks whether that number is equal to or greater than the number of tasks completed successfully (using <<tasksSuccessful, tasksSuccessful>>).

Having done that, it computes the median duration of all the successfully completed tasks (using the <<taskInfos, taskInfos>> internal registry) and calculates the speculation threshold as the median multiplied by configuration-properties.md#spark.speculation.multiplier[spark.speculation.multiplier], but never less than the input minTimeToSpeculation (100).
You should see the DEBUG message in the logs:
DEBUG Task length threshold for speculation: [threshold]
For each task (in <<taskInfos, taskInfos>>) that has not finished successfully yet (per <<successful, successful>>), has exactly one copy running (per <<copiesRunning, copiesRunning>>), has been running longer than the calculated threshold, and has not been registered in <<speculatableTasks, speculatableTasks>> yet, the task is assumed speculatable.
You should see the following INFO message in the logs:
INFO Marking task [index] in stage [taskSet.id] (on [info.host]) as speculatable because it ran more than [threshold] ms
The task gets added to the internal speculatableTasks
collection. The method responds positively.
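The threshold computation described above can be summarized with the following simplified sketch (parameter names are assumed; this is not the actual TaskSetManager code):

[source, scala]
----
// Simplified sketch of the speculation-threshold logic described above.
def speculationThreshold(
    successfulDurationsMs: Seq[Long],  // durations of tasks that finished successfully
    numTasks: Int,
    speculationQuantile: Double,       // spark.speculation.quantile
    speculationMultiplier: Double,     // spark.speculation.multiplier
    minTimeToSpeculation: Int): Option[Long] = {
  val minFinishedForSpeculation = (speculationQuantile * numTasks).floor.toInt
  if (successfulDurationsMs.size < minFinishedForSpeculation) {
    None  // not enough finished tasks to start speculating yet
  } else {
    val sorted = successfulDurationsMs.sorted
    val medianDuration = sorted(sorted.size / 2)
    // A running task becomes a speculation candidate once it runs longer than this.
    Some(math.max((speculationMultiplier * medianDuration).toLong, minTimeToSpeculation.toLong))
  }
}
----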
=== [[getAllowedLocalityLevel]] getAllowedLocalityLevel Internal Method

[source, scala]
----
getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality
----

getAllowedLocalityLevel...FIXME

NOTE: getAllowedLocalityLevel is used exclusively when TaskSetManager is requested to <<resourceOffer, find a task for execution (given a resource offer)>>.
=== [[resourceOffer]] Finding Task For Execution (Given Resource Offer) -- resourceOffer Method

[source, scala]
----
resourceOffer(
  execId: String,
  host: String,
  maxLocality: TaskLocality): Option[TaskDescription]
----

(only if a spark-scheduler-TaskSetBlacklist.md[TaskSetBlacklist] is defined) resourceOffer requests the TaskSetBlacklist to check if the input spark-scheduler-TaskSetBlacklist.md#isExecutorBlacklistedForTaskSet[execId executor] or spark-scheduler-TaskSetBlacklist.md#isNodeBlacklistedForTaskSet[host node] are blacklisted.

When the TaskSetManager is a <<zombie-state, zombie>> or the resource offer (as executor and host) is blacklisted, resourceOffer finds no tasks to execute (and returns no TaskDescription).

NOTE: resourceOffer finds a task to schedule for a resource offer only when the TaskSetManager is neither a <<zombie-state, zombie>> nor blacklisted for the resource offer.

resourceOffer calculates the allowed task locality for task selection. When the input maxLocality is not the NO_PREF task locality, resourceOffer <<getAllowedLocalityLevel, gets the allowed locality level>> (for the current time) and chooses the more localized (constrained) of the two.

NOTE: scheduler:TaskSchedulerImpl.md[TaskLocality] can be the most localized PROCESS_LOCAL, NODE_LOCAL through NO_PREF and RACK_LOCAL to ANY.

resourceOffer <<dequeueTask, dequeues a task for execution (given the executor, host and allowed locality level)>>.

If a task (index) is found, resourceOffer takes the scheduler:Task.md[Task] (from the <<tasks, tasks>> registry).
resourceOffer scheduler:TaskSchedulerImpl.md#newTaskId[requests TaskSchedulerImpl for the ID for the new task].

resourceOffer increments the <<copiesRunning, number of the copies of the task currently running>> and finds the task attempt number (as the size of the <<taskAttempts, taskAttempts>> entries for the task index).

resourceOffer creates a TaskInfo that is then registered in <<taskInfos, taskInfos>> and <<taskAttempts, taskAttempts>>.

If the maximum acceptable task locality is not NO_PREF, resourceOffer <<getLocalityIndex, finds the locality index>> of the task's locality and records it as <<currentLocalityIndex, currentLocalityIndex>> with the current time as <<lastLaunchTime, lastLaunchTime>>.

resourceOffer serializes the task.

NOTE: resourceOffer uses core:SparkEnv.md#closureSerializer[SparkEnv to access the closure Serializer] and serializer:Serializer.md#newInstance[creates an instance thereof].

If the task serialization fails, you should see the following ERROR message in the logs:

Failed to serialize task [taskId], not attempting to retry it.

resourceOffer <<abort, aborts the TaskSet>> with the following error message and reports a TaskNotSerializableException:

[options="wrap"]
----
Failed to serialize task [taskId], not attempting to retry it. Exception during serialization: [exception]
----
resourceOffer checks the size of the serialized task. If it is greater than 100 kB, you should see the following WARN message in the logs:

[options="wrap"]
----
WARN Stage [id] contains a task of very large size ([size] KB). The maximum recommended task size is 100 KB.
----

NOTE: The size threshold for the warning, i.e. 100 kB, is not configurable.

If however the serialization went well and the size is fine too, resourceOffer <<addRunningTask, registers the task as running>>.

You should see the following INFO message in the logs:

[options="wrap"]
----
INFO TaskSetManager: Starting [name] (TID [id], [host], executor [id], partition [id], [taskLocality], [size] bytes)
----

For example:

[options="wrap"]
----
INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1, PROCESS_LOCAL, 2054 bytes)
----

resourceOffer scheduler:DAGScheduler.md#taskStarted[notifies DAGScheduler that the task has been started].

IMPORTANT: This is the moment when TaskSetManager informs DAGScheduler that a task has started.

NOTE: resourceOffer is used exclusively when TaskSchedulerImpl is requested to scheduler:TaskSchedulerImpl.md#resourceOfferSingleTaskSet[resourceOfferSingleTaskSet].
=== [[dequeueTask]] Dequeueing Task For Execution (Given Locality Information) -- dequeueTask Internal Method

[source, scala]
----
dequeueTask(
  execId: String,
  host: String,
  maxLocality: TaskLocality): Option[(Int, TaskLocality, Boolean)]
----

dequeueTask first tries to <<dequeueTaskFromList, find a pending task>> registered for execution on the execId executor. If found, dequeueTask returns its index, the PROCESS_LOCAL task locality and the speculative marker disabled.

dequeueTask then goes over all the possible scheduler:TaskSchedulerImpl.md#TaskLocality[task localities] and checks what locality is allowed given the input maxLocality.

dequeueTask checks out NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY in that order.

For NODE_LOCAL, dequeueTask tries to <<dequeueTaskFromList, find a pending task>> registered for execution on the host. If found, dequeueTask returns its index, the NODE_LOCAL task locality and the speculative marker disabled.

For NO_PREF, dequeueTask tries to <<dequeueTaskFromList, find a pending task>> among the <<pendingTasksWithNoPrefs, tasks with no locality preferences>>. If found, dequeueTask returns its index, the PROCESS_LOCAL task locality and the speculative marker disabled.

NOTE: For NO_PREF the reported task locality is PROCESS_LOCAL.
For RACK_LOCAL, dequeueTask scheduler:TaskSchedulerImpl.md#getRackForHost[finds the rack for the input host] and, if available, tries to <<dequeueTaskFromList, find a pending task>> registered for execution on the rack. If found, dequeueTask returns its index, the RACK_LOCAL task locality and the speculative marker disabled.

For ANY, dequeueTask tries to <<dequeueTaskFromList, find a pending task>> among <<allPendingTasks, all the pending tasks>>. If found, dequeueTask returns its index, the ANY task locality and the speculative marker disabled.

In the end, when no task could be found, dequeueTask <<dequeueSpeculativeTask, looks for a speculative task>> (given the executor, host and allowed locality level).

NOTE: The speculative marker is enabled for a task only when dequeueTask did not manage to find a task for the available task localities and did find a speculative task.
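The fallback order above can be pictured with the following purely illustrative sketch (the Locality enumeration and the dequeueAt helper are assumptions, not Spark APIs):

[source, scala]
----
// Illustrative sketch of the locality fallback in dequeueTask.
object Locality extends Enumeration {
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}
import Locality._

// `dequeueAt` stands for "find a pending, runnable task for the given level"
// (roughly what dequeueTaskFromList does for the matching registry).
def dequeueTaskSketch(
    maxLocality: Locality.Value,
    dequeueAt: Locality.Value => Option[Int]): Option[(Int, Locality.Value)] = {
  def allowed(level: Locality.Value): Boolean = level <= maxLocality
  // Tasks pending on this very executor (PROCESS_LOCAL) are always tried first.
  dequeueAt(PROCESS_LOCAL).map((_, PROCESS_LOCAL))
    .orElse(if (allowed(NODE_LOCAL)) dequeueAt(NODE_LOCAL).map((_, NODE_LOCAL)) else None)
    .orElse(if (allowed(NO_PREF)) dequeueAt(NO_PREF).map((_, PROCESS_LOCAL)) else None) // NO_PREF is reported as PROCESS_LOCAL
    .orElse(if (allowed(RACK_LOCAL)) dequeueAt(RACK_LOCAL).map((_, RACK_LOCAL)) else None)
    .orElse(if (allowed(ANY)) dequeueAt(ANY).map((_, ANY)) else None)
}
----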
NOTE: dequeueTask is used exclusively when TaskSetManager is requested to <<resourceOffer, find a task for execution (given a resource offer)>>.
=== [[dequeueTaskFromList]] Finding Highest Task Index (Not Blacklisted, With No Copies Running and Not Completed Already) -- dequeueTaskFromList Internal Method

[source, scala]
----
dequeueTaskFromList(
  execId: String,
  host: String,
  list: ArrayBuffer[Int]): Option[Int]
----

dequeueTaskFromList takes task indices from the input list backwards (from the last to the first entry). For every index dequeueTaskFromList checks whether the task is <<isTaskBlacklistedOnExecOrNode, blacklisted on the execId executor or host>> and, if not, checks that:

* the <<copiesRunning, number of the copies of the task currently running>> is 0
* the task has not been marked as <<successful, successful>> yet
If so, dequeueTaskFromList
returns the task index.
If dequeueTaskFromList
has checked all the indices and no index has passed the checks, dequeueTaskFromList
returns None
(to indicate that no index has met the requirements).
NOTE: dequeueTaskFromList is used exclusively when TaskSetManager is requested to <<dequeueTask, dequeue a task for execution (given locality information)>>.
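The backwards scan can be sketched as follows (a simplified stand-in for the real method, with the blacklist check reduced to a predicate):

[source, scala]
----
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of the scan described above.
def dequeueTaskFromListSketch(
    list: ArrayBuffer[Int],
    copiesRunning: Array[Int],
    successful: Array[Boolean],
    isBlacklisted: Int => Boolean): Option[Int] = {
  var offset = list.size
  while (offset > 0) {
    offset -= 1
    val index = list(offset)
    if (!isBlacklisted(index) && copiesRunning(index) == 0 && !successful(index)) {
      return Some(index)  // the highest (last) index that passes all the checks
    }
  }
  None
}
----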
=== [[getPendingTasksForExecutor]] Finding Tasks (Indices) Registered For Execution on Executor -- getPendingTasksForExecutor Internal Method

[source, scala]
----
getPendingTasksForExecutor(executorId: String): ArrayBuffer[Int]
----

getPendingTasksForExecutor finds pending tasks (indices) registered for execution on the input executorId executor (in the <<pendingTasksForExecutor, pendingTasksForExecutor>> internal registry).

NOTE: getPendingTasksForExecutor may find no matching tasks and return an empty collection.

NOTE: getPendingTasksForExecutor is used exclusively when TaskSetManager is requested to <<dequeueTask, dequeue a task for execution (given locality information)>>.
=== [[getPendingTasksForHost]] Finding Tasks (Indices) Registered For Execution on Host -- getPendingTasksForHost Internal Method

[source, scala]
----
getPendingTasksForHost(host: String): ArrayBuffer[Int]
----

getPendingTasksForHost finds pending tasks (indices) registered for execution on the input host (in the <<pendingTasksForHost, pendingTasksForHost>> internal registry).

NOTE: getPendingTasksForHost may find no matching tasks and return an empty collection.

NOTE: getPendingTasksForHost is used exclusively when TaskSetManager is requested to <<dequeueTask, dequeue a task for execution (given locality information)>>.
=== [[getPendingTasksForRack]] Finding Tasks (Indices) Registered For Execution on Rack -- getPendingTasksForRack Internal Method

[source, scala]
----
getPendingTasksForRack(rack: String): ArrayBuffer[Int]
----

getPendingTasksForRack finds pending tasks (indices) registered for execution on the input rack (in the <<pendingTasksForRack, pendingTasksForRack>> internal registry).

NOTE: getPendingTasksForRack may find no matching tasks and return an empty collection.

NOTE: getPendingTasksForRack is used exclusively when TaskSetManager is requested to <<dequeueTask, dequeue a task for execution (given locality information)>>.
=== [[scheduling-tasks]] Scheduling Tasks in TaskSet
CAUTION: FIXME
For each submitted scheduler:TaskSet.md[TaskSet], a new TaskSetManager is created. The TaskSetManager completely and exclusively owns the TaskSet submitted for execution.

CAUTION: FIXME A picture with TaskSetManager owning TaskSet
CAUTION: FIXME What component knows about TaskSet and TaskSetManager. Isn't it that TaskSets are created by DAGScheduler while TaskSetManager is used by TaskSchedulerImpl only?
TaskSetManager keeps track of the tasks pending execution per executor, host, rack or with no locality preferences.
=== [[locality-aware-scheduling]] Locality-Aware Scheduling aka Delay Scheduling
TaskSetManager computes locality levels for the TaskSet for delay scheduling. While computing the levels, you should see the following DEBUG message in the logs:
DEBUG Valid locality levels for [taskSet]: [levels]
CAUTION: FIXME What's delay scheduling?
=== [[events]] Events
Once a task has finished, TaskSetManager
informs scheduler:DAGScheduler.md#taskEnded[DAGScheduler].
CAUTION: FIXME
=== [[handleSuccessfulTask]] Recording Successful Task And Notifying DAGScheduler -- handleSuccessfulTask Method

[source, scala]
----
handleSuccessfulTask(
  tid: Long,
  result: DirectTaskResult[_]): Unit
----

handleSuccessfulTask records the tid task as finished, scheduler:DAGScheduler.md#taskEnded[notifies the DAGScheduler that the task has ended] and <<maybeFinishTaskSet, attempts to mark the TaskSet finished>>.

NOTE: handleSuccessfulTask is executed after scheduler:TaskSchedulerImpl.md#handleSuccessfulTask[TaskSchedulerImpl has been informed that the tid task finished successfully (and the task result was deserialized)].

Internally, handleSuccessfulTask finds the TaskInfo of the tid task (in the <<taskInfos, taskInfos>> internal registry) and marks it as FINISHED.

It then removes the tid task from the <<runningTasksSet, runningTasksSet>> internal registry.
handleSuccessfulTask scheduler:DAGScheduler.md#taskEnded[notifies DAGScheduler that the tid task ended successfully] (with the Task object from the <<tasks, tasks>> internal registry and the result marked as Success).

At this point, handleSuccessfulTask finds the other <<taskAttempts, running attempts>> of the tid task and scheduler:SchedulerBackend.md#killTask[requests SchedulerBackend to kill them] (since they are no longer necessary now when at least one task attempt has completed successfully). You should see the following INFO message in the logs:

[options="wrap"]
----
INFO Killing attempt [attemptNumber] for task [id] in stage [id] (TID [id]) on [host] as the attempt [attemptNumber] succeeded on [host]
----
CAUTION: FIXME Review taskAttempts
If the tid task has not yet been recorded as <<successful, successful>>, handleSuccessfulTask increases the <<tasksSuccessful, tasksSuccessful>> counter. You should see the following INFO message in the logs:

[options="wrap"]
----
INFO Finished task [id] in stage [id] (TID [taskId]) in [duration] ms on [host] (executor [executorId]) ([tasksSuccessful]/[numTasks])
----

The tid task is also marked as <<successful, successful>>. If the number of tasks that finished successfully is exactly the number of the tasks in the <<taskSet, TaskSet>> (to compute), the TaskSetManager becomes a <<zombie-state, zombie>>.

If the tid task was already recorded as <<successful, successful>>, you should merely see the following INFO message in the logs:

[options="wrap"]
----
INFO Ignoring task-finished event for [id] in stage [id] because task [index] has already completed successfully
----

Ultimately, handleSuccessfulTask <<maybeFinishTaskSet, attempts to mark the TaskSet finished>>.

NOTE: handleSuccessfulTask is used exclusively when TaskSchedulerImpl is requested to scheduler:TaskSchedulerImpl.md#handleSuccessfulTask[handleSuccessfulTask].
=== [[maybeFinishTaskSet]] Attempting to Mark TaskSet Finished -- maybeFinishTaskSet Internal Method

[source, scala]
----
maybeFinishTaskSet(): Unit
----

maybeFinishTaskSet scheduler:TaskSchedulerImpl.md#taskSetFinished[notifies TaskSchedulerImpl that a TaskSet has finished] only when the TaskSetManager is already a <<zombie-state, zombie>> and there are no <<runningTasksSet, running tasks>> left.
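In other words, the condition can be sketched as follows (the notifyFinished callback is an assumed stand-in for the TaskSchedulerImpl notification):

[source, scala]
----
// Illustrative sketch only.
def maybeFinishTaskSetSketch(
    isZombie: Boolean,
    runningTasks: Int,
    notifyFinished: () => Unit): Unit = {
  // The TaskSet is reported finished only once it is a zombie and nothing is still running.
  if (isZombie && runningTasks == 0) {
    notifyFinished()
  }
}
----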
=== [[task-retries]] Retrying Tasks on Failure
CAUTION: FIXME
Up to configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] attempts
=== Task retries and spark.task.maxFailures
When you start a Spark program you set up configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] for the number of task failures that are acceptable before the TaskSetManager gives up and marks the job failed.
TIP: In Spark shell with local master, configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] is fixed to 1
and you need to use local/spark-local.md[local-with-retries master] to change it to some other value.
In the following example, you are going to execute a job with two partitions and keep one failing at all times (by throwing an exception). The aim is to learn the behavior of retrying task execution in a stage in TaskSet. You will only look at a single task execution, namely 0.0.
$ ./bin/spark-shell --master "local[*, 5]"
...
scala> sc.textFile("README.md", 2).mapPartitionsWithIndex((idx, it) => if (idx == 0) throw new Exception("Partition 2 marked failed") else it).count
...
15/10/27 17:24:56 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 1 (MapPartitionsRDD[7] at mapPartitionsWithIndex at <console>:25)
15/10/27 17:24:56 DEBUG DAGScheduler: New pending partitions: Set(0, 1)
15/10/27 17:24:56 INFO TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 2, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
...
15/10/27 17:24:56 INFO Executor: Running task 0.0 in stage 1.0 (TID 2)
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 2)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 4, localhost, partition 0,PROCESS_LOCAL, 2062 bytes)
15/10/27 17:24:56 INFO Executor: Running task 0.1 in stage 1.0 (TID 4)
15/10/27 17:24:56 INFO HadoopRDD: Input split: file:/Users/jacek/dev/oss/spark/README.md:0+1784
15/10/27 17:24:56 ERROR Executor: Exception in task 0.1 in stage 1.0 (TID 4)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 ERROR Executor: Exception in task 0.4 in stage 1.0 (TID 7)
java.lang.Exception: Partition 2 marked failed
...
15/10/27 17:24:56 INFO TaskSetManager: Lost task 0.4 in stage 1.0 (TID 7) on executor localhost: java.lang.Exception (Partition 2 marked failed) [duplicate 4]
15/10/27 17:24:56 ERROR TaskSetManager: Task 0 in stage 1.0 failed 5 times; aborting job
15/10/27 17:24:56 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
15/10/27 17:24:56 INFO TaskSchedulerImpl: Cancelling stage 1
15/10/27 17:24:56 INFO DAGScheduler: ResultStage 1 (count at <console>:25) failed in 0.058 s
15/10/27 17:24:56 DEBUG DAGScheduler: After removal of stage 1, remaining stages = 0
15/10/27 17:24:56 INFO DAGScheduler: Job 1 failed: count at <console>:25, took 0.085810 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 5 times, most recent failure: Lost task 0.4 in stage 1.0 (TID 7, localhost): java.lang.Exception: Partition 2 marked failed
=== [[zombie-state]] Zombie state
A TaskSetManager is in zombie state when all tasks in a taskset have completed successfully (regardless of the number of task attempts), or when the TaskSet has been <<abort, aborted>>.

While in zombie state, a TaskSetManager can launch no new tasks and <<resourceOffer, responds with no TaskDescriptions to resource offers>>.

A TaskSetManager remains in the zombie state until all tasks have finished running, i.e. to continue to track and account for the running tasks.
=== [[abort]] Aborting TaskSet -- abort Method

[source, scala]
----
abort(
  message: String,
  exception: Option[Throwable] = None): Unit
----

abort scheduler:DAGScheduler.md#taskSetFailed[informs DAGScheduler that the TaskSet has been aborted].

CAUTION: FIXME image with DAGScheduler call

The TaskSetManager enters <<zombie-state, zombie state>>.

In the end, abort <<maybeFinishTaskSet, attempts to mark the TaskSet finished>>.

abort is used when:

* TaskResultGetter is requested to scheduler:TaskResultGetter.md#enqueueSuccessfulTask[enqueueSuccessfulTask] (that has failed)
* TaskSchedulerImpl is requested to scheduler:TaskSchedulerImpl.md#cancelTasks[cancelTasks] and scheduler:TaskSchedulerImpl.md#error[error]
* TaskSetManager is requested to <<resourceOffer, resourceOffer>>, <<canFetchMoreResults, canFetchMoreResults>>, <<handleFailedTask, handleFailedTask>>, and <<abortIfCompletelyBlacklisted, abortIfCompletelyBlacklisted>>
* DriverEndpoint is requested to launch tasks on executors
[[creating-instance]]
## Creating Instance

TaskSetManager takes the following to be created:

* [[sched]] TaskSchedulerImpl.md[TaskSchedulerImpl]
* [[taskSet]] TaskSet.md[TaskSet]
* [[maxTaskFailures]] Number of task failures, i.e. how many times a <<handleFailedTask, single task can fail>> before the entire TaskSet is <<abort, aborted>>
* [[blacklistTracker]] (optional) BlacklistTracker (default: None)
* [[clock]] Clock (default: SystemClock)

TaskSetManager initializes the <<internal-registries, internal registries and counters>>.

NOTE: maxTaskFailures is 1 for local run mode, maxFailures for Spark local-with-retries, and the configuration-properties.md#spark.task.maxFailures[spark.task.maxFailures] configuration property for Spark local-cluster and Spark with cluster managers (Spark Standalone, Mesos and YARN).

TaskSetManager MapOutputTracker.md#getEpoch[requests the current epoch from MapOutputTracker] and sets it on all tasks in the taskset.

NOTE: TaskSetManager uses the <<sched, TaskSchedulerImpl>> (given when created) to access the current scheduler:MapOutputTracker.md[MapOutputTracker].
You should see the following DEBUG in the logs:
DEBUG Epoch for [taskSet]: [epoch]
CAUTION: FIXME Why is the epoch important?
NOTE: TaskSetManager requests TaskSchedulerImpl.md#mapOutputTracker[MapOutputTracker from TaskSchedulerImpl], which is likely for unit testing only since core:SparkEnv.md#mapOutputTracker[MapOutputTracker is available using SparkEnv].

TaskSetManager <<addPendingTask, adds the tasks as pending execution>> (in reverse order, from the highest partition to the lowest).

CAUTION: FIXME Why is reverse order important? The code says it's to execute tasks with low indices first.
=== [[handleFailedTask]] Getting Notified that Task Failed -- handleFailedTask Method

[source, scala]
----
handleFailedTask(
  tid: Long,
  state: TaskState,
  reason: TaskFailedReason): Unit
----

handleFailedTask finds the TaskInfo of the tid task (in the <<taskInfos, taskInfos>> internal registry) and simply quits if the task is already marked as failed or killed.
.TaskSetManager Gets Notified that Task Has Failed
image::TaskSetManager-handleFailedTask.png[align="center"]
NOTE: handleFailedTask is executed after TaskSchedulerImpl.md#handleFailedTask[TaskSchedulerImpl has been informed that the tid task failed] or <<executorLost, an executor was lost>>.

handleFailedTask <<removeRunningTask, unregisters the tid task from the internal registry of running tasks>> and then marks the corresponding TaskInfo as finished (passing in the input state).

handleFailedTask decrements the number of the running copies of the tid task (in <<copiesRunning, copiesRunning>>).

NOTE: With speculative-execution-of-tasks.md[speculative execution of tasks] enabled, there can be many copies of a task running simultaneously.

handleFailedTask uses the following pattern as the reason of the failure:

Lost task [id] in stage [taskSetId] (TID [tid], [host], executor [executorId]): [reason]

handleFailedTask then calculates the failure exception per the input reason (follow the links for more details):

* <<handleFailedTask-FetchFailed, FetchFailed>>
* <<handleFailedTask-ExceptionFailure, ExceptionFailure>>
* <<handleFailedTask-ExecutorLostFailure, ExecutorLostFailure>>
* <<handleFailedTask-TaskFailedReason, other TaskFailedReasons>>
NOTE: Description of how the final failure exception is "computed" was moved to respective sections below to make the reading slightly more pleasant and comprehensible.
handleFailedTask DAGScheduler.md#taskEnded[informs DAGScheduler that the tid task has ended] (passing on the Task instance from the <<tasks, tasks>> internal registry, the input reason, a null result, the accumUpdates calculated for the failure, and the TaskInfo).

IMPORTANT: This is the moment when TaskSetManager informs DAGScheduler that a task has ended.

If the tid task has already been marked as completed (in the <<successful, successful>> internal registry) you should see the following INFO message in the logs:

[options="wrap"]
----
INFO Task [id] in stage [id] (TID [tid]) failed, but the task will not be re-executed (either because the task failed with a shuffle data fetch failure, so the previous stage needs to be re-run, or because a different copy of the task has already succeeded).
----

TIP: Read up on speculative-execution-of-tasks.md[Speculative Execution of Tasks] to find out why a single task could be executed multiple times.

If however the tid task was not recorded as <<successful, successful>>, handleFailedTask <<addPendingTask, registers it as pending execution again>>.
If the TaskSetManager is not a <<zombie-state, zombie>> and the task failure should be counted towards the maximum number of times the task is allowed to fail before the stage is aborted (i.e. the TaskFailedReason.countTowardsTaskFailures attribute is enabled), the optional spark-scheduler-TaskSetBlacklist.md#updateBlacklistForFailedTask[TaskSetBlacklist is notified] (passing on the host, executor and the task's index). handleFailedTask then increments the <<numFailures, number of failures>> of the tid task and checks whether it is equal to or greater than the <<maxTaskFailures, maximum number of task failures allowed>>.

If so, i.e. the number of task failures of the tid task reached the maximum value, you should see the following ERROR message in the logs:

ERROR Task [id] in stage [id] failed [maxTaskFailures] times; aborting job

And handleFailedTask <<abort, aborts the TaskSet>> with the following message:

Task [index] in stage [id] failed [maxTaskFailures] times, most recent failure: [failureReason]

In the end (except when the number of failures of the tid task grew beyond the acceptable number), handleFailedTask <<maybeFinishTaskSet, attempts to mark the TaskSet finished>>.
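The failure accounting just described boils down to something like the following sketch (field and callback names are assumed, not the actual Spark source):

[source, scala]
----
// Illustrative sketch of counting a task failure towards maxTaskFailures.
def recordTaskFailureSketch(
    index: Int,
    countTowardsTaskFailures: Boolean,
    numFailures: Array[Int],
    maxTaskFailures: Int,
    abort: String => Unit): Unit = {
  if (countTowardsTaskFailures) {
    numFailures(index) += 1
    if (numFailures(index) >= maxTaskFailures) {
      // Reaching the limit aborts the whole TaskSet (and so the stage and job).
      abort(s"Task $index failed $maxTaskFailures times; aborting the TaskSet")
    }
  }
}
----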
[NOTE]
====
handleFailedTask is used when:

* TaskSchedulerImpl is requested to TaskSchedulerImpl.md#handleFailedTask[handle a failed task]
* TaskSetManager is requested to <<executorLost, handle an executor being lost>>
====
==== [[handleFailedTask-FetchFailed]] FetchFailed TaskFailedReason

For FetchFailed you should see the following WARN message in the logs:

WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]

Unless the tid task has already been marked as successful (in the <<successful, successful>> internal registry), it becomes so and the number of <<tasksSuccessful, successful tasks>> is incremented.

The TaskSetManager enters <<zombie-state, zombie state>>.

The failure exception is empty.
==== [[handleFailedTask-ExceptionFailure]] ExceptionFailure TaskFailedReason

For ExceptionFailure, handleFailedTask checks if the exception is of type NotSerializableException. If so, you should see the following ERROR message in the logs:

ERROR Task [id] in stage [id] (TID [tid]) had a not serializable result: [description]; not retrying

And handleFailedTask <<abort, aborts the TaskSet>>.
Otherwise, if the exception is not of type NotSerializableException
, handleFailedTask
accesses accumulators and calculates whether to print the WARN message (with the failure reason) or the INFO message.
If the failure has already been reported (and is therefore a duplication), configuration-properties.md#spark.logging.exceptionPrintInterval[spark.logging.exceptionPrintInterval] is checked before reprinting the duplicate exception in its entirety.
For full printout of the ExceptionFailure
, the following WARN appears in the logs:
WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]
Otherwise, the following INFO appears in the logs:
INFO Lost task [id] in stage [id] (TID [tid]) on [host], executor [id]: [className] ([description]) [duplicate [dupCount]]
The exception in ExceptionFailure
becomes the failure exception.
==== [[handleFailedTask-ExecutorLostFailure]] ExecutorLostFailure TaskFailedReason

For ExecutorLostFailure, if the exit was not caused by the application (exitCausedByApp is false), you should see the following INFO message in the logs:
INFO Task [tid] failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
The failure exception is empty.
==== [[handleFailedTask-TaskFailedReason]] Other TaskFailedReasons
For the other TaskFailedReasons, you should see the following WARN message in the logs:
WARN Lost task [id] in stage [id] (TID [tid], [host], executor [id]): [reason]
The failure exception is empty.
=== [[addPendingTask]] Registering Task As Pending Execution (Per Preferred Locations) -- addPendingTask Internal Method

[source, scala]
----
addPendingTask(index: Int): Unit
----

addPendingTask registers the index task in the pending-task lists that the task should be eventually scheduled to (per its preferred locations).

Internally, addPendingTask takes the Task.md#preferredLocations[preferred locations of the task] (given index) and registers the task in the internal pending-task registries for every preferred location:

* <<pendingTasksForExecutor, pendingTasksForExecutor>> when the TaskLocation.md[TaskLocation] is an ExecutorCacheTaskLocation
* <<pendingTasksForHost, pendingTasksForHost>> for the hosts of a TaskLocation.md[TaskLocation]
* <<pendingTasksForRack, pendingTasksForRack>> for the TaskSchedulerImpl.md#getRackForHost[racks from TaskSchedulerImpl per the host] (of a TaskLocation.md[TaskLocation])

For a TaskLocation.md[TaskLocation] being an HDFSCacheTaskLocation, addPendingTask TaskSchedulerImpl.md#getExecutorsAliveOnHost[requests TaskSchedulerImpl for the executors on the host] (of the preferred location) and registers the task in <<pendingTasksForExecutor, pendingTasksForExecutor>> for every executor (if available).
You should see the following INFO message in the logs:
INFO Pending task [index] has a cached location at [host] , where there are executors [executors]
When addPendingTask
could not find executors for a HDFSCacheTaskLocation
preferred location, you should see the following DEBUG message in the logs:
DEBUG Pending task [index] has a cached location at [host] , but there are no executors alive there.
If the task has no location preferences, addPendingTask registers it in <<pendingTasksWithNoPrefs, pendingTasksWithNoPrefs>>.

addPendingTask always registers the task in <<allPendingTasks, allPendingTasks>>.

NOTE: addPendingTask is used immediately when TaskSetManager is <<creating-instance, created>> and later when handling a <<handleFailedTask, task failure>> or a <<executorLost, lost executor>>.
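A simplified sketch of the fan-out into the registries described above (the Loc case class is an assumed stand-in for Spark's TaskLocation hierarchy):

[source, scala]
----
import scala.collection.mutable.{ArrayBuffer, HashMap}

// Illustrative sketch only: how a task index is fanned out per preferred location.
case class Loc(host: String, executorId: Option[String] = None, rack: Option[String] = None)

class PendingTaskRegistriesSketch {
  val pendingTasksForExecutor = HashMap.empty[String, ArrayBuffer[Int]]
  val pendingTasksForHost     = HashMap.empty[String, ArrayBuffer[Int]]
  val pendingTasksForRack     = HashMap.empty[String, ArrayBuffer[Int]]
  val pendingTasksWithNoPrefs = ArrayBuffer.empty[Int]
  val allPendingTasks         = ArrayBuffer.empty[Int]

  def addPendingTask(index: Int, preferredLocations: Seq[Loc]): Unit = {
    for (loc <- preferredLocations) {
      loc.executorId.foreach(e => pendingTasksForExecutor.getOrElseUpdate(e, ArrayBuffer.empty[Int]) += index)
      pendingTasksForHost.getOrElseUpdate(loc.host, ArrayBuffer.empty[Int]) += index
      loc.rack.foreach(r => pendingTasksForRack.getOrElseUpdate(r, ArrayBuffer.empty[Int]) += index)
    }
    if (preferredLocations.isEmpty) pendingTasksWithNoPrefs += index
    allPendingTasks += index  // always registered, regardless of preferences
  }
}
----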
=== [[executorLost]] Re-enqueuing ShuffleMapTasks (with no ExternalShuffleService) and Reporting All Running Tasks on Lost Executor as Failed -- executorLost Method

[source, scala]
----
executorLost(execId: String, host: String, reason: ExecutorLossReason): Unit
----

executorLost re-enqueues all the ShuffleMapTask.md[ShuffleMapTasks] that have completed already on the lost executor (when external shuffle service is not in use) and <<handleFailedTask, reports all the currently-running tasks on the lost executor as failed>>.

NOTE: executorLost is part of the spark-scheduler-Schedulable.md#contract[Schedulable contract] that TaskSchedulerImpl.md#removeExecutor[TaskSchedulerImpl uses to inform TaskSetManagers about lost executors].

NOTE: Since TaskSetManager manages execution of the tasks in a single TaskSet.md[TaskSet], when an executor gets lost, the affected tasks that have been running on the failed executor need to be re-enqueued. executorLost is the mechanism to "announce" the event to all TaskSetManagers.

Internally, executorLost first checks whether the <<tasks, tasks>> are ShuffleMapTask.md[ShuffleMapTasks] and whether an external shuffle service is enabled.

NOTE: executorLost checks out the first task in <<tasks, tasks>> as it is assumed the other tasks are of the same type.

NOTE: executorLost uses core:SparkEnv.md#blockManager[SparkEnv to access the current BlockManager] and finds out whether an storage:BlockManager.md#externalShuffleServiceEnabled[external shuffle service is enabled] or not (based on spark.shuffle.service.enabled configuration property).
If executorLost is indeed due to an executor lost that executed tasks for a ShuffleMapStage.md[ShuffleMapStage] (that this TaskSetManager manages) and no external shuffle server is enabled, executorLost finds all the tasks that completed successfully on the lost executor and <<addPendingTask, registers them as pending execution again>>.

NOTE: executorLost marks every such task in <<successful, successful>> as no longer successful (i.e. false) and decrements the <<copiesRunning, number of running copies>> and the <<tasksSuccessful, number of successful tasks>>.

executorLost DAGScheduler.md#taskEnded[notifies DAGScheduler that the tasks (on the lost executor) have ended] (with the DAGScheduler.md#handleTaskCompletion-Resubmitted[Resubmitted] reason).

NOTE: executorLost uses TaskSchedulerImpl.md#dagScheduler[TaskSchedulerImpl to access the DAGScheduler]. TaskSchedulerImpl is given when the TaskSetManager is <<creating-instance, created>>.

Regardless of whether this TaskSetManager manages ShuffleMapTasks or not (it could also manage ResultTask.md[ResultTasks]) and whether the external shuffle service is used or not, executorLost finds all the currently-running tasks on the lost executor and <<handleFailedTask, handles them as failed>> (with the task state FAILED).

NOTE: executorLost finds out whether the reason for the executor loss is an application fault: it uses ExecutorExited's exit status as the indicator, treats ExecutorKilled as a non-application fault, and considers any other reason an application fault.

In the end, executorLost <<recomputeLocality, recomputes the locality preferences>>.
=== [[recomputeLocality]] Recomputing Task Locality Preferences -- recomputeLocality Method

[source, scala]
----
recomputeLocality(): Unit
----

recomputeLocality recomputes the internal caches: <<myLocalityLevels, myLocalityLevels>>, <<localityWaits, localityWaits>> and <<currentLocalityIndex, currentLocalityIndex>>.

CAUTION: FIXME But why are the caches important (and have to be recomputed)?

recomputeLocality records the current TaskSchedulerImpl.md#TaskLocality[TaskLocality] level of this TaskSetManager (that is the <<myLocalityLevels, locality level>> at the <<currentLocalityIndex, currentLocalityIndex>>).

NOTE: TaskLocality is one of PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY values.

recomputeLocality <<computeValidLocalityLevels, computes valid locality levels>> (for the <<tasks, tasks>>) and registers them as <<myLocalityLevels, myLocalityLevels>>.

recomputeLocality computes <<localityWaits, localityWaits>> (by <<getLocalityWait, finding the locality wait>> for every locality level in myLocalityLevels).

In the end, recomputeLocality <<getLocalityIndex, finds the locality index>> of the previously-recorded locality level and registers it as the <<currentLocalityIndex, currentLocalityIndex>>.

NOTE: recomputeLocality is used when TaskSetManager gets notified about status changes in executors, i.e. when an executor is <<executorAdded, added>> or <<executorLost, lost>>.
=== [[computeValidLocalityLevels]] Computing Locality Levels (for Scheduled Tasks) -- computeValidLocalityLevels Internal Method

[source, scala]
----
computeValidLocalityLevels(): Array[TaskLocality]
----

computeValidLocalityLevels computes valid locality levels for tasks that were registered in the corresponding registries per locality level.

NOTE: TaskSchedulerImpl.md[TaskLocality] is a task locality preference and can be the most localized PROCESS_LOCAL, NODE_LOCAL through NO_PREF and RACK_LOCAL to ANY.

.TaskLocalities and Corresponding Internal Registries
[cols="1,2",options="header",width="100%"]
|===
| TaskLocality
| Internal Registry

| PROCESS_LOCAL
| <<pendingTasksForExecutor, pendingTasksForExecutor>>

| NODE_LOCAL
| <<pendingTasksForHost, pendingTasksForHost>>

| NO_PREF
| <<pendingTasksWithNoPrefs, pendingTasksWithNoPrefs>>

| RACK_LOCAL
| <<pendingTasksForRack, pendingTasksForRack>>
|===

computeValidLocalityLevels walks over every internal registry and, if it is not empty, <<getLocalityWait, finds the locality wait>> for the corresponding TaskLocality and proceeds with the TaskLocality only when the locality wait is not 0.

For a TaskLocality with pending tasks, computeValidLocalityLevels asks TaskSchedulerImpl whether there is at least one executor alive (for TaskSchedulerImpl.md#isExecutorAlive[PROCESS_LOCAL], TaskSchedulerImpl.md#hasExecutorsAliveOnHost[NODE_LOCAL] and TaskSchedulerImpl.md#hasHostAliveOnRack[RACK_LOCAL]) and, if so, registers the TaskLocality.

NOTE: computeValidLocalityLevels uses the <<sched, TaskSchedulerImpl>> (given when the TaskSetManager is <<creating-instance, created>>).
computeValidLocalityLevels
always registers ANY
task locality level.
In the end, you should see the following DEBUG message in the logs:
DEBUG TaskSetManager: Valid locality levels for [taskSet]: [comma-separated levels]
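Putting the rules above together, the selection can be sketched as follows (the helper parameters are assumptions; TaskLocality is reduced to a String for brevity):

[source, scala]
----
// Illustrative sketch of the locality-level selection described above.
def computeValidLocalityLevelsSketch(
    hasExecutorPendingTasks: Boolean,
    hasHostPendingTasks: Boolean,
    hasNoPrefPendingTasks: Boolean,
    hasRackPendingTasks: Boolean,
    localityWaitMs: String => Long,   // getLocalityWait per level
    anyExecutorAlive: Boolean,
    anyHostAlive: Boolean,
    anyHostAliveOnRack: Boolean): Seq[String] = {
  val levels = Seq.newBuilder[String]
  if (hasExecutorPendingTasks && localityWaitMs("PROCESS_LOCAL") != 0 && anyExecutorAlive)
    levels += "PROCESS_LOCAL"
  if (hasHostPendingTasks && localityWaitMs("NODE_LOCAL") != 0 && anyHostAlive)
    levels += "NODE_LOCAL"
  if (hasNoPrefPendingTasks)
    levels += "NO_PREF"
  if (hasRackPendingTasks && localityWaitMs("RACK_LOCAL") != 0 && anyHostAliveOnRack)
    levels += "RACK_LOCAL"
  levels += "ANY"  // ANY is always a valid locality level
  levels.result()
}
----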
NOTE: computeValidLocalityLevels is used when TaskSetManager is <<creating-instance, created>> and when it <<recomputeLocality, recomputes locality preferences>>.
=== [[getLocalityWait]] Finding Locality Wait -- getLocalityWait Internal Method

[source, scala]
----
getLocalityWait(level: TaskLocality): Long
----

getLocalityWait finds the locality wait (in milliseconds) for a given TaskSchedulerImpl.md#TaskLocality[TaskLocality].

getLocalityWait uses configuration-properties.md#spark.locality.wait[spark.locality.wait] (default: 3s) when the TaskLocality-specific property is not defined, or 0 for NO_PREF and ANY.

NOTE: NO_PREF and ANY task localities have no locality wait.
.TaskLocalities and Corresponding Spark Properties
[cols="1,2",options="header",width="100%"]
|===
| TaskLocality
| Spark Property

| PROCESS_LOCAL
| configuration-properties.md#spark.locality.wait.process[spark.locality.wait.process]

| NODE_LOCAL
| configuration-properties.md#spark.locality.wait.node[spark.locality.wait.node]

| RACK_LOCAL
| configuration-properties.md#spark.locality.wait.rack[spark.locality.wait.rack]
|===
NOTE: getLocalityWait is used when TaskSetManager calculates <<localityWaits, localityWaits>> and <<computeValidLocalityLevels, computes valid locality levels>>.
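A minimal sketch of the lookup (property names as in the table above; the default values follow the description and are otherwise assumptions):

[source, scala]
----
import org.apache.spark.SparkConf

// Illustrative sketch of the per-level locality wait lookup.
def getLocalityWaitSketch(conf: SparkConf, level: String): Long = {
  val defaultWait = conf.get("spark.locality.wait", "3s")
  val key = level match {
    case "PROCESS_LOCAL" => "spark.locality.wait.process"
    case "NODE_LOCAL"    => "spark.locality.wait.node"
    case "RACK_LOCAL"    => "spark.locality.wait.rack"
    case _               => return 0L  // NO_PREF and ANY have no locality wait
  }
  conf.getTimeAsMs(key, defaultWait)
}
----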
[[canFetchMoreResults]]
## Checking Available Memory For More Task Results -- canFetchMoreResults Method

[source, scala]
----
canFetchMoreResults(
  size: Long): Boolean
----

canFetchMoreResults checks whether there is enough memory to fetch the result of a task.

Internally, canFetchMoreResults increments the internal <<totalResultSize, totalResultSize>> by the input size (which is the size of the result of a task) and increments the internal <<calculatedTasks, calculatedTasks>> counter.

If the current <<totalResultSize, totalResultSize>> exceeds configuration-properties.md#spark.driver.maxResultSize[spark.driver.maxResultSize], canFetchMoreResults prints out the following ERROR message to the logs:

Total size of serialized results of [calculatedTasks] tasks ([totalResultSize]) is bigger than spark.driver.maxResultSize ([maxResultSize])

In the end, canFetchMoreResults <<abort, aborts the TaskSet>> and returns false.

Otherwise, canFetchMoreResults returns true.

canFetchMoreResults is used when TaskResultGetter is requested to enqueue a successful task.
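A minimal sketch of the bookkeeping described above (field names follow the internal properties table; this is not the actual implementation):

[source, scala]
----
// Illustrative sketch only.
class ResultSizeTrackerSketch(maxResultSize: Long) {
  private var totalResultSize = 0L
  private var calculatedTasks = 0

  def canFetchMoreResults(size: Long): Boolean = synchronized {
    totalResultSize += size
    calculatedTasks += 1
    if (maxResultSize > 0 && totalResultSize > maxResultSize) {
      // In TaskSetManager this is where the ERROR is logged and the TaskSet aborted.
      false
    } else {
      true
    }
  }
}
----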