CoarseGrainedSchedulerBackend¶
CoarseGrainedSchedulerBackend is a base SchedulerBackend for coarse-grained schedulers.
CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.
CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on CoarseGrainedExecutorBackend).
CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.
CoarseGrainedSchedulerBackend registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.
Note
Active executors are executors that are not pending to be removed or lost.
Implementations¶
- KubernetesClusterSchedulerBackend (Spark on Kubernetes)
- MesosCoarseGrainedSchedulerBackend
- StandaloneSchedulerBackend
- YarnSchedulerBackend
Creating Instance¶
CoarseGrainedSchedulerBackend takes the following to be created:
- TaskSchedulerImpl
- RpcEnv
Creating DriverEndpoint¶
createDriverEndpoint(
properties: Seq[(String, String)]): DriverEndpoint
createDriverEndpoint creates a DriverEndpoint.
Note
The purpose of createDriverEndpoint is to let CoarseGrainedSchedulerBackends provide custom implementations (e.g. KubernetesClusterSchedulerBackend).
createDriverEndpoint is used when CoarseGrainedSchedulerBackend is created (and initializes the driverEndpoint internal reference).
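For illustration, a custom backend could install its own DriverEndpoint subclass by overriding createDriverEndpoint, similar in spirit to what KubernetesClusterSchedulerBackend does. The sketch below is hypothetical (MyClusterSchedulerBackend and MyDriverEndpoint are made-up names) and only shows the extension point, not an actual implementation:

```scala
import org.apache.spark.rpc.{RpcAddress, RpcEnv}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

// Hypothetical custom backend that plugs in its own DriverEndpoint subclass.
class MyClusterSchedulerBackend(scheduler: TaskSchedulerImpl, rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // A DriverEndpoint subclass could react to executor disconnects in a
  // cluster-manager-specific way.
  private class MyDriverEndpoint extends DriverEndpoint {
    override def onDisconnected(remoteAddress: RpcAddress): Unit = {
      // cluster-manager-specific handling would go here
      super.onDisconnected(remoteAddress)
    }
  }

  override def createDriverEndpoint(
      properties: Seq[(String, String)]): DriverEndpoint = new MyDriverEndpoint
}
```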
decommissionExecutors¶
decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String]
decommissionExecutors is part of the ExecutorAllocationClient abstraction.
decommissionExecutors...FIXME
totalRegisteredExecutors Registry¶
totalRegisteredExecutors: AtomicInteger
totalRegisteredExecutors is an internal registry of the number of registered executors (a Java AtomicInteger).
totalRegisteredExecutors starts from 0.
totalRegisteredExecutors is incremented when:
DriverEndpoint is requested to handle a RegisterExecutor message
totalRegisteredExecutors is decremented when:
DriverEndpoint is requested to remove an executor
isReady¶
isReady(): Boolean
isReady is part of the SchedulerBackend abstraction.
isReady...FIXME
Sufficient Resources Registered¶
sufficientResourcesRegistered(): Boolean
sufficientResourcesRegistered always returns true (and is expected to be overridden by custom CoarseGrainedSchedulerBackends).
Minimum Resources Available Ratio¶
minRegisteredRatio: Double
minRegisteredRatio is a ratio of the minimum resources available to the total expected resources for the CoarseGrainedSchedulerBackend to be ready for scheduling tasks (for execution).
minRegisteredRatio uses spark.scheduler.minRegisteredResourcesRatio configuration property if defined or defaults to 0.0.
minRegisteredRatio can be between 0.0 and 1.0 (inclusive).
minRegisteredRatio is used when:
- CoarseGrainedSchedulerBackend is requested to isReady
- StandaloneSchedulerBackend is requested to sufficientResourcesRegistered
- KubernetesClusterSchedulerBackend is requested to sufficientResourcesRegistered
- MesosCoarseGrainedSchedulerBackend is requested to sufficientResourcesRegistered
- YarnSchedulerBackend is requested to sufficientResourcesRegistered
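As a rough illustration of how the overrides above use minRegisteredRatio, the standalone sketch below treats resources as sufficient once the number of registered executors reaches minRegisteredRatio of an expected total (expectedExecutors is an assumption for this sketch, not an actual field):

```scala
// Illustrative sketch only (not the actual Spark sources): resources are considered
// sufficient once registered executors reach minRegisteredRatio of the expected total.
def sufficientResourcesRegistered(
    totalRegisteredExecutors: Int,
    expectedExecutors: Int,
    minRegisteredRatio: Double): Boolean =
  totalRegisteredExecutors >= expectedExecutors * minRegisteredRatio

// With spark.scheduler.minRegisteredResourcesRatio = 0.8 and 10 expected executors,
// scheduling may start once 8 executors have registered.
assert(sufficientResourcesRegistered(8, 10, 0.8))
assert(!sufficientResourcesRegistered(7, 10, 0.8))
```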
DriverEndpoint¶
driverEndpoint: RpcEndpointRef
CoarseGrainedSchedulerBackend creates a DriverEndpoint when created.
The DriverEndpoint is used to communicate with the driver (by sending RPC messages).
Available Executors Registry¶
executorDataMap: HashMap[String, ExecutorData]
CoarseGrainedSchedulerBackend tracks available executors using executorDataMap registry (of ExecutorDatas by executor id).
A new entry is added when DriverEndpoint is requested to handle RegisterExecutor message.
An entry is removed when DriverEndpoint is requested to handle RemoveExecutor message or a remote host (with one or many executors) disconnects.
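For illustration only, here is a simplified model of what an entry in this registry carries (the real ExecutorData class has more fields, e.g. the executor's RPC endpoint reference and log URLs):

```scala
import scala.collection.mutable.HashMap

// Simplified, illustrative stand-in for ExecutorData (not the actual Spark class).
case class ExecutorInfoSketch(
  executorHost: String,  // host the executor runs on
  totalCores: Int,       // cores the executor registered with
  var freeCores: Int)    // cores currently available for new tasks

val executorDataMap = new HashMap[String, ExecutorInfoSketch]

// An entry appears when an executor registers (RegisterExecutor)...
executorDataMap("0") = ExecutorInfoSketch("worker-1", totalCores = 4, freeCores = 4)
// ...and freeCores shrinks as tasks are launched on that executor.
executorDataMap("0").freeCores -= 1
```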
Revive Messages Scheduler Service¶
reviveThread: ScheduledExecutorService
CoarseGrainedSchedulerBackend creates a Java ScheduledExecutorService when created.
The ScheduledExecutorService is used by DriverEndpoint RPC Endpoint to post ReviveOffers messages regularly.
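A rough sketch of how such a revive loop can be wired up is shown below (the printed message stands in for the ReviveOffers RPC; the 1-second period mirrors the default of the spark.scheduler.revive.interval configuration property):

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Illustrative sketch: a single-thread scheduler that periodically triggers a
// "revive offers" action, similar in spirit to how reviveThread is used.
val reviveThread: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()

def reviveOffers(): Unit = println("ReviveOffers")  // stand-in for sending the RPC message

reviveThread.scheduleAtFixedRate(() => reviveOffers(), 0L, 1L, TimeUnit.SECONDS)
```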
Maximum Size of RPC Message¶
maxRpcMessageSize is the value of spark.rpc.message.maxSize configuration property.
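For example, when large serialized tasks or task results hit this limit, the property can be raised (the snippet is illustrative; the value is expressed in MiB and defaults to 128):

```scala
import org.apache.spark.SparkConf

// Illustrative only: raise the maximum RPC message size to 256 MiB.
val conf = new SparkConf().set("spark.rpc.message.maxSize", "256")
```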
Making Fake Resource Offers on Executors¶
makeOffers(): Unit
makeOffers(
executorId: String): Unit
makeOffers takes the active executors (out of the executorDataMap registry) and creates WorkerOffer resource offers for each (one per executor, with the executor's id, host and free cores).
CAUTION: Only free cores are considered in making offers. Memory is not! Why?!
It then requests the TaskSchedulerImpl to process the resource offers (resourceOffers), which gives a collection of TaskDescriptions that it in turn uses to launch tasks.
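Schematically (and leaving out locking and executors pending removal), the logic boils down to the sketch below; WorkerOfferSketch and makeOffersSketch are illustrative stand-ins, not the actual Spark types:

```scala
// Illustrative sketch of building resource offers from active executors.
final case class WorkerOfferSketch(executorId: String, host: String, cores: Int)

def makeOffersSketch(
    executorDataMap: Map[String, (String, Int)],        // executorId -> (host, freeCores)
    isExecutorActive: String => Boolean): Seq[WorkerOfferSketch] =
  executorDataMap.collect {
    case (id, (host, freeCores)) if isExecutorActive(id) =>
      WorkerOfferSketch(id, host, freeCores)             // only free cores, no memory
  }.toSeq

// The offers would then go to TaskSchedulerImpl.resourceOffers, which returns the
// TaskDescriptions to launch.
```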
Getting Executor Ids¶
getExecutorIds(): Seq[String]
getExecutorIds simply returns the executor ids from the internal executorDataMap registry.
NOTE: It is called when SparkContext calculates executor ids.
Requesting Executors¶
requestExecutors(
numAdditionalExecutors: Int): Boolean
requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
requestExecutors method is part of the ExecutorAllocationClient abstraction.
When called, you should see the following INFO message followed by DEBUG message in the logs:
Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
Number of pending executors is now [numPendingExecutors]
requestExecutors adds the given numAdditionalExecutors to the numPendingExecutors internal registry.
requestExecutors then requests executors from the cluster manager (for a "new executor total" that reflects the current computation needs). The "new executor total" is the sum of the current number of registered executors and the number of pending executors, decreased by the number of executors pending to be removed.
If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
NOTE: It is a final method that no other scheduler backends could customize further.
NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
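From user code this path is typically reached through the SparkContext developer API, as in the illustrative snippet below (assuming an existing SparkContext sc connected to a cluster manager that supports requesting executors):

```scala
// Ask the cluster manager for 2 additional executors; the result tells whether the
// request was acknowledged (false, e.g., when the scheduler does not support it).
val acknowledged: Boolean = sc.requestExecutors(numAdditionalExecutors = 2)
```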
Requesting Exact Number of Executors¶
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).
requestTotalExecutors is part of the ExecutorAllocationClient abstraction.
It records the given localityAwareTasks and hostToLocalTaskCount in the corresponding internal registries, updates numPendingExecutors (based on the requested numExecutors and the executors already registered or pending removal), and requests the exact total from the cluster manager (doRequestTotalExecutors).
If numExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
NOTE: It is a final method that no other scheduler backends could customize further.
NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
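Again assuming an existing SparkContext sc on a coarse-grained cluster manager, the corresponding developer API can be used to request an exact total together with locality hints (the values below are arbitrary):

```scala
// Request exactly 10 executors, hinting that 4 pending tasks would prefer "worker-1".
val acknowledged: Boolean = sc.requestTotalExecutors(
  numExecutors = 10,
  localityAwareTasks = 4,
  hostToLocalTaskCount = Map("worker-1" -> 4))
```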
Finding Default Level of Parallelism¶
defaultParallelism(): Int
defaultParallelism is part of the SchedulerBackend abstraction.
defaultParallelism is spark.default.parallelism configuration property if defined.
Otherwise, defaultParallelism is the maximum of totalCoreCount or 2.
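In other words, the decision reduces to the following sketch (conf stands for the Spark properties as a plain map here, not the actual SparkConf):

```scala
// Illustrative sketch of defaultParallelism: the configured value, or max(total cores, 2).
def defaultParallelismSketch(conf: Map[String, String], totalCoreCount: Int): Int =
  conf.get("spark.default.parallelism").map(_.toInt)
    .getOrElse(math.max(totalCoreCount, 2))

assert(defaultParallelismSketch(Map.empty, totalCoreCount = 16) == 16)
assert(defaultParallelismSketch(Map("spark.default.parallelism" -> "8"), 16) == 8)
assert(defaultParallelismSketch(Map.empty, totalCoreCount = 0) == 2)
```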
Killing Task¶
killTask(
taskId: Long,
executorId: String,
interruptThread: Boolean): Unit
killTask is part of the SchedulerBackend abstraction.
killTask simply sends a KillTask message to the driverEndpoint (CoarseGrainedScheduler RPC endpoint).
Stopping All Executors¶
stopExecutors(): Unit
stopExecutors sends a blocking StopExecutors message to the driverEndpoint (if already initialized).
NOTE: It is called exclusively while CoarseGrainedSchedulerBackend is being stopped (stop).
You should see the following INFO message in the logs:
Shutting down all executors
Reset State¶
reset resets the internal state:
- Sets numPendingExecutors to 0
- Clears executorsPendingToRemove
- Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:
Stale executor after cluster manager re-registered.
reset is a method that is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.
Remove Executor¶
removeExecutor(
executorId: String,
reason: ExecutorLossReason): Unit
removeExecutor sends a blocking RemoveExecutor message to the driverEndpoint.
NOTE: It is called by the subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend.
CoarseGrainedScheduler RPC Endpoint¶
When CoarseGrainedSchedulerBackend is started, it registers the CoarseGrainedScheduler RPC endpoint (and keeps the reference as the driverEndpoint internal property).
driverEndpoint is a DriverEndpoint.
Note
CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).
It is called standalone scheduler's driver endpoint internally.
It tracks:
It uses driver-revive-thread daemon single-thread thread pool for ...FIXME
CAUTION: FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.
Starting CoarseGrainedSchedulerBackend¶
start(): Unit
start is part of the SchedulerBackend abstraction.
start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler).

NOTE: start uses TaskSchedulerImpl to access the current SparkContext and, in turn, SparkConf.
NOTE: start uses the RpcEnv that was given when CoarseGrainedSchedulerBackend was created.
Checking If Sufficient Compute Resources Available Or Waiting Time Passed¶
isReady(): Boolean
isReady is part of the SchedulerBackend abstraction.
isReady allows delaying task launching until sufficient resources are registered or spark.scheduler.maxRegisteredResourcesWaitingTime passes.
Internally, isReady checks whether sufficient resources are registered (sufficientResourcesRegistered).
NOTE: By default, sufficientResourcesRegistered responds that sufficient resources are available.
If sufficient resources are available, you should see the following INFO message in the logs and isReady is positive.
SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
If there are no sufficient resources available yet (the above requirement does not hold), isReady checks whether the time since startup (createTime) has exceeded spark.scheduler.maxRegisteredResourcesWaitingTime, to give a way to launch tasks even when minRegisteredRatio of resources has not been reached yet.
If the waiting time has passed, you should see the following INFO message in the logs and isReady is positive.
SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)
Otherwise, when neither sufficient resources are available nor the waiting time has elapsed, isReady is negative.
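The overall decision can be summarized with the following sketch (logging omitted; isReadySketch is an illustrative stand-in, not the actual method):

```scala
// Illustrative sketch: ready when sufficient resources are registered OR the
// maximum registered-resources waiting time has passed since startup.
def isReadySketch(
    sufficientResourcesRegistered: Boolean,
    createTime: Long,
    maxRegisteredWaitingTimeMs: Long,
    now: Long = System.currentTimeMillis()): Boolean =
  sufficientResourcesRegistered || (now - createTime) >= maxRegisteredWaitingTimeMs
```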
Reviving Resource Offers¶
reviveOffers(): Unit
reviveOffers is part of the SchedulerBackend abstraction.
reviveOffers simply sends a ReviveOffers message to the CoarseGrainedScheduler RPC endpoint (driverEndpoint).

Stopping SchedulerBackend¶
stop(): Unit
stop is part of the SchedulerBackend abstraction.
stop stops all executors and the CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).
In case of any Exception, stop reports a SparkException with the message:
Error stopping standalone scheduler's driver endpoint
createDriverEndpointRef¶
createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef
createDriverEndpointRef creates a DriverEndpoint (createDriverEndpoint) and requests the RpcEnv to register it as the CoarseGrainedScheduler RPC endpoint.
createDriverEndpointRef is used when CoarseGrainedSchedulerBackend is requested to start.
Checking Whether Executor is Active¶
isExecutorActive(
id: String): Boolean
isExecutorActive is part of the ExecutorAllocationClient abstraction.
isExecutorActive...FIXME
Requesting Executors from Cluster Manager¶
doRequestTotalExecutors(
requestedTotal: Int): Future[Boolean]
doRequestTotalExecutors returns a completed Future with false value.
doRequestTotalExecutors is used when:
CoarseGrainedSchedulerBackend is requested to requestExecutors, requestTotalExecutors and killExecutors
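Cluster-specific backends override doRequestTotalExecutors to actually ask their cluster manager for executors; the sketch below only illustrates the contract (askClusterManagerForTotalExecutors is a made-up callback, not a real API):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Illustrative sketch of the contract: forward the requested total to the cluster
// manager and complete the Future with whether the request was acknowledged.
def doRequestTotalExecutorsSketch(
    requestedTotal: Int,
    askClusterManagerForTotalExecutors: Int => Boolean)(
    implicit ec: ExecutionContext): Future[Boolean] =
  Future(askClusterManagerForTotalExecutors(requestedTotal))
```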
Logging¶
Enable ALL logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=ALL
Refer to Logging.
Internal Properties¶
[cols="1,1,2",options="header",width="100%"] |=== | Name | Initial Value | Description
[[currentExecutorIdCounter]] currentExecutorIdCounter |
|---|
| The last (highest) identifier of all < |
Used exclusively in yarn/spark-yarn-cluster-YarnSchedulerEndpoint.md#RetrieveLastAllocatedExecutorId[YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message].
| [[createTime]] createTime | Current time | The time <
| [[defaultAskTimeout]] defaultAskTimeout | rpc:index.md#spark.rpc.askTimeout[spark.rpc.askTimeout] or rpc:index.md#spark.network.timeout[spark.network.timeout] or 120s | Default timeout for blocking RPC messages (aka ask messages).
| [[driverEndpoint]] driverEndpoint | (uninitialized) a| rpc:RpcEndpointRef.md[RPC endpoint reference] to CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler).
Initialized when CoarseGrainedSchedulerBackend <
Used when CoarseGrainedSchedulerBackend executes the following (asynchronously, i.e. on a separate thread):
- <
> - <
> - <
> - <
> - <
> - <
>
| [[executorsPendingToRemove]] executorsPendingToRemove | empty | Executors marked as removed but the confirmation from a cluster manager has not arrived yet.
| [[hostToLocalTaskCount]] hostToLocalTaskCount | empty | Registry of hostnames and possible number of task running on them.
| [[localityAwareTasks]] localityAwareTasks | 0 | Number of pending tasks...FIXME
| [[maxRegisteredWaitingTimeMs]] maxRegisteredWaitingTimeMs | <
| [[numPendingExecutors]] numPendingExecutors | 0 |
| [[totalCoreCount]] totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors. |===