ExecutorMonitor
ExecutorMonitor is a SparkListener and a CleanerListener.
Creating Instance
ExecutorMonitor takes the following to be created:
ExecutorMonitor is created when:
- ExecutorAllocationManager is created
shuffleIds Registry
shuffleIds: Set[Int]
ExecutorMonitor uses a mutable HashSet to track shuffle IDs...FIXME
shuffleIds is initialized only when shuffleTrackingEnabled is enabled.
shuffleIds is used by the Tracker internal class for the following:
- updateTimeout
- addShuffle
- removeShuffle
- updateActiveShuffles
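The shuffle ID bookkeeping can be pictured with the following minimal sketch. ShuffleTrackerSketch and hasShuffleData are hypothetical names, and the sketch models only the shuffleIds set (no timeouts, no cached blocks), so it is a simplification of what Tracker does rather than its actual code.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for the internal Tracker class.
// Only the shuffleIds bookkeeping is modelled.
class ShuffleTrackerSketch(shuffleTrackingEnabled: Boolean) {
  // The set exists only when shuffle tracking is enabled (null otherwise).
  val shuffleIds: mutable.HashSet[Int] =
    if (shuffleTrackingEnabled) new mutable.HashSet[Int]() else null

  // Records that the executor holds shuffle data for the given shuffle.
  // Returns true only when tracking is enabled and the ID was not tracked yet.
  def addShuffle(id: Int): Boolean =
    shuffleIds != null && shuffleIds.add(id)

  // Forgets a shuffle, e.g. after it has been cleaned up.
  // Returns true only when tracking is enabled and the ID was tracked.
  def removeShuffle(id: Int): Boolean =
    shuffleIds != null && shuffleIds.remove(id)

  // Hypothetical helper: does this executor still hold any tracked shuffle data?
  def hasShuffleData: Boolean = shuffleIds != null && shuffleIds.nonEmpty
}
```

With tracking disabled, every method degrades to a no-op that reports false, which mirrors shuffleIds being initialized only when shuffleTrackingEnabled is enabled.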
Executors Registry
executors: ConcurrentHashMap[String, Tracker]
ExecutorMonitor uses a Java ConcurrentHashMap to track available executors.
An executor is added (via ensureExecutorIsTracked) when:
An executor is removed in onExecutorRemoved.
All executors are removed in reset.
executors is used when:
- onOtherEvent (cleanupShuffle)
- executorCount
- executorsKilled
- onUnpersistRDD
- onTaskEnd
- onJobStart
- onJobEnd
- pendingRemovalCount
- timedOutExecutors
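A minimal sketch of such a registry follows, assuming that "ensure tracked" means "add if absent and return the existing entry otherwise". ExecTracker and ExecutorRegistrySketch are hypothetical names; the real Tracker carries far more per-executor state.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical per-executor state; the real Tracker holds much more.
final class ExecTracker(val resourceProfileId: Int) {
  @volatile var pendingRemoval: Boolean = false
}

// Sketch of an executors registry keyed by executor ID.
class ExecutorRegistrySketch {
  private val executors = new ConcurrentHashMap[String, ExecTracker]()

  // Adds the executor if absent and returns its tracker.
  // computeIfAbsent makes the check-and-insert atomic per key.
  def ensureExecutorIsTracked(id: String, resourceProfileId: Int): ExecTracker =
    executors.computeIfAbsent(id, (_: String) => new ExecTracker(resourceProfileId))

  // Drops a single executor.
  def onExecutorRemoved(id: String): Unit = executors.remove(id)

  // Drops every executor.
  def reset(): Unit = executors.clear()

  def executorCount: Int = executors.size()
}
```

A ConcurrentHashMap fits here because the registry is read from threads other than the one delivering listener events (for example, when ExecutorAllocationManager asks for counts).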
fetchFromShuffleSvcEnabled Flag
fetchFromShuffleSvcEnabled: Boolean
ExecutorMonitor initializes the fetchFromShuffleSvcEnabled internal flag based on the values of the spark.shuffle.service.enabled and spark.shuffle.service.fetch.rdd.enabled configuration properties.
fetchFromShuffleSvcEnabled is enabled (true) only when both configuration properties are enabled.
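The flag is a plain conjunction of the two properties. The sketch below uses a Map[String, String] as a hypothetical stand-in for SparkConf and treats a missing key as false; FetchFlagSketch and flag are illustrative names only.

```scala
// Hypothetical sketch: a plain Map stands in for SparkConf.
object FetchFlagSketch {
  // Reads a boolean property, falling back to the given default when the key is missing.
  private def flag(conf: Map[String, String], key: String, default: Boolean): Boolean =
    conf.get(key).map(_.trim.toBoolean).getOrElse(default)

  // true only when both properties are enabled.
  def fetchFromShuffleSvcEnabled(conf: Map[String, String]): Boolean =
    flag(conf, "spark.shuffle.service.enabled", default = false) &&
      flag(conf, "spark.shuffle.service.fetch.rdd.enabled", default = false)
}
```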
fetchFromShuffleSvcEnabled is used when:
shuffleTrackingEnabled Flag
shuffleTrackingEnabled: Boolean
ExecutorMonitor initializes the shuffleTrackingEnabled internal flag based on the values of the spark.shuffle.service.enabled and spark.dynamicAllocation.shuffleTracking.enabled configuration properties.
shuffleTrackingEnabled is enabled (true) when the following holds:
- spark.shuffle.service.enabled is disabled
- spark.dynamicAllocation.shuffleTracking.enabled is enabled
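The two conditions above combine as "external shuffle service off AND shuffle tracking on". In this hypothetical sketch a Map stands in for SparkConf and a missing key is treated as false, which is an assumption of the sketch and may not match the defaults of a given Spark version.

```scala
// Hypothetical sketch: a plain Map stands in for SparkConf.
object ShuffleTrackingFlagSketch {
  // Missing keys count as false (an assumption of this sketch).
  private def flag(conf: Map[String, String], key: String): Boolean =
    conf.get(key).exists(_.trim.toBoolean)

  // Enabled only when the external shuffle service is off and shuffle tracking is on.
  def shuffleTrackingEnabled(conf: Map[String, String]): Boolean =
    !flag(conf, "spark.shuffle.service.enabled") &&
      flag(conf, "spark.dynamicAllocation.shuffleTracking.enabled")
}
```

The negation reflects the idea that with an external shuffle service, shuffle files outlive executors, so there is no need to keep executors alive for their shuffle data.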
When disabled, shuffleTrackingEnabled is used to skip execution of the following (making them noops):
When enabled, shuffleTrackingEnabled is used for the following:
spark.dynamicAllocation.cachedExecutorIdleTimeout
ExecutorMonitor reads the spark.dynamicAllocation.cachedExecutorIdleTimeout configuration property, which Tracker uses in updateTimeout.
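How the cached idle timeout could feed into an executor's idle deadline is shown in this simplified, hypothetical sketch. It assumes that an idle executor holding cached blocks uses the cached timeout while any other idle executor uses the regular idle timeout, and that a negative idle start marks a busy executor. Shuffle-related timeouts are left out.

```scala
// Hypothetical, simplified idle-deadline computation (not Tracker's actual code).
object IdleTimeoutSketch {
  def idleDeadlineNs(
      idleStartNs: Long,        // negative when the executor is busy
      hasCachedBlocks: Boolean,
      idleTimeoutNs: Long,
      cachedIdleTimeoutNs: Long): Long = {
    if (idleStartNs < 0) {
      Long.MaxValue // busy executors never time out
    } else {
      val timeout = if (hasCachedBlocks) cachedIdleTimeoutNs else idleTimeoutNs
      val deadline = idleStartNs + timeout
      // An "infinite" timeout (Long.MaxValue) overflows to a negative sum.
      if (deadline >= 0) deadline else Long.MaxValue
    }
  }
}
```

The overflow guard matters because an unbounded cached timeout is naturally represented as Long.MaxValue.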
onBlockUpdated
onBlockUpdated(
event: SparkListenerBlockUpdated): Unit
onBlockUpdated is part of the SparkListenerInterface abstraction.
onBlockUpdated...FIXME
onExecutorAdded
onExecutorAdded(
event: SparkListenerExecutorAdded): Unit
onExecutorAdded is part of the SparkListenerInterface abstraction.
onExecutorAdded...FIXME
onExecutorRemoved
onExecutorRemoved(
event: SparkListenerExecutorRemoved): Unit
onExecutorRemoved is part of the SparkListenerInterface abstraction.
onExecutorRemoved...FIXME
onJobEnd
onJobEnd(
event: SparkListenerJobEnd): Unit
onJobEnd is part of the SparkListenerInterface abstraction.
onJobEnd...FIXME
onJobStart
onJobStart(
event: SparkListenerJobStart): Unit
onJobStart is part of the SparkListenerInterface abstraction.
Note
onJobStart does nothing and simply returns when the shuffleTrackingEnabled flag is turned off (false).
onJobStart requests the input SparkListenerJobStart for the StageInfos and converts...FIXME
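The early return described in the note can be sketched as follows. JobStartSketch, JobStartGuardSketch and the shuffle-to-active-jobs map are hypothetical; what happens after the guard is an assumption made for illustration only (recording which active jobs use which shuffle).

```scala
import scala.collection.mutable

// Hypothetical, simplified event: each stage may or may not produce a shuffle.
final case class JobStartSketch(jobId: Int, stageShuffleIds: Seq[Option[Int]])

class JobStartGuardSketch(shuffleTrackingEnabled: Boolean) {
  // Assumed bookkeeping: shuffle ID -> IDs of active jobs using it.
  val shuffleToActiveJobs = mutable.HashMap.empty[Int, mutable.ArrayBuffer[Int]]

  def onJobStart(event: JobStartSketch): Unit = {
    if (!shuffleTrackingEnabled) return // noop when shuffle tracking is off
    event.stageShuffleIds.flatten.foreach { shuffleId =>
      shuffleToActiveJobs.getOrElseUpdate(shuffleId, mutable.ArrayBuffer.empty[Int]) += event.jobId
    }
  }
}
```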
onOtherEvent
onOtherEvent(
event: SparkListenerEvent): Unit
onOtherEvent is part of the SparkListenerInterface abstraction.
onOtherEvent...FIXME
cleanupShuffle
cleanupShuffle(
id: Int): Unit
cleanupShuffle...FIXME
cleanupShuffle is used in onOtherEvent.
onTaskEnd
onTaskEnd(
event: SparkListenerTaskEnd): Unit
onTaskEnd is part of the SparkListenerInterface abstraction.
onTaskEnd...FIXME
onTaskStart
onTaskStart(
event: SparkListenerTaskStart): Unit
onTaskStart is part of the SparkListenerInterface abstraction.
onTaskStart...FIXME
onUnpersistRDD
onUnpersistRDD(
event: SparkListenerUnpersistRDD): Unit
onUnpersistRDD is part of the SparkListenerInterface abstraction.
onUnpersistRDD...FIXME
reset
reset(): Unit
reset...FIXME
reset is used when:
- FIXME
shuffleCleaned
shuffleCleaned(
shuffleId: Int): Unit
shuffleCleaned is part of the CleanerListener abstraction.
shuffleCleaned...FIXME
timedOutExecutors
timedOutExecutors(): Seq[String]
timedOutExecutors(
when: Long): Seq[String]
timedOutExecutors...FIXME
timedOutExecutors is used when:
- ExecutorAllocationManager is requested to schedule
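A minimal sketch of what "timed out at a given time" could mean, assuming each executor has an idle deadline and that executors already pending removal are skipped. ExecStateSketch and TimedOutSketch are hypothetical names, and the result is sorted only to make the output deterministic.

```scala
// Hypothetical per-executor view: idle deadline plus pending-removal flag.
final case class ExecStateSketch(timeoutAtNs: Long, pendingRemoval: Boolean)

object TimedOutSketch {
  // Assumed semantics: IDs whose deadline has passed at `when`,
  // excluding executors already marked for removal.
  def timedOutExecutors(executors: Map[String, ExecStateSketch], when: Long): Seq[String] =
    executors.iterator.collect {
      case (id, s) if !s.pendingRemoval && s.timeoutAtNs <= when => id
    }.toList.sorted
}
```

Taking `when` as a parameter (rather than reading a clock inside) keeps the check easy to test with fixed timestamps.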
executorCount
executorCount: Int
executorCount...FIXME
executorCount is used when:
- ExecutorAllocationManager is requested to addExecutors and removeExecutors
- ExecutorAllocationManagerSource is requested for the numberAllExecutors performance metric
pendingRemovalCount
pendingRemovalCount: Int
pendingRemovalCount...FIXME
pendingRemovalCount is used when:
- ExecutorAllocationManager is requested to removeExecutors
- ExecutorAllocationManagerSource is requested for the numberExecutorsPendingToRemove performance metric
executorsKilled
executorsKilled(
ids: Seq[String]): Unit
executorsKilled...FIXME
executorsKilled is used when:
- ExecutorAllocationManager is requested to removeExecutors
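executorCount, pendingRemovalCount and executorsKilled all operate on the executors registry. The sketch below shows one plausible reading, assuming a per-executor pending-removal flag that executorsKilled sets and pendingRemovalCount counts. KillTrackerSketch and KillAccountingSketch are hypothetical names.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical per-executor state; only the pending-removal flag is modelled.
final class KillTrackerSketch {
  @volatile var pendingRemoval: Boolean = false
}

class KillAccountingSketch {
  val executors = new ConcurrentHashMap[String, KillTrackerSketch]()

  // Number of tracked executors.
  def executorCount: Int = executors.size()

  // Tracked executors already marked for removal.
  def pendingRemovalCount: Int = {
    var n = 0
    val it = executors.values().iterator()
    while (it.hasNext) {
      if (it.next().pendingRemoval) n += 1
    }
    n
  }

  // Marks known executors as pending removal; unknown IDs are ignored.
  def executorsKilled(ids: Seq[String]): Unit =
    ids.foreach { id =>
      val tracker = executors.get(id)
      if (tracker != null) tracker.pendingRemoval = true
    }
}
```

Marking rather than removing keeps a killed executor visible in executorCount until its removal is actually reported.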
ensureExecutorIsTracked
ensureExecutorIsTracked(
id: String,
resourceProfileId: Int): Tracker
ensureExecutorIsTracked...FIXME
ensureExecutorIsTracked is used when:
getResourceProfileId
getResourceProfileId(
executorId: String): Int
getResourceProfileId...FIXME
getResourceProfileId is used for testing only.