
CoarseGrainedSchedulerBackend

CoarseGrainedSchedulerBackend is a base SchedulerBackend for coarse-grained schedulers.

CoarseGrainedSchedulerBackend is an ExecutorAllocationClient.

CoarseGrainedSchedulerBackend is responsible for requesting resources from a cluster manager for executors that it in turn uses to launch tasks (on CoarseGrainedExecutorBackend).

CoarseGrainedSchedulerBackend holds executors for the duration of the Spark job rather than relinquishing executors whenever a task is done and asking the scheduler to launch a new executor for each new task.

CoarseGrainedSchedulerBackend registers CoarseGrainedScheduler RPC Endpoint that executors use for RPC communication.

Note

Active executors are executors that are not pending to be removed or lost.

Implementations

Creating Instance

CoarseGrainedSchedulerBackend takes the following to be created:

Creating DriverEndpoint

createDriverEndpoint(
  properties: Seq[(String, String)]): DriverEndpoint

createDriverEndpoint creates a DriverEndpoint.

Note

The purpose of createDriverEndpoint is to let CoarseGrainedSchedulerBackends provide custom implementations (e.g. KubernetesClusterSchedulerBackend).

createDriverEndpoint is used when CoarseGrainedSchedulerBackend is created (and initializes the driverEndpoint internal reference).

decommissionExecutors

decommissionExecutors(
  executorsAndDecomInfo: Array[(String, ExecutorDecommissionInfo)],
  adjustTargetNumExecutors: Boolean,
  triggeredByExecutor: Boolean): Seq[String]

decommissionExecutors is part of the ExecutorAllocationClient abstraction.

decommissionExecutors...FIXME

totalRegisteredExecutors Registry

totalRegisteredExecutors: AtomicInteger

totalRegisteredExecutors is an internal registry of the number of registered executors (a Java AtomicInteger).

totalRegisteredExecutors starts from 0.

totalRegisteredExecutors is incremented when:

totalRegisteredExecutors is decremented when:

isReady

isReady(): Boolean

isReady is part of the SchedulerBackend abstraction.

isReady...FIXME

Sufficient Resources Registered

sufficientResourcesRegistered(): Boolean

sufficientResourcesRegistered is true (and is supposed to be overridden by custom CoarseGrainedSchedulerBackends).

Minimum Resources Available Ratio

minRegisteredRatio: Double

minRegisteredRatio is a ratio of the minimum resources available to the total expected resources for the CoarseGrainedSchedulerBackend to be ready for scheduling tasks (for execution).

minRegisteredRatio uses spark.scheduler.minRegisteredResourcesRatio configuration property if defined or defaults to 0.0.

minRegisteredRatio can be between 0.0 and 1.0 (inclusive).

minRegisteredRatio is used when:

  • CoarseGrainedSchedulerBackend is requested to isReady
  • StandaloneSchedulerBackend is requested to sufficientResourcesRegistered
  • KubernetesClusterSchedulerBackend is requested to sufficientResourcesRegistered
  • MesosCoarseGrainedSchedulerBackend is requested to sufficientResourcesRegistered
  • YarnSchedulerBackend is requested to sufficientResourcesRegistered
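
The rules above can be sketched roughly as follows. This is a minimal illustration, not the Spark source: the MinRegisteredRatioSketch object, the plain Map standing in for SparkConf, the clamping of out-of-range values, and the sufficientResources helper are all assumptions made for the example.

```scala
object MinRegisteredRatioSketch {
  // Configuration property named in the text above.
  val RatioKey = "spark.scheduler.minRegisteredResourcesRatio"

  // Use the configured ratio if defined, otherwise default to 0.0.
  // Clamping into [0.0, 1.0] is an assumption of this sketch.
  def minRegisteredRatio(conf: Map[String, String]): Double = {
    val configured = conf.get(RatioKey).map(_.toDouble).getOrElse(0.0)
    math.max(0.0, math.min(1.0, configured))
  }

  // Hypothetical check a custom backend might perform when asked
  // whether sufficient resources have been registered.
  def sufficientResources(registered: Int, expected: Int, ratio: Double): Boolean =
    registered >= expected * ratio
}
```

With the default ratio of 0.0, such a check passes even when nothing has registered yet, which matches the base behaviour of reporting sufficient resources.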

DriverEndpoint

driverEndpoint: RpcEndpointRef

CoarseGrainedSchedulerBackend creates a DriverEndpoint when created.

The DriverEndpoint is used to communicate with the driver (by sending RPC messages).

Available Executors Registry

executorDataMap: HashMap[String, ExecutorData]

CoarseGrainedSchedulerBackend tracks available executors using executorDataMap registry (of ExecutorDatas by executor id).

A new entry is added when DriverEndpoint is requested to handle RegisterExecutor message.

An entry is removed when DriverEndpoint is requested to handle RemoveExecutor message or a remote host (with one or many executors) disconnects.

Revive Messages Scheduler Service

reviveThread: ScheduledExecutorService

CoarseGrainedSchedulerBackend creates a Java ScheduledExecutorService when created.

The ScheduledExecutorService is used by DriverEndpoint RPC Endpoint to post ReviveOffers messages regularly.
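
As a rough illustration of posting a message regularly from a Java ScheduledExecutorService, consider the sketch below. The ReviveSchedulerSketch object, the intervalMs parameter and the postReviveOffers callback are hypothetical; how Spark actually configures the interval or sends the message is not described in the text above.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object ReviveSchedulerSketch {
  // Runs `postReviveOffers` at a fixed rate on a single-threaded scheduler.
  // The caller is responsible for shutting the returned service down.
  def start(intervalMs: Long)(postReviveOffers: () => Unit): ScheduledExecutorService = {
    val reviveThread = Executors.newSingleThreadScheduledExecutor()
    reviveThread.scheduleAtFixedRate(
      new Runnable { def run(): Unit = postReviveOffers() },
      0L, // no initial delay
      intervalMs,
      TimeUnit.MILLISECONDS)
    reviveThread
  }
}
```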

Maximum Size of RPC Message

maxRpcMessageSize is the value of spark.rpc.message.maxSize configuration property.

Making Fake Resource Offers on Executors

makeOffers(): Unit
makeOffers(
  executorId: String): Unit

makeOffers takes the active executors (out of the executorDataMap internal registry) and creates WorkerOffer resource offers for each (one per executor with the executor's id, host and free cores).

CAUTION: Only free cores are considered in making offers. Memory is not! Why?!

It then requests TaskSchedulerImpl to process the resource offers to create a collection of TaskDescription collections that it in turn uses to launch tasks.
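
The offer-creation step can be sketched as below, assuming simplified stand-ins for Spark's types. The ExecutorData and WorkerOffer case classes here are hypothetical reductions (only host and free cores), and treating "active" as "not in the pending-to-remove set" follows the note at the top of this page; handing the offers to TaskSchedulerImpl is left out.

```scala
object MakeOffersSketch {
  // Simplified, hypothetical stand-ins for Spark's ExecutorData and WorkerOffer.
  final case class ExecutorData(host: String, freeCores: Int)
  final case class WorkerOffer(executorId: String, host: String, cores: Int)

  // One offer per active executor; executors pending removal are skipped.
  // Only free cores are offered, mirroring the caution above.
  def makeOffers(
      executorDataMap: Map[String, ExecutorData],
      executorsPendingToRemove: Set[String]): Seq[WorkerOffer] =
    executorDataMap.collect {
      case (id, data) if !executorsPendingToRemove.contains(id) =>
        WorkerOffer(id, data.host, data.freeCores)
    }.toSeq.sortBy(_.executorId) // sorted only to keep the sketch deterministic
}
```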

Getting Executor Ids

When called, getExecutorIds simply returns executor ids from the internal executorDataMap registry.

NOTE: It is called when SparkContext calculates executor ids.

Requesting Executors

requestExecutors(
  numAdditionalExecutors: Int): Boolean

requestExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

requestExecutors method is part of the ExecutorAllocationClient abstraction.

When called, you should see the following INFO message followed by DEBUG message in the logs:

Requesting [numAdditionalExecutors] additional executor(s) from the cluster manager
Number of pending executors is now [numPendingExecutors]

numPendingExecutors is increased by the input numAdditionalExecutors.

requestExecutors requests executors from a cluster manager (that reflects the current computation needs). The "new executor total" is a sum of the internal <> and <> decreased by the <>.

If numAdditionalExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of additional executor(s) [numAdditionalExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method that no other scheduler backends could customize further.

NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.
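
A minimal sketch of this flow, under stated assumptions, could look as follows. RequestExecutorsSketch is a hypothetical class; its constructor parameters stand in for internal registries, and the "existing + pending - pending to remove" formula is an assumed reading of the description above rather than a quote from the Spark source. Logging is omitted.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical, simplified backend used only to illustrate the control flow.
class RequestExecutorsSketch(numExistingExecutors: Int, numExecutorsPendingToRemove: Int) {
  private var numPendingExecutors = 0

  // Records the last requested total so the sketch can be inspected.
  var lastRequestedTotal: Option[Int] = None

  // Cluster-specific hook. This sketch acknowledges every request.
  protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = {
    lastRequestedTotal = Some(requestedTotal)
    Future.successful(true)
  }

  final def requestExecutors(numAdditionalExecutors: Int): Boolean = {
    if (numAdditionalExecutors < 0) {
      throw new IllegalArgumentException(
        "Attempted to request a negative number of additional executor(s) " +
          s"$numAdditionalExecutors from the cluster manager. Please specify a positive number!")
    }
    // Concurrent requests are handled one by one.
    val response = synchronized {
      numPendingExecutors += numAdditionalExecutors
      // Assumed formula for the new executor total.
      doRequestTotalExecutors(
        numExistingExecutors + numPendingExecutors - numExecutorsPendingToRemove)
    }
    Await.result(response, 10.seconds)
  }
}
```

Marking requestExecutors as final while leaving doRequestTotalExecutors overridable keeps validation and bookkeeping in one place and limits cluster-specific code to the hook.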

Requesting Exact Number of Executors

requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean

requestTotalExecutors is a "decorator" method that ultimately calls a cluster-specific doRequestTotalExecutors method and returns whether the request was acknowledged or not (it is assumed false by default).

requestTotalExecutors is part of the ExecutorAllocationClient abstraction.

It sets the internal <> and <> registries. It then calculates the exact number of executors which is the input numExecutors and the <> decreased by the number of <>.

If numExecutors is negative, an IllegalArgumentException is thrown:

Attempted to request a negative number of executor(s) [numExecutors] from the cluster manager. Please specify a positive number!

NOTE: It is a final method that no other scheduler backends could customize further.

NOTE: The method is a synchronized block that makes multiple concurrent requests be handled in a serial fashion, i.e. one by one.

Finding Default Level of Parallelism

defaultParallelism(): Int

defaultParallelism is part of the SchedulerBackend abstraction.

defaultParallelism is spark.default.parallelism configuration property if defined.

Otherwise, defaultParallelism is the maximum of totalCoreCount or 2.
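
The fallback rule can be written as a small sketch. The DefaultParallelismSketch object and the Map-based configuration are assumptions standing in for SparkConf and the totalCoreCount internal property.

```scala
object DefaultParallelismSketch {
  // Prefer spark.default.parallelism if defined;
  // otherwise use the larger of the total core count and 2.
  def defaultParallelism(conf: Map[String, String], totalCoreCount: Int): Int =
    conf.get("spark.default.parallelism")
      .map(_.toInt)
      .getOrElse(math.max(totalCoreCount, 2))
}
```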

Killing Task

killTask(
  taskId: Long,
  executorId: String,
  interruptThread: Boolean): Unit

killTask is part of the SchedulerBackend abstraction.

killTask simply sends a KillTask message to the driverEndpoint.

Stopping All Executors

stopExecutors sends a blocking <> message to <> (if already initialized).

NOTE: It is called exclusively while CoarseGrainedSchedulerBackend is <>.

You should see the following INFO message in the logs:

Shutting down all executors

Reset State

reset resets the internal state:

  1. Sets <> to 0
  2. Clears executorsPendingToRemove
  3. Sends a blocking <> message to <> for every executor (in the internal executorDataMap) to inform it about SlaveLost with the message:
    Stale executor after cluster manager re-registered.

reset is defined in CoarseGrainedSchedulerBackend, but used and overridden exclusively by YarnSchedulerBackend.

Remove Executor

removeExecutor(executorId: String, reason: ExecutorLossReason)

removeExecutor sends a blocking <> message to <>.

NOTE: It is called by subclasses SparkDeploySchedulerBackend, CoarseMesosSchedulerBackend, and YarnSchedulerBackend.

CoarseGrainedScheduler RPC Endpoint

When <>, it registers CoarseGrainedScheduler RPC endpoint to be the driver's communication endpoint.

driverEndpoint is a DriverEndpoint.

Note

CoarseGrainedSchedulerBackend is created while SparkContext is being created that in turn lives inside a Spark driver. That explains the name driverEndpoint (at least partially).

It is called standalone scheduler's driver endpoint internally.

It tracks:

It uses driver-revive-thread daemon single-thread thread pool for ...FIXME

CAUTION: FIXME A potential issue with driverEndpoint.asInstanceOf[NettyRpcEndpointRef].toURI - doubles spark:// prefix.

Starting CoarseGrainedSchedulerBackend

start(): Unit

start is part of the SchedulerBackend abstraction.

start takes all spark.-prefixed properties and registers the CoarseGrainedScheduler RPC endpoint (backed by DriverEndpoint ThreadSafeRpcEndpoint).

Figure: CoarseGrainedScheduler Endpoint

NOTE: start uses <> to access the current SparkContext and in turn SparkConf.

NOTE: start uses <> that was given when CoarseGrainedSchedulerBackend was created.

Checking If Sufficient Compute Resources Available Or Waiting Time Passed

isReady(): Boolean

isReady is part of the SchedulerBackend abstraction.

isReady allows delaying task launching until <> or <> passes.

Internally, isReady <>.

NOTE: <> by default responds that sufficient resources are available.

If the <>, you should see the following INFO message in the logs and isReady is positive.

SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: [minRegisteredRatio]

If there are no sufficient resources available yet (the above requirement does not hold), isReady checks whether the time since <> passed <> to give a way to launch tasks (even when <> not being reached yet).

You should see the following INFO message in the logs and isReady is positive.

SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: [maxRegisteredWaitingTimeMs](ms)

Otherwise, when <> and <> has not elapsed, isReady is negative.
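
The decision described in this section can be condensed into a small pure function. This is a sketch under assumptions: IsReadySketch and its parameters are hypothetical, time is passed in as plain milliseconds so the logic stays testable, and the INFO log messages are left out.

```scala
object IsReadySketch {
  // Ready when sufficient resources are registered, or when the maximum
  // waiting time since creation has elapsed; not ready otherwise.
  def isReady(
      sufficientResourcesRegistered: Boolean,
      createTimeMs: Long,
      nowMs: Long,
      maxRegisteredWaitingTimeMs: Long): Boolean =
    if (sufficientResourcesRegistered) true
    else (nowMs - createTimeMs) >= maxRegisteredWaitingTimeMs
}
```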

Reviving Resource Offers

reviveOffers(): Unit

reviveOffers is part of the SchedulerBackend abstraction.

reviveOffers simply sends a ReviveOffers message to CoarseGrainedSchedulerBackend RPC endpoint.

Figure: CoarseGrainedExecutorBackend Revives Offers

Stopping SchedulerBackend

stop(): Unit

stop is part of the SchedulerBackend abstraction.

stop <> and CoarseGrainedScheduler RPC endpoint (by sending a blocking StopDriver message).

In case of any Exception, stop reports a SparkException with the message:

Error stopping standalone scheduler's driver endpoint

createDriverEndpointRef

createDriverEndpointRef(
  properties: ArrayBuffer[(String, String)]): RpcEndpointRef

createDriverEndpointRef creates a DriverEndpoint and registers it as CoarseGrainedScheduler.

createDriverEndpointRef is used when CoarseGrainedSchedulerBackend is requested to <>.

Checking Whether Executor is Active

isExecutorActive(
  id: String): Boolean

isExecutorActive is part of the ExecutorAllocationClient abstraction.

isExecutorActive...FIXME

Requesting Executors from Cluster Manager

doRequestTotalExecutors(
  requestedTotal: Int): Future[Boolean]

doRequestTotalExecutors returns a completed Future with false value.
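
For illustration, the default behaviour and a cluster-specific override might be sketched as follows. BaseBackend, AcknowledgingBackend and the request helper are hypothetical names; only the "completed Future with false" default comes from the text above.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object DoRequestTotalExecutorsSketch {
  class BaseBackend {
    // Default: an already-completed Future holding false (not acknowledged).
    protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
      Future.successful(false)

    // Hypothetical helper that waits for the acknowledgement.
    def request(requestedTotal: Int): Boolean =
      Await.result(doRequestTotalExecutors(requestedTotal), 5.seconds)
  }

  // Hypothetical cluster-specific backend that always acknowledges.
  class AcknowledgingBackend extends BaseBackend {
    override protected def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
      Future.successful(true)
  }
}
```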

doRequestTotalExecutors is used when:

Logging

Enable ALL logging level for org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend=ALL

Refer to Logging.

Internal Properties

  • currentExecutorIdCounter: The last (highest) identifier of all <>. Used exclusively by YarnSchedulerEndpoint to respond to RetrieveLastAllocatedExecutorId message.

  • createTime (initial value: current time): The time CoarseGrainedSchedulerBackend was created.

  • defaultAskTimeout (initial value: spark.rpc.askTimeout or spark.network.timeout or 120s): Default timeout for blocking RPC messages (aka ask messages).

  • driverEndpoint (initial value: uninitialized): RPC endpoint reference to CoarseGrainedScheduler RPC endpoint (with DriverEndpoint as the message handler). Initialized when CoarseGrainedSchedulerBackend <>. Used when CoarseGrainedSchedulerBackend executes the following (asynchronously, i.e. on a separate thread):
      • <>
      • <>
      • <>
      • <>
      • <>
      • <>

  • executorsPendingToRemove (initial value: empty): Executors marked as removed but the confirmation from a cluster manager has not arrived yet.

  • hostToLocalTaskCount (initial value: empty): Registry of hostnames and possible number of tasks running on them.

  • localityAwareTasks (initial value: 0): Number of pending tasks...FIXME

  • maxRegisteredWaitingTimeMs (initial value: <>)

  • numPendingExecutors (initial value: 0)

  • totalCoreCount (initial value: 0): Total number of CPU cores, i.e. the sum of all the cores on all executors.
