Executor¶
Executor is a process that is used for executing scheduler:Task.md[tasks].
Executor typically runs for the entire lifetime of a Spark application, which is called static allocation of executors (though you can also opt in for dynamic allocation).
Executors are managed by executor:ExecutorBackend.md[executor backends].
Executors report heartbeats and partial metrics for active tasks to the HeartbeatReceiver RPC endpoint on the driver.
Executors provide in-memory storage for RDDs that are cached in Spark applications (via storage:BlockManager.md[]).
When started, an executor first registers itself with the driver, establishing a communication channel directly to the driver to accept tasks for execution.
![Launching tasks on executor using TaskRunners](../images/executor/executor-taskrunner-executorbackend.png)
Executor offers are described by executor id and the host on which an executor runs (see Resource Offers in this document).
Executors can run multiple tasks over their lifetime, both in parallel and sequentially. They track executor:TaskRunner.md[running tasks] (by their task ids in the runningTasks internal registry).
Executors use an Executor task launch worker thread pool for launching tasks.
Executors send metrics (and heartbeats) using the internal heartbeater (Heartbeat Sender Thread).
It is recommended to have as many executors as data nodes and as many cores as you can get from the cluster.
Executors are described by their id, hostname, environment (as SparkEnv), and classpath (and, less importantly and more for internal optimization, whether they run in spark-local:index.md[local] or spark-cluster.md[cluster] mode).
Creating Instance¶
Executor takes the following to be created:

- Executor ID
- Host name
- SparkEnv
- User-defined jars (default: empty)
- isLocal flag (default: false)
- Java's UncaughtExceptionHandler (default: SparkUncaughtExceptionHandler)
- Resources (Map[String, ResourceInformation])
Executor is created when:

- CoarseGrainedExecutorBackend is requested to handle a RegisteredExecutor message (after having registered with the driver)
- LocalEndpoint is created
When Created¶
When created, Executor prints out the following INFO message to the logs:
Starting executor ID [executorId] on host [executorHostname]
(only for non-local modes) Executor sets SparkUncaughtExceptionHandler as the default handler invoked when a thread abruptly terminates due to an uncaught exception.
(only for non-local modes) Executor requests the BlockManager to initialize (with the Spark application id of the SparkConf).
(only for non-local modes) Executor requests the MetricsSystem to register the ExecutorSource and shuffleMetricsSource of the BlockManager.
Executor uses SparkEnv to access the MetricsSystem and BlockManager.
Executor creates a task class loader (optionally with REPL support) and requests the system Serializer to use it as the default classloader (for deserializing tasks).
Executor starts sending heartbeats with the metrics of active tasks.
Fetching File and Jar Dependencies¶
updateDependencies(
newFiles: Map[String, Long],
newJars: Map[String, Long]): Unit
updateDependencies fetches missing or outdated extra files (in the given newFiles). For every name-timestamp pair that...FIXME..., updateDependencies prints out the following INFO message to the logs:
Fetching [name] with timestamp [timestamp]
updateDependencies fetches missing or outdated extra jars (in the given newJars). For every name-timestamp pair that...FIXME..., updateDependencies prints out the following INFO message to the logs:
Fetching [name] with timestamp [timestamp]
updateDependencies fetches the file to the SparkFiles root directory.
updateDependencies ...FIXME
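The gist of the logic is comparing name-to-timestamp maps: a dependency is fetched only when it is not known locally yet or its recorded timestamp is older than the one sent with the task. The following is a minimal sketch of that idea (with a hypothetical `currentFiles` registry and a `fetchFile` placeholder standing in for the actual download):

```scala
import scala.collection.mutable

// Hypothetical local registry of already-fetched dependencies: name -> timestamp
val currentFiles = mutable.Map.empty[String, Long]

// Placeholder for the actual download into the SparkFiles root directory
def fetchFile(name: String): Unit = println(s"downloading $name")

def updateDependencies(newFiles: Map[String, Long]): Unit =
  for ((name, timestamp) <- newFiles
       if currentFiles.getOrElse(name, -1L) < timestamp) {
    println(s"Fetching $name with timestamp $timestamp")
    fetchFile(name)                 // fetch missing or outdated dependency
    currentFiles(name) = timestamp  // remember the latest timestamp
  }
```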
updateDependencies is used when:

- TaskRunner is requested to start (and run a task)
spark.driver.maxResultSize¶
Executor uses spark.driver.maxResultSize for TaskRunner when requested to run a task (and decide on a serialized task result).
Maximum Size of Direct Results¶
Executor uses the minimum of spark.task.maxDirectResultSize and spark.rpc.message.maxSize when TaskRunner is requested to run a task (and decide on the type of a serialized task result).
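Together, the two limits shape what happens to a serialized task result: a result larger than spark.driver.maxResultSize is dropped, a result larger than the direct-result threshold is stored in the BlockManager with only a reference sent back, and anything smaller goes directly to the driver. A simplified sketch of that decision (illustrative only, not the actual TaskRunner code):

```scala
// Illustrative classification of a serialized task result by size
sealed trait TaskResultKind
case object DroppedResult  extends TaskResultKind // larger than spark.driver.maxResultSize
case object IndirectResult extends TaskResultKind // stored in BlockManager, reference sent back
case object DirectResult   extends TaskResultKind // sent inline to the driver

def classifyResult(
    resultSize: Long,
    maxResultSize: Long,       // spark.driver.maxResultSize
    maxDirectResultSize: Long  // min(spark.task.maxDirectResultSize, spark.rpc.message.maxSize)
): TaskResultKind =
  if (maxResultSize > 0 && resultSize > maxResultSize) DroppedResult
  else if (resultSize > maxDirectResultSize) IndirectResult
  else DirectResult
```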
Logging¶
Enable ALL logging level for org.apache.spark.executor.Executor logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.executor.Executor=ALL
Refer to Logging.
Review Me¶
== [[isLocal]] isLocal Flag
Executor is given an isLocal flag when created. This is how the executor knows whether it runs in local or cluster mode. It is disabled by default.
The flag is turned on for spark-local:index.md[Spark local] (via spark-local:spark-LocalEndpoint.md[LocalEndpoint]).
== [[userClassPath]] User-Defined Jars
Executor is given user-defined jars when created. There are no jars defined by default.
The jars are specified using configuration-properties.md#spark.executor.extraClassPath[spark.executor.extraClassPath] configuration property (via executor:CoarseGrainedExecutorBackend.md#main[--user-class-path] command-line option of CoarseGrainedExecutorBackend).
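For example, one way (among others) to set the property is through SparkConf (the jar path is purely illustrative and has to be available on the executor hosts):

[source, scala]
----
import org.apache.spark.SparkConf

// Example only: add a custom jar to the executors' class path
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.executor.extraClassPath", "/opt/libs/custom.jar")
----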
Running Tasks Registry¶
runningTasks: Map[Long, TaskRunner]
Executor tracks TaskRunners by task IDs.
HeartbeatReceiver RPC Endpoint Reference¶
RPC endpoint reference to HeartbeatReceiver on the driver.
Set when Executor is created.

Used exclusively when Executor reports heartbeats and partial metrics for active tasks to the driver.
== [[launchTask]] Launching Task
[source, scala]
launchTask(
  context: ExecutorBackend,
  taskDescription: TaskDescription): Unit
launchTask simply creates a executor:TaskRunner.md[] (with the given executor:ExecutorBackend.md[] and the TaskDescription) and adds it to the runningTasks internal registry.

In the end, launchTask requests the <<threadPool, Executor task launch worker thread pool>> to execute the TaskRunner (sometime in the future).
.Launching tasks on executor using TaskRunners
image::executor-taskrunner-executorbackend.png[align="center"]
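In essence, launching a task boils down to wrapping the TaskDescription in a runnable, registering it, and handing it to the thread pool. A simplified sketch of that shape (with stand-in types, not Spark's actual classes):

[source, scala]
----
import java.util.concurrent.{ConcurrentHashMap, Executors}

// Stand-ins for the real Spark types
trait ExecutorBackend
final case class TaskDescription(taskId: Long, name: String)

final class TaskRunner(backend: ExecutorBackend, desc: TaskDescription) extends Runnable {
  def run(): Unit = println(s"executing ${desc.name} (task ${desc.taskId})")
}

object LaunchTaskSketch {
  private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]()
  private val threadPool   = Executors.newCachedThreadPool()

  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val taskRunner = new TaskRunner(context, taskDescription)
    runningTasks.put(taskDescription.taskId, taskRunner) // track the running task by its id
    threadPool.execute(taskRunner)                       // run it on the task launch worker pool
  }
}
----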
launchTask is used when:

- CoarseGrainedExecutorBackend is requested to executor:CoarseGrainedExecutorBackend.md#LaunchTask[handle a LaunchTask message]
- LocalEndpoint RPC endpoint (of spark-local:spark-LocalSchedulerBackend.md#[LocalSchedulerBackend]) is requested to spark-local:spark-LocalEndpoint.md#reviveOffers[reviveOffers]
- MesosExecutorBackend is requested to spark-on-mesos:spark-executor-backends-MesosExecutorBackend.md#launchTask[launchTask]
== [[heartbeater]] Heartbeat Sender Thread
heartbeater is a daemon {java-javadoc-url}/java/util/concurrent/ScheduledThreadPoolExecutor.html[ScheduledThreadPoolExecutor] with a single thread.
The name of the thread pool is driver-heartbeater.
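A single-thread scheduled pool with a daemon thread of that name can be built with a custom ThreadFactory; a plain-JDK sketch (Spark wraps this construction in its own thread utilities):

[source, scala]
----
import java.util.concurrent.{ScheduledThreadPoolExecutor, ThreadFactory}

// One daemon thread named "driver-heartbeater" (illustrative construction)
val heartbeaterFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "driver-heartbeater")
    t.setDaemon(true)
    t
  }
}
val heartbeater = new ScheduledThreadPoolExecutor(1, heartbeaterFactory)
----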
== [[coarse-grained-executor]] Coarse-Grained Executors
Coarse-grained executors are executors that use executor:CoarseGrainedExecutorBackend.md[] for task scheduling.
== [[resource-offers]] Resource Offers
Read scheduler:TaskSchedulerImpl.md#resourceOffers[resourceOffers] in TaskSchedulerImpl and scheduler:TaskSetManager.md#resourceOffers[resourceOffer] in TaskSetManager.
== [[threadPool]] Executor task launch worker Thread Pool
Executor uses threadPool, a daemon cached thread pool with the name Executor task launch worker-[ID] (with ID being the task id), for <<launchTask, launching tasks>>.

threadPool is created when Executor is created (and shut down when it <<stop, stops>>).
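A daemon cached thread pool with such named threads can likewise be built with a custom ThreadFactory; a sketch (the counter below is only a stand-in for the ID in the thread name):

[source, scala]
----
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong

// Daemon threads named "Executor task launch worker-[ID]" (counter as a stand-in for the ID)
val workerId = new AtomicLong(0)
val workerFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, s"Executor task launch worker-${workerId.getAndIncrement()}")
    t.setDaemon(true)
    t
  }
}
val threadPool = Executors.newCachedThreadPool(workerFactory)
----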
== [[memory]] Executor Memory
You can control the amount of memory per executor using configuration-properties.md#spark.executor.memory[spark.executor.memory] configuration property. It sets the available memory equally for all executors per application.
The amount of memory per executor is looked up when SparkContext.md#creating-instance[SparkContext is created].
You can change the assigned memory per executor per node in spark-standalone:index.md[standalone cluster] using SparkContext.md#environment-variables[SPARK_EXECUTOR_MEMORY] environment variable.
You can find the value displayed as Memory per Node in spark-standalone:Master.md[web UI for standalone Master] (as depicted in the figure below).
.Memory per Node in Spark Standalone's web UI
image::spark-standalone-webui-memory-per-node.png[align="center"]
The above figure shows the result of running tools:spark-shell.md[Spark shell] with the amount of memory per executor defined explicitly (on command line), i.e.
./bin/spark-shell --master spark://localhost:7077 -c spark.executor.memory=2g
Metrics¶
Every executor registers its own executor:ExecutorSource.md[] to report metrics.
== [[stop]] Stopping Executor
[source, scala]
stop(): Unit
stop requests core:SparkEnv.md#metricsSystem[MetricsSystem] for a report.
stop shuts the <<heartbeater, heartbeater>> thread down (and waits at most 10 seconds).

stop shuts the <<threadPool, Executor task launch worker thread pool>> down.

(only when not <<isLocal, local>>) stop requests the SparkEnv to stop.
stop is used when executor:CoarseGrainedExecutorBackend.md#Shutdown[CoarseGrainedExecutorBackend] and spark-local:spark-LocalEndpoint.md#StopExecutor[LocalEndpoint] are requested to stop their managed executors.
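The order of the steps above matters: report the final metrics, stop heartbeating, stop the worker pool, and only then tear down the environment (unless running in local mode). A simplified outline of that sequence (function parameters stand in for the actual collaborators):

[source, scala]
----
import java.util.concurrent.{ExecutorService, ScheduledExecutorService, TimeUnit}

// Simplified outline of the shutdown sequence (not the actual method body)
def stopExecutor(
    heartbeater: ScheduledExecutorService,
    threadPool: ExecutorService,
    reportMetrics: () => Unit, // e.g. ask the MetricsSystem for a report
    stopSparkEnv: () => Unit,  // only meaningful in non-local mode
    isLocal: Boolean): Unit = {
  reportMetrics()
  heartbeater.shutdown()
  heartbeater.awaitTermination(10, TimeUnit.SECONDS) // wait at most 10 seconds
  threadPool.shutdown()
  if (!isLocal) stopSparkEnv()
}
----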
== [[computeTotalGcTime]] computeTotalGcTime Method
[source, scala]
computeTotalGcTime(): Long
computeTotalGcTime...FIXME
computeTotalGcTime is used when:

- TaskRunner is requested to executor:TaskRunner.md#collectAccumulatorsAndResetStatusOnFailure[collectAccumulatorsAndResetStatusOnFailure] and executor:TaskRunner.md#run[run]
- Executor is requested to <<reportHeartBeat, heartbeat with partial metrics for active tasks to the driver>>
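One common way to compute the total GC time of a JVM is to sum the collection times reported by the garbage collector MX beans; a sketch of that approach (an assumption about the intent here, given the FIXME above):

[source, scala]
----
import java.lang.management.ManagementFactory
import scala.jdk.CollectionConverters._

// Total time (ms) spent in garbage collection by all collectors of this JVM
def computeTotalGcTime(): Long =
  ManagementFactory.getGarbageCollectorMXBeans.asScala
    .map(_.getCollectionTime)
    .filter(_ >= 0) // -1 means the collector does not report collection time
    .sum
----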
== [[createClassLoader]] createClassLoader Method
[source, scala]
createClassLoader(): MutableURLClassLoader
createClassLoader...FIXME
createClassLoader is used when...FIXME
== [[addReplClassLoaderIfNeeded]] addReplClassLoaderIfNeeded Method
[source, scala]
addReplClassLoaderIfNeeded(
  parent: ClassLoader): ClassLoader
addReplClassLoaderIfNeeded...FIXME
addReplClassLoaderIfNeeded is used when...FIXME
== [[reportHeartBeat]] Heartbeating With Partial Metrics For Active Tasks To Driver
[source, scala]
reportHeartBeat(): Unit
reportHeartBeat collects executor:TaskRunner.md[TaskRunners] for currently running tasks (aka active tasks) with their tasks deserialized (i.e. either ready for execution or already started).
executor:TaskRunner.md[] has its TaskRunner.md#task[task] deserialized when it executor:TaskRunner.md#run[runs the task].
For every running task, reportHeartBeat takes its scheduler:Task.md#metrics[TaskMetrics] and:
- Requests executor:TaskMetrics.md#mergeShuffleReadMetrics[ShuffleRead metrics to be merged]
- executor:TaskMetrics.md#setJvmGCTime[Sets jvmGCTime metrics]
reportHeartBeat then records the latest values of executor:TaskMetrics.md#accumulators[internal and external accumulators] for every task.
NOTE: Internal accumulators are a task's metrics while external accumulators are a Spark application's accumulators that a user has created.
reportHeartBeat sends a blocking Heartbeat message to the HeartbeatReceiver RPC endpoint (on the driver).
NOTE: A Heartbeat message contains the executor identifier, the accumulator updates, and the identifier of the storage:BlockManager.md[].
If the response (from the HeartbeatReceiver endpoint) requests to re-register the BlockManager, you should see the following INFO message in the logs and reportHeartBeat requests the BlockManager to storage:BlockManager.md#reregister[re-register] (which will register the blocks the BlockManager manages with the driver).
[source,plaintext]
Told to re-register on heartbeat
HeartbeatResponse requests the BlockManager to re-register when either scheduler:TaskScheduler.md#executorHeartbeatReceived[TaskScheduler] or HeartbeatReceiver know nothing about the executor.
When posting the Heartbeat was successful, reportHeartBeat resets the <<heartbeatFailures, heartbeatFailures>> internal counter (to 0).
In case of a non-fatal exception, you should see the following WARN message in the logs (followed by the stack trace).
Issue communicating with driver in heartbeater
For every failure, reportHeartBeat increments the <<heartbeatFailures, heartbeatFailures>> internal counter. When the number of failures reaches spark.executor.heartbeat.maxFailures, you should see the following ERROR message in the logs and the executor terminates (with error code 56):
Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
reportHeartBeat is used when Executor is requested to <<startDriverHeartbeater, start sending heartbeats and active tasks metrics>>.
== [[startDriverHeartbeater]][[heartbeats-and-active-task-metrics]] Sending Heartbeats and Active Tasks Metrics
Executors keep sending <<reportHeartBeat, metrics for active tasks>> to the driver every spark.executor.heartbeatInterval (defaults to 10s with some random initial delay so the heartbeats from different executors do not pile up on the driver).
.Executors use HeartbeatReceiver endpoint to report task metrics
image::executor-heartbeatReceiver-endpoint.png[align="center"]
An executor sends heartbeats using the <<heartbeater, internal heartbeater>> (i.e. Heartbeat Sender Thread).
.HeartbeatReceiver's Heartbeat Message Handler
image::HeartbeatReceiver-Heartbeat.png[align="center"]
For each scheduler:Task.md[task] in executor:TaskRunner.md[] (in the runningTasks internal registry), the task's metrics are computed (i.e. mergeShuffleReadMetrics and setJvmGCTime) that become part of the heartbeat (with accumulators).
NOTE: Executors track the executor:TaskRunner.md[TaskRunners] that run scheduler:Task.md[tasks]. A executor:TaskRunner.md#run[task might not be assigned to a TaskRunner yet] when the executor sends a heartbeat.
A blocking Heartbeat message that holds the executor id, all accumulator updates (per task id), and storage:BlockManagerId.md[] is sent to the HeartbeatReceiver RPC endpoint (with a spark.executor.heartbeatInterval timeout).
If the response requests to reregister BlockManager, you should see the following INFO message in the logs:
Told to re-register on heartbeat
BlockManager is requested to storage:BlockManager.md#reregister[reregister].
The internal <<heartbeatFailures, heartbeatFailures>> counter is reset (i.e. becomes 0).
If there are any issues with communicating with the driver, you should see the following WARN message in the logs:
[source,plaintext]
Issue communicating with driver in heartbeater
The internal <<heartbeatFailures, heartbeatFailures>> counter is incremented and checked to be less than the acceptable number of failures (i.e. spark.executor.heartbeat.maxFailures Spark property). If the number is greater, the following ERROR message is printed out to the logs:
Exit as unable to send heartbeats to driver more than [HEARTBEAT_MAX_FAILURES] times
The executor exits (using System.exit and exit code 56).
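Putting this section together: the heartbeat is scheduled at a fixed interval with a random initial delay, and every send either resets or increments the failure counter until the maximum is exceeded and the JVM exits with code 56. A simplified sketch of that control flow (placeholder RPC call, example property values):

[source, scala]
----
import java.util.concurrent.{Executors, TimeUnit}
import scala.util.{Failure, Random, Success, Try}

object HeartbeatSketch {
  private val heartbeatIntervalMs = 10000L // spark.executor.heartbeatInterval (10s)
  private val maxFailures = 60             // spark.executor.heartbeat.maxFailures (example value)
  private var heartbeatFailures = 0

  // Placeholder for the blocking Heartbeat RPC; returns true when the driver
  // asks the executor to re-register its BlockManager
  private def sendHeartbeat(): Boolean = false

  private def reportHeartBeat(): Unit =
    Try(sendHeartbeat()) match {
      case Success(reregister) =>
        if (reregister) println("Told to re-register on heartbeat")
        heartbeatFailures = 0
      case Failure(e) =>
        println(s"Issue communicating with driver in heartbeater: $e")
        heartbeatFailures += 1
        if (heartbeatFailures >= maxFailures) {
          println(s"Exit as unable to send heartbeats to driver more than $maxFailures times")
          System.exit(56)
        }
    }

  def start(): Unit = {
    // random initial delay so executors do not heartbeat in lockstep
    val initialDelayMs = heartbeatIntervalMs + Random.nextInt(heartbeatIntervalMs.toInt)
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(
      () => reportHeartBeat(), initialDelayMs, heartbeatIntervalMs, TimeUnit.MILLISECONDS)
  }
}
----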
== [[internal-properties]] Internal Properties
=== [[executorSource]] ExecutorSource
executor:ExecutorSource.md[]
=== [[heartbeatFailures]] heartbeatFailures