CoarseGrainedSchedulerBackend¶
CoarseGrainedSchedulerBackend
is a base SchedulerBackend for coarse-grained schedulers.
CoarseGrainedSchedulerBackend
is an ExecutorAllocationClient.
CoarseGrainedSchedulerBackend
is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on CoarseGrainedExecutorBackend).
CoarseGrainedSchedulerBackend
holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.
CoarseGrainedSchedulerBackend
registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.
Note
Active executors are executors that are not pending to be removed or lost.
Implementations¶
- KubernetesClusterSchedulerBackend (Spark on Kubernetes)
- MesosCoarseGrainedSchedulerBackend
- StandaloneSchedulerBackend
- YarnSchedulerBackend
Creating Instance¶
CoarseGrainedSchedulerBackend takes the following to be created:

- TaskSchedulerImpl
- RpcEnv
Creating DriverEndpoint¶
createDriverEndpoint(
properties: Seq[(String, String)]): DriverEndpoint
createDriverEndpoint
creates a DriverEndpoint.
Note
The purpose of createDriverEndpoint is to let custom CoarseGrainedSchedulerBackends provide their own implementations (e.g. KubernetesClusterSchedulerBackend).
createDriverEndpoint
is used when CoarseGrainedSchedulerBackend
is created (and initializes the driverEndpoint internal reference).
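For illustration, a custom backend could plug in its own endpoint along the lines of the following sketch (not Spark source). MyClusterSchedulerBackend and MyDriverEndpoint are hypothetical names, the sketch lives in the Spark scheduler package because the backend classes are private[spark], and it assumes a DriverEndpoint subclass needs no extra constructor arguments in this Spark version.

```scala
package org.apache.spark.scheduler.cluster

import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl

// Hypothetical custom coarse-grained backend (a sketch, not Spark source)
class MyClusterSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    rpcEnv: RpcEnv)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // Provide a cluster-specific DriverEndpoint instead of the default one
  override def createDriverEndpoint(
      properties: Seq[(String, String)]): DriverEndpoint =
    new MyDriverEndpoint()

  // Hypothetical DriverEndpoint subclass with cluster-specific behaviour
  // (e.g. extra bookkeeping when executors register or disconnect)
  private class MyDriverEndpoint extends DriverEndpoint
}
```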
decommissionExecutors¶
decommissionExecutors(
executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
adjustTargetNumExecutors: Boolean,
triggeredByExecutor: Boolean): Seq[String]
decommissionExecutors
is part of the ExecutorAllocationClient abstraction.
decommissionExecutors
...FIXME
totalRegisteredExecutors Registry¶
totalRegisteredExecutors: AtomicInteger
totalRegisteredExecutors
is an internal registry of the number of registered executors (a Java AtomicInteger).
totalRegisteredExecutors
starts from 0
.
totalRegisteredExecutors is incremented when:

- DriverEndpoint is requested to handle a RegisterExecutor message

totalRegisteredExecutors is decremented when:

- DriverEndpoint is requested to remove an executor
isReady¶
isReady(): Boolean
isReady
is part of the SchedulerBackend abstraction.
isReady
...FIXME
Sufficient Resources Registered¶
sufficientResourcesRegistered(): Boolean
sufficientResourcesRegistered is true by default (and is supposed to be overridden by custom CoarseGrainedSchedulerBackends).
Minimum Resources Available Ratio¶
minRegisteredRatio: Double
minRegisteredRatio
is a ratio of the minimum resources available to the total expected resources for the CoarseGrainedSchedulerBackend
to be ready for scheduling tasks (for execution).
minRegisteredRatio
uses spark.scheduler.minRegisteredResourcesRatio configuration property if defined or defaults to 0.0
.
minRegisteredRatio
can be between 0.0
and 1.0
(inclusive).
minRegisteredRatio
is used when:
- CoarseGrainedSchedulerBackend is requested to isReady
- StandaloneSchedulerBackend is requested to sufficientResourcesRegistered
- KubernetesClusterSchedulerBackend is requested to sufficientResourcesRegistered
- MesosCoarseGrainedSchedulerBackend is requested to sufficientResourcesRegistered
- YarnSchedulerBackend is requested to sufficientResourcesRegistered
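As a rough sketch of how the default can be computed (an approximation, not verbatim Spark source; conf is assumed to be the SparkConf in scope), the configured ratio is read with a 0.0 default and capped at 1.0:

```scala
// Approximate shape of the default ratio (a sketch, not verbatim Spark source):
// spark.scheduler.minRegisteredResourcesRatio (default 0.0), capped at 1.0
val minRegisteredRatio: Double =
  math.min(1.0, conf.getDouble("spark.scheduler.minRegisteredResourcesRatio", 0.0))
```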
DriverEndpoint¶
driverEndpoint: RpcEndpointRef
CoarseGrainedSchedulerBackend
creates a DriverEndpoint when created.
The DriverEndpoint
is used to communicate with the driver (by sending RPC messages).
Available Executors Registry¶
executorDataMap: HashMap[String, ExecutorData]
CoarseGrainedSchedulerBackend
tracks available executors using executorDataMap
registry (of ExecutorDatas by executor id).
A new entry is added when DriverEndpoint
is requested to handle RegisterExecutor message.
An entry is removed when DriverEndpoint
is requested to handle RemoveExecutor message or a remote host (with one or many executors) disconnects.
Revive Messages Scheduler Service¶
reviveThread: ScheduledExecutorService
CoarseGrainedSchedulerBackend
creates a Java ScheduledExecutorService when created.
The ScheduledExecutorService
is used by DriverEndpoint
RPC Endpoint to post ReviveOffers messages regularly.
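A condensed sketch of that periodic posting (an approximation, not verbatim Spark source; it assumes conf, reviveThread and self, the endpoint's own RpcEndpointRef, are in scope, and uses the spark.scheduler.revive.interval setting with a 1s fallback):

```scala
import java.util.concurrent.TimeUnit

// Approximate shape of how DriverEndpoint schedules periodic ReviveOffers
// (a sketch, not verbatim Spark source)
val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

reviveThread.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // self is the RpcEndpointRef of this DriverEndpoint
    Option(self).foreach(_.send(ReviveOffers))
  }
}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
```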
Maximum Size of RPC Message¶
maxRpcMessageSize
is the value of spark.rpc.message.maxSize configuration property.
Making Fake Resource Offers on Executors¶
makeOffers(): Unit
makeOffers(
executorId: String): Unit
makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor's id, host and free cores).
CAUTION: Only free cores are considered in making offers. Memory is not! Why?!
It then requests the TaskSchedulerImpl to process the resource offers (resourceOffers) and create a collection of TaskDescription collections that it in turn uses to launch tasks.
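A condensed sketch of that flow (an approximation, not verbatim Spark source; it assumes the internal executorDataMap registry, isExecutorActive and the scheduler, a TaskSchedulerImpl, are in scope):

```scala
// Condensed sketch of makeOffers (an approximation, not verbatim Spark source)
val workOffers = executorDataMap
  .filterKeys(isExecutorActive)                 // skip executors pending removal or lost
  .map { case (id, executorData) =>
    // one offer per executor: its id, host and currently free cores
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }
  .toIndexedSeq

// TaskSchedulerImpl matches the offers against runnable tasks
val tasks: Seq[Seq[TaskDescription]] = scheduler.resourceOffers(workOffers)
```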
Getting Executor Ids¶
When called, getExecutorIds simply returns the executor ids from the internal executorDataMap registry.

NOTE: It is used when SparkContext calculates executor ids.
Requesting Executors¶
requestExecutors(
numAdditionalExecutors: Int): Boolean
requestExecutors
is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false
by default).
requestExecutors
method is part of the ExecutorAllocationClient abstraction.
When called, you should see the following INFO message followed by a DEBUG message in the logs:

Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
Number of pending executors is now [numPendingExecutors]

The internal numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors requests executors from a cluster manager (that reflects the current computation needs). The "new executor total" is a sum of the internal numExistingExecutors and numPendingExecutors decreased by the number of executors pending to be removed.
If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!
NOTE: It is a final method that no other scheduler backends could customize further.
NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
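For illustration, from user code the request goes through SparkContext, which delegates to the ExecutorAllocationClient. A hedged usage sketch, assuming an active SparkContext sc and a cluster manager that supports requesting executors:

```scala
// Hedged usage sketch: SparkContext.requestExecutors (a developer API) delegates to
// the ExecutorAllocationClient, here CoarseGrainedSchedulerBackend.requestExecutors
val acknowledged: Boolean = sc.requestExecutors(numAdditionalExecutors = 2)
println(s"Cluster manager acknowledged the request: $acknowledged")
```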
Requesting Exact Number of Executors¶
requestTotalExecutors(
numExecutors: Int,
localityAwareTasks: Int,
hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors
is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false
by default).
requestTotalExecutors
is part of the ExecutorAllocationClient abstraction.
It sets the internal localityAwareTasks and hostToLocalTaskCount registries (to the input values) and then requests the cluster manager for the exact number of executors (doRequestTotalExecutors with the input numExecutors).
If numExecutors is negative, an IllegalArgumentException is thrown:
Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!
NOTE: It is a final method that no other scheduler backends could customize further.
NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
Finding Default Level of Parallelism¶
defaultParallelism(): Int
defaultParallelism
is part of the SchedulerBackend abstraction.
defaultParallelism
is spark.default.parallelism configuration property if defined.
Otherwise, defaultParallelism is the maximum of totalCoreCount and 2.
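That logic is small enough to sketch (an approximation, not verbatim Spark source; conf is the SparkConf and totalCoreCount the internal counter described below):

```scala
// spark.default.parallelism if set, otherwise max(total executor cores, 2)
override def defaultParallelism(): Int =
  conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
```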
Killing Task¶
killTask(
taskId: Long,
executorId: String,
interruptThread: Boolean): Unit
killTask
is part of the SchedulerBackend abstraction.
killTask simply sends a KillTask message to the driverEndpoint.
Stopping All Executors¶
stopExecutors sends a blocking StopExecutors message to the driverEndpoint (if already initialized).

NOTE: It is called exclusively while CoarseGrainedSchedulerBackend is being stopped.
You should see the following INFO message in the logs:
Shutting down all executors
Reset State¶
reset
resets the internal state:
- Sets numPendingExecutors to 0
- Clears executorsPendingToRemove
- Sends a blocking RemoveExecutor message to driverEndpoint for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message: "Stale executor after cluster manager re-registered."
reset is a method that is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.
Remove Executor¶
removeExecutor(executorId: String, reason: ExecutorLossReason)
removeExecutor sends a blocking RemoveExecutor message to driverEndpoint.

NOTE: It is called by subclasses, i.e. SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend.
CoarseGrainedScheduler RPC Endpoint¶
When CoarseGrainedSchedulerBackend starts, it registers the CoarseGrainedScheduler RPC endpoint (driverEndpoint).
driverEndpoint
is a DriverEndpoint.
Note
CoarseGrainedSchedulerBackend
is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint
(at least partially).
It is called standalone scheduler's driver endpoint internally.
It tracks:
It uses driver-revive-thread
daemon single-thread thread pool for ...FIXME
CAUTION: FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI
- doubles spark://
prefix.
Starting CoarseGrainedSchedulerBackend¶
start(): Unit
start
is part of the SchedulerBackend abstraction.
start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (driverEndpoint) with them.
NOTE: start uses the TaskSchedulerImpl to access the current SparkContext (and in turn its SparkConf).

NOTE: start uses the RpcEnv that was given when CoarseGrainedSchedulerBackend was created.
Checking If Sufficient Compute Resources Available Or Waiting Time Passed¶
isReady(): Boolean
isReady
is part of the SchedulerBackend abstraction.
isReady allows delaying task launching until sufficient resources are registered or a configured maximum waiting time has passed.

Internally, isReady checks whether sufficient resources are registered (sufficientResourcesRegistered).

NOTE: sufficientResourcesRegistered by default responds that sufficient resources are available.

If sufficient resources are available, you should see the following INFO message in the logs and isReady returns true:
SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]
If there are no sufficient resources available yet (the above requirement does not hold), isReady checks whether the time since startup (createTime) has exceeded maxRegisteredWaitingTimeMs (to give executors time to register while still eventually launching tasks).

If so, you should see the following INFO message in the logs and isReady returns true:
SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)
Otherwise, when neither sufficient resources are available nor the waiting time has elapsed, isReady returns false.
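The decision boils down to the following shape (a condensed sketch, not verbatim Spark source; createTime and maxRegisteredWaitingTimeMs are the internal properties listed at the end of this page):

```scala
// Condensed sketch of isReady (an approximation, not verbatim Spark source)
override def isReady(): Boolean = {
  if (sufficientResourcesRegistered()) {
    logInfo("SchedulerBackend is ready for scheduling beginning after " +
      s"reached minRegisteredResourcesRatio: $minRegisteredRatio")
    return true
  }
  if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTimeMs) {
    logInfo("SchedulerBackend is ready for scheduling beginning after waiting " +
      s"maxRegisteredResourcesWaitingTime: $maxRegisteredWaitingTimeMs(ms)")
    return true
  }
  false
}
```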
Reviving Resource Offers¶
reviveOffers(): Unit
reviveOffers
is part of the SchedulerBackend abstraction.
reviveOffers
simply sends a ReviveOffers message to CoarseGrainedSchedulerBackend RPC endpoint.
Stopping SchedulerBackend¶
stop(): Unit
stop
is part of the SchedulerBackend abstraction.
stop stops all executors and the CoarseGrainedScheduler RPC endpoint (if already initialized).
In case of any Exception
, stop
reports a SparkException
with the message:
Error stopping standalone scheduler's driver endpoint
createDriverEndpointRef¶
createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef
createDriverEndpointRef creates a DriverEndpoint and requests the RpcEnv to register it as the CoarseGrainedScheduler RPC endpoint.

createDriverEndpointRef is used when CoarseGrainedSchedulerBackend is requested to start.
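A condensed sketch of that registration (an approximation, not verbatim Spark source; CoarseGrainedScheduler is the endpoint name that executors look up):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.rpc.RpcEndpointRef

// Approximate shape of createDriverEndpointRef (a sketch, not verbatim Spark source)
protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  // "CoarseGrainedScheduler" is the endpoint name executors use for RPC communication
  rpcEnv.setupEndpoint("CoarseGrainedScheduler", createDriverEndpoint(properties))
}
```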
Checking Whether Executor is Active¶
isExecutorActive(
id: String): Boolean
isExecutorActive
is part of the ExecutorAllocationClient abstraction.
isExecutorActive
...FIXME
Requesting Executors from Cluster Manager¶
doRequestTotalExecutors(
requestedTotal: Int): Future[Boolean]
doRequestTotalExecutors returns a completed Future with the value false.
doRequestTotalExecutors
is used when:
- CoarseGrainedSchedulerBackend is requested to requestExecutors, requestTotalExecutors and killExecutors
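Cluster-specific backends override it to actually talk to their cluster manager. A minimal hypothetical sketch (myClusterManagerClient and its setTotalExecutors call are made-up placeholders, not a real API):

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical override in a custom backend (a sketch, not Spark source):
// ask the cluster manager for requestedTotal executors in total and complete
// the Future with whether it acknowledged the request.
override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
  implicit val ec: ExecutionContext = ExecutionContext.global
  Future {
    // stand-in for a cluster-manager-specific call that returns a Boolean acknowledgement
    myClusterManagerClient.setTotalExecutors(requestedTotal)
  }
}
```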
Logging¶
Enable ALL
logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=ALL
Refer to Logging.
Internal Properties¶
| Name | Initial Value | Description |
|------|---------------|-------------|
| currentExecutorIdCounter | | The last (highest) identifier of all allocated executors. Used exclusively by YarnSchedulerEndpoint to respond to a RetrieveLastAllocatedExecutorId message. |
| createTime | Current time | The time CoarseGrainedSchedulerBackend was created. |
| defaultAskTimeout | spark.rpc.askTimeout or spark.network.timeout or 120s | Default timeout for blocking RPC messages (aka ask messages). |
| driverEndpoint | (uninitialized) | RPC endpoint reference to the CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler). Initialized when CoarseGrainedSchedulerBackend starts. Used when CoarseGrainedSchedulerBackend sends messages to the endpoint (asynchronously, i.e. on a separate thread). |
| executorsPendingToRemove | empty | Executors marked as removed but the confirmation from a cluster manager has not arrived yet. |
| hostToLocalTaskCount | empty | Registry of hostnames and possible number of tasks running on them. |
| localityAwareTasks | 0 | Number of pending tasks...FIXME |
| maxRegisteredWaitingTimeMs | spark.scheduler.maxRegisteredResourcesWaitingTime | |
| numPendingExecutors | 0 | |
| totalCoreCount | 0 | Total number of CPU cores, i.e. the sum of all the cores on all executors. |