MapOutputTrackerMaster¶
MapOutputTrackerMaster
is a MapOutputTracker for the driver.
MapOutputTrackerMaster
is the source of truth for shuffle map output locations.
Creating Instance¶
MapOutputTrackerMaster
takes the following to be created:
- SparkConf
- BroadcastManager
- isLocal flag (to indicate whether MapOutputTrackerMaster runs in local or cluster mode)
When created, MapOutputTrackerMaster
starts dispatcher threads on the map-output-dispatcher thread pool.
MapOutputTrackerMaster
is created when:
SparkEnv
utility is used to create a SparkEnv for the driver
maxRpcMessageSize¶
maxRpcMessageSize
is the maximum allowed size of an RPC message (per the spark.rpc.message.maxSize configuration property). MapOutputTrackerMaster requires that spark.shuffle.mapOutput.minSizeForBroadcast is not above maxRpcMessageSize (and throws an IllegalArgumentException when created otherwise).
BroadcastManager¶
MapOutputTrackerMaster
is given a BroadcastManager when created (and uses it to broadcast large serialized map output statuses).
Shuffle Map Output Status Registry¶
MapOutputTrackerMaster
uses an internal registry of ShuffleStatuses by shuffle ID.
MapOutputTrackerMaster
adds a new shuffle when requested to register one (when DAGScheduler
is requested to create a ShuffleMapStage for a ShuffleDependency).
MapOutputTrackerMaster
uses the registry when requested for the following:

- unregisterMapOutput, unregisterAllMapOutput, unregisterShuffle
- removeOutputsOnHost, removeOutputsOnExecutor
- containsShuffle, getNumAvailableOutputs, findMissingPartitions
- getLocationsWithLargestOutputs, getMapSizesByExecutorId
MapOutputTrackerMaster
removes (clears) all shuffles when requested to stop.
Configuration Properties¶
MapOutputTrackerMaster
uses the following configuration properties:

- spark.shuffle.mapOutput.dispatcher.numThreads
- spark.shuffle.mapOutput.minSizeForBroadcast
- spark.shuffle.mapOutput.parallelAggregationThreshold
- spark.shuffle.reduceLocality.enabled
Map and Reduce Task Thresholds for Preferred Locations¶
MapOutputTrackerMaster
defines 1000 (tasks) as the hardcoded threshold for the number of map and reduce tasks when requested to compute preferred locations (with spark.shuffle.reduceLocality.enabled enabled).
Map Output Threshold for Preferred Location of Reduce Tasks¶
MapOutputTrackerMaster
defines 0.2
as the fraction of total map output that must be at a location for it to be considered as a preferred location for a reduce task.
Making this larger will focus on fewer locations where most data can be read locally, but may lead to more delay in scheduling if those locations are busy.
MapOutputTrackerMaster
uses the fraction when requested for the preferred locations of shuffle RDDs.
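The two sections above boil down to three hardcoded values. A minimal sketch for reference (the SHUFFLE_PREF_* names are the ones used below; the fraction constant's name is illustrative):

```scala
// Hardcoded thresholds (not configurable) for computing preferred locations.
val SHUFFLE_PREF_MAP_THRESHOLD    = 1000 // max number of map tasks
val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000 // max number of reduce tasks
// Fraction of the total map output that must be at a location
// for it to be considered a preferred location for a reduce task.
val REDUCER_PREF_LOCS_FRACTION = 0.2
```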
GetMapOutputMessage Queue¶
MapOutputTrackerMaster
uses a blocking queue (a Java LinkedBlockingQueue) for requests for map output statuses.
GetMapOutputMessage(
shuffleId: Int,
context: RpcCallContext)
GetMapOutputMessage
holds the shuffle ID and the RpcCallContext
of the caller.
A new GetMapOutputMessage
is added to the queue when MapOutputTrackerMaster
is requested to post one.
MapOutputTrackerMaster
uses MessageLoop Dispatcher Threads to process GetMapOutputMessages.
MessageLoop Dispatcher Thread¶
MessageLoop
is a thread of execution to handle GetMapOutputMessages until a PoisonPill
marker message arrives (when MapOutputTrackerMaster
is requested to stop).
MessageLoop
takes a GetMapOutputMessage
and prints out the following DEBUG message to the logs:
Handling request to send map output locations for shuffle [shuffleId] to [hostPort]
MessageLoop
then finds the ShuffleStatus by the shuffle ID in the shuffleStatuses internal registry and replies back (to the RPC client) with a serialized map output status (with the BroadcastManager and spark.shuffle.mapOutput.minSizeForBroadcast configuration property).
MessageLoop
threads run on the map-output-dispatcher Thread Pool.
map-output-dispatcher Thread Pool¶
threadpool: ThreadPoolExecutor
threadpool
is a daemon fixed thread pool registered with the map-output-dispatcher thread name prefix.
threadpool
uses spark.shuffle.mapOutput.dispatcher.numThreads configuration property for the number of MessageLoop dispatcher threads to process received GetMapOutputMessage
messages.
The dispatcher threads are started immediately when MapOutputTrackerMaster
is created.
The thread pool is shut down when MapOutputTrackerMaster
is requested to stop.
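Put together, the queue, the PoisonPill marker and the thread pool form a classic dispatcher pattern. The following is a simplified, self-contained sketch of that pattern (the reply logic is stubbed out with a println, and the thread pool is built by hand where Spark uses its own ThreadUtils helper):

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, ThreadFactory}

case class GetMapOutputMessage(shuffleId: Int)
// Marker message that tells a MessageLoop thread to terminate.
val PoisonPill = GetMapOutputMessage(-99)

val mapOutputRequests = new LinkedBlockingQueue[GetMapOutputMessage]

// Daemon fixed thread pool (analogue of map-output-dispatcher).
val numThreads = 8 // cf. spark.shuffle.mapOutput.dispatcher.numThreads
val threadFactory = new ThreadFactory {
  def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "map-output-dispatcher")
    t.setDaemon(true)
    t
  }
}
val threadpool = Executors.newFixedThreadPool(numThreads, threadFactory)

class MessageLoop extends Runnable {
  def run(): Unit = {
    var done = false
    while (!done) {
      val message = mapOutputRequests.take() // blocks until a message arrives
      if (message == PoisonPill) {
        // Put it back so the other MessageLoop threads see it too.
        mapOutputRequests.offer(PoisonPill)
        done = true
      } else {
        println(s"Handling request to send map output locations for shuffle ${message.shuffleId}")
        // ... look up the ShuffleStatus and reply with serialized statuses ...
      }
    }
  }
}

(1 to numThreads).foreach(_ => threadpool.execute(new MessageLoop))

// Stopping: post the poison pill and shut the pool down.
mapOutputRequests.offer(PoisonPill)
threadpool.shutdown()
```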
Epoch Number¶
MapOutputTrackerMaster
uses an epoch number to track changes to the shuffle map output state (the epoch is incremented every time map outputs are lost or unregistered, so that executors can invalidate their locally-cached map output statuses).
getEpoch
is used when:

- DAGScheduler is requested to removeExecutorAndUnregisterOutputs
- TaskSetManager is created (and sets the epoch on tasks)
Enqueueing GetMapOutputMessage¶
post(
message: GetMapOutputMessage): Unit
post
simply adds the input GetMapOutputMessage
to the mapOutputRequests internal queue.
post
is used when MapOutputTrackerMasterEndpoint
is requested to handle a GetMapOutputStatuses message.
Stopping MapOutputTrackerMaster¶
stop(): Unit
stop
posts a PoisonPill message to the GetMapOutputMessage queue, shuts down the map-output-dispatcher thread pool and removes (clears) all registered shuffles.
stop
is part of the MapOutputTracker abstraction.
Unregistering Shuffle Map Output¶
unregisterMapOutput(
shuffleId: Int,
mapId: Int,
bmAddress: BlockManagerId): Unit
unregisterMapOutput
removes the map output of the given mapId (on the given BlockManagerId) from the ShuffleStatus of the given shuffleId and increments the epoch.
unregisterMapOutput
is used when DAGScheduler
is requested to handle a task completion (due to a fetch failure).
Computing Preferred Locations¶
getPreferredLocationsForShuffle(
dep: ShuffleDependency[_, _, _],
partitionId: Int): Seq[String]
getPreferredLocationsForShuffle
computes the locations (BlockManagers) with the most shuffle map outputs for the input ShuffleDependency and Partition.
getPreferredLocationsForShuffle
computes the locations when all of the following are met:
- spark.shuffle.reduceLocality.enabled configuration property is enabled
- The number of "map" partitions (of the RDD of the input ShuffleDependency) is below SHUFFLE_PREF_MAP_THRESHOLD
- The number of "reduce" partitions (of the Partitioner of the input ShuffleDependency) is below SHUFFLE_PREF_REDUCE_THRESHOLD
Note
getPreferredLocationsForShuffle
is simply getLocationsWithLargestOutputs with a guard condition.
Internally, getPreferredLocationsForShuffle
checks whether spark.shuffle.reduceLocality.enabled configuration property is enabled, and whether the number of partitions (of the RDD of the input ShuffleDependency) and the number of partitions (in the Partitioner of the input ShuffleDependency) are both below 1000.
Note
The thresholds for the number of partitions in the RDD and of the partitioner when computing the preferred locations are 1000
and are not configurable.
If the condition holds, getPreferredLocationsForShuffle
finds the locations with the largest number of shuffle map outputs for the input ShuffleDependency and partitionId (with the number of partitions in the partitioner of the input ShuffleDependency and the 0.2 fraction threshold) and returns the hosts of the preferred BlockManagers.
Note
0.2
is the fraction of total map output that must be at a location to be considered as a preferred location for a reduce task. It is not configurable.
getPreferredLocationsForShuffle
is used when ShuffledRDD and Spark SQL's ShuffledRowRDD
are requested for preferred locations of a partition.
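The following is a simplified, self-contained sketch of the guard-then-delegate logic described above (ShuffleDependency is reduced to the partition counts it contributes, BlockManagerId to a host string, and getLocationsWithLargestOutputs is stubbed out):

```scala
case class ShuffleDep(shuffleId: Int, numMapPartitions: Int, numReducePartitions: Int)

val SHUFFLE_PREF_MAP_THRESHOLD    = 1000
val SHUFFLE_PREF_REDUCE_THRESHOLD = 1000
val REDUCER_PREF_LOCS_FRACTION    = 0.2
val shuffleLocalityEnabled        = true // spark.shuffle.reduceLocality.enabled

// Stub: would return the hosts of the BlockManagers with the largest outputs.
def getLocationsWithLargestOutputs(
    shuffleId: Int,
    reducerId: Int,
    numReducers: Int,
    fractionThreshold: Double): Option[Array[String]] = None

def getPreferredLocationsForShuffle(dep: ShuffleDep, partitionId: Int): Seq[String] = {
  // Guard condition: only "small" shuffles get preferred locations.
  if (shuffleLocalityEnabled &&
      dep.numMapPartitions < SHUFFLE_PREF_MAP_THRESHOLD &&
      dep.numReducePartitions < SHUFFLE_PREF_REDUCE_THRESHOLD) {
    getLocationsWithLargestOutputs(
        dep.shuffleId, partitionId, dep.numReducePartitions, REDUCER_PREF_LOCS_FRACTION)
      .map(_.toSeq)   // hosts of the preferred BlockManagers
      .getOrElse(Nil)
  } else {
    Nil
  }
}
```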
Finding Locations with Largest Number of Shuffle Map Outputs¶
getLocationsWithLargestOutputs(
shuffleId: Int,
reducerId: Int,
numReducers: Int,
fractionThreshold: Double): Option[Array[BlockManagerId]]
getLocationsWithLargestOutputs
returns BlockManagerIds with the largest size (of all the shuffle blocks they manage) above the input fractionThreshold
(given the total size of all the shuffle blocks for the shuffle across all BlockManagers).
Note
getLocationsWithLargestOutputs
may return no BlockManagerId if no location's cumulative shuffle block size is above the input fractionThreshold.
Note
The input numReducers
is not used.
Internally, getLocationsWithLargestOutputs
queries the mapStatuses internal cache for the input shuffleId.
Note
One entry in mapStatuses
internal cache is a MapStatus array indexed by partition id.
MapStatus
includes information about the BlockManager (as BlockManagerId) and the estimated size of the reduce blocks.
getLocationsWithLargestOutputs
iterates over the MapStatus
array and builds an interim mapping of the cumulative size of shuffle blocks per BlockManagerId.
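The following is a self-contained sketch of that aggregation, with MapStatus reduced to a location and the per-reducer block sizes (the real method works against the registered map statuses and real BlockManagerIds):

```scala
// Simplified MapStatus: where the map output lives and the estimated
// size of the block for every reduce partition.
final case class MapStatus(location: String, sizes: Array[Long])

def getLocationsWithLargestOutputs(
    statuses: Array[MapStatus],
    reducerId: Int,
    fractionThreshold: Double): Option[Array[String]] = {
  // Cumulative size of the reducer's blocks per location.
  val sizesByLocation = scala.collection.mutable.Map.empty[String, Long].withDefaultValue(0L)
  var totalOutputSize = 0L
  for (status <- statuses) {
    val size = status.sizes(reducerId)
    if (size > 0) {
      sizesByLocation(status.location) += size
      totalOutputSize += size
    }
  }
  if (totalOutputSize == 0) return None
  // Keep only the locations holding at least the given fraction of the total.
  val topLocs = sizesByLocation.collect {
    case (loc, size) if size.toDouble / totalOutputSize >= fractionThreshold => loc
  }.toArray
  if (topLocs.nonEmpty) Some(topLocs) else None
}
```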
Incrementing Epoch¶
incrementEpoch(): Unit
incrementEpoch
increments the internal epoch.
incrementEpoch
prints out the following DEBUG message to the logs:
Increasing epoch to [epoch]
incrementEpoch
is used when:
- MapOutputTrackerMaster is requested to unregisterMapOutput, unregisterAllMapOutput, removeOutputsOnHost and removeOutputsOnExecutor
- DAGScheduler is requested to handle a ShuffleMapTask completion (of a ShuffleMapStage)
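A minimal sketch of the epoch bookkeeping (a counter guarded by a lock, shared by getEpoch and incrementEpoch; the DEBUG logging is reduced to a println):

```scala
object EpochTracker {
  private val epochLock = new AnyRef
  private var epoch: Long = 0L

  def getEpoch: Long = epochLock.synchronized { epoch }

  def incrementEpoch(): Unit = epochLock.synchronized {
    epoch += 1
    println(s"Increasing epoch to $epoch") // DEBUG in the real code
  }
}
```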
Checking Availability of Shuffle Map Output Status¶
containsShuffle(
shuffleId: Int): Boolean
containsShuffle
checks if the input shuffleId
is registered in the cachedSerializedStatuses or mapStatuses internal caches.
containsShuffle
is used when DAGScheduler
is requested to create a ShuffleMapStage (for a ShuffleDependency).
Registering Shuffle¶
registerShuffle(
shuffleId: Int,
numMaps: Int): Unit
registerShuffle
registers a new ShuffleStatus (for the given shuffle ID and the number of partitions) to the shuffleStatuses internal registry.
registerShuffle
throws an IllegalArgumentException
when the shuffle ID has already been registered:
Shuffle ID [shuffleId] registered twice
registerShuffle
is used when:
DAGScheduler
is requested to create a ShuffleMapStage (for a ShuffleDependency)
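The following is a self-contained sketch of the registration with the duplicate-registration guard (ShuffleStatus is stubbed to the number of map partitions it tracks):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._

final class ShuffleStatus(val numPartitions: Int)

// Registry of ShuffleStatuses by shuffle ID (a concurrent map, as multiple
// threads may register and query shuffles).
val shuffleStatuses = new ConcurrentHashMap[Int, ShuffleStatus]().asScala

def registerShuffle(shuffleId: Int, numMaps: Int): Unit = {
  // put returns the previous value, so a defined result means a duplicate.
  if (shuffleStatuses.put(shuffleId, new ShuffleStatus(numMaps)).isDefined) {
    throw new IllegalArgumentException(s"Shuffle ID $shuffleId registered twice")
  }
}
```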
Registering Map Outputs for Shuffle (Possibly with Epoch Change)¶
registerMapOutputs(
shuffleId: Int,
statuses: Array[MapStatus],
changeEpoch: Boolean = false): Unit
registerMapOutputs
registers the input statuses
(as the shuffle map output) with the input shuffleId
in the mapStatuses internal cache.
registerMapOutputs
increments epoch if the input changeEpoch
is enabled (it is not by default).
registerMapOutputs
is used when DAGScheduler
handles successful ShuffleMapTask
completion and executor lost events.
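A simplified, self-contained sketch of the bulk registration (MapStatus is stubbed and the epoch change is reduced to a plain counter):

```scala
import scala.collection.concurrent.TrieMap

final case class MapStatus(location: String, sizes: Array[Long])

val mapStatuses = TrieMap[Int, Array[MapStatus]]()
var epoch = 0L

def registerMapOutputs(
    shuffleId: Int,
    statuses: Array[MapStatus],
    changeEpoch: Boolean = false): Unit = {
  // Replace whatever was registered for the shuffle with the new statuses.
  mapStatuses.put(shuffleId, statuses.clone())
  // Optionally tell executors to invalidate their cached locations.
  if (changeEpoch) epoch += 1
}
```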
Finding Serialized Map Output Statuses (And Possibly Broadcasting Them)¶
getSerializedMapOutputStatuses(
shuffleId: Int): Array[Byte]
getSerializedMapOutputStatuses
finds the cached serialized map statuses for the input shuffleId.
If found, getSerializedMapOutputStatuses
returns the cached serialized map statuses.
Otherwise, getSerializedMapOutputStatuses
acquires the shuffle lock for shuffleId
and checks the cache again, since another thread could have updated the cachedSerializedStatuses internal cache in the meantime.
getSerializedMapOutputStatuses
returns the serialized map statuses if found.
If not, getSerializedMapOutputStatuses
serializes the local array of MapStatuses
(from checkCachedStatuses).
getSerializedMapOutputStatuses
prints out the following INFO message to the logs:
Size of output statuses for shuffle [shuffleId] is [bytes] bytes
getSerializedMapOutputStatuses
saves the serialized map output statuses in cachedSerializedStatuses internal cache if the epoch has not changed in the meantime. getSerializedMapOutputStatuses
also saves its broadcast version in cachedSerializedBroadcast internal cache.
If the epoch has changed in the meantime, the serialized map output statuses and their broadcast version are not saved, and getSerializedMapOutputStatuses
prints out the following INFO message to the logs:
Epoch changed, not caching!
getSerializedMapOutputStatuses
removes the broadcast.
getSerializedMapOutputStatuses
returns the serialized map statuses.
getSerializedMapOutputStatuses
is used when MapOutputTrackerMaster responds to GetMapOutputMessage
requests and DAGScheduler
creates ShuffleMapStage
for ShuffleDependency
(copying the shuffle map output locations from previous jobs to avoid unnecessarily regenerating data).
Finding Cached Serialized Map Statuses¶
checkCachedStatuses(): Boolean
checkCachedStatuses
is an internal helper method that getSerializedMapOutputStatuses uses to set its local statuses, retBytes and epochGotten variables.
Internally, checkCachedStatuses
acquires the epochLock lock and compares the current epoch with the cacheEpoch.
If the epoch is younger (i.e. greater), checkCachedStatuses
clears the cachedSerializedStatuses internal cache (with the cached broadcasts) and sets cacheEpoch to the current epoch.
checkCachedStatuses
then looks up the serialized map output statuses for the shuffleId (of the owning getSerializedMapOutputStatuses) in the cachedSerializedStatuses internal cache.
When the serialized map output status is found, checkCachedStatuses
saves it in the local retBytes
and returns true.
When not found, you should see the following DEBUG message in the logs:
cached status not found for : [shuffleId]
checkCachedStatuses
then uses the mapStatuses internal cache to get the map output statuses for the shuffleId (or an empty array when not registered) and saves them in the local statuses. checkCachedStatuses
sets the local epochGotten to the current epoch and returns false.
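The following is a self-contained sketch of the double-checked, epoch-guarded caching that getSerializedMapOutputStatuses and checkCachedStatuses implement together (the serialization and broadcast are stubbed out, and the per-shuffle lock is omitted for brevity):

```scala
import scala.collection.mutable

val epochLock = new AnyRef
var epoch: Long = 0L
var cacheEpoch: Long = 0L
val cachedSerializedStatuses = mutable.Map.empty[Int, Array[Byte]]

def serializeStatuses(shuffleId: Int): Array[Byte] =
  Array.empty[Byte] // stub for the actual serialization (and possible broadcast)

def getSerializedMapOutputStatuses(shuffleId: Int): Array[Byte] = {
  var epochGotten = -1L
  // 1. checkCachedStatuses: clear the cache if the epoch moved on, then look up.
  epochLock.synchronized {
    if (epoch > cacheEpoch) {
      cachedSerializedStatuses.clear()
      cacheEpoch = epoch
    }
    cachedSerializedStatuses.get(shuffleId) match {
      case Some(bytes) => return bytes        // cached status found
      case None        => epochGotten = epoch // remember the epoch we read under
    }
  }
  // 2. Serialize outside the lock (it can be expensive).
  val bytes = serializeStatuses(shuffleId)
  // 3. Cache the result only if the epoch did not change in the meantime.
  epochLock.synchronized {
    if (epoch == epochGotten) cachedSerializedStatuses(shuffleId) = bytes
    else println("Epoch changed, not caching!")
  }
  bytes
}
```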
Registering Shuffle Map Output¶
registerMapOutput(
shuffleId: Int,
mapId: Int,
status: MapStatus): Unit
registerMapOutput
finds the ShuffleStatus by the given shuffle ID and adds the given MapStatus:
- The given mapId is the partitionId of the ShuffleMapTask that finished
- The given shuffleId is the shuffleId of the ShuffleDependency of the ShuffleMapStage (for which the ShuffleMapTask completed)
registerMapOutput
is used when DAGScheduler
is requested to handle a ShuffleMapTask completion.
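A self-contained sketch of the lookup-then-add logic (ShuffleStatus is stubbed to a synchronized array of MapStatuses indexed by map partition ID; the addMapOutput name mirrors the description above):

```scala
import scala.collection.concurrent.TrieMap

final case class MapStatus(location: String, sizes: Array[Long])

// ShuffleStatus analogue: MapStatuses indexed by map partition ID.
final class ShuffleStatus(numPartitions: Int) {
  private val mapStatuses = new Array[MapStatus](numPartitions)
  def addMapOutput(mapId: Int, status: MapStatus): Unit = synchronized {
    mapStatuses(mapId) = status
  }
}

val shuffleStatuses = TrieMap[Int, ShuffleStatus]()

def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus): Unit =
  // This sketch silently ignores unknown shuffles.
  shuffleStatuses.get(shuffleId).foreach(_.addMapOutput(mapId, status))
```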
Map Output Statistics for ShuffleDependency¶
getStatistics(
dep: ShuffleDependency[_, _, _]): MapOutputStatistics
getStatistics
looks up the ShuffleStatus for the shuffleId (of the input ShuffleDependency) in the shuffleStatuses registry.
Note
It is assumed that the shuffleStatuses registry does have the ShuffleStatus
. That makes me believe "someone else" is taking care of whether it is available or not.
getStatistics
requests the ShuffleStatus
for the MapStatuses (of the ShuffleDependency).
getStatistics
uses the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property to decide on parallelism to calculate the statistics.
With no parallelism, getStatistics
simply traverses the MapStatuses and requests them (one by one) for the size of every reduce shuffle block.
Note
getStatistics
requests the given ShuffleDependency
for the Partitioner that in turn is requested for the number of partitions.
The number of reduce blocks is the number of MapStatuses multiplied by the number of partitions.
And hence the need for parallelism based on the spark.shuffle.mapOutput.parallelAggregationThreshold configuration property.
In the end, getStatistics
creates a MapOutputStatistics
with the shuffle ID and the total sizes (summed up for every partition).
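A self-contained sketch of the sequential aggregation (the parallel variant kicks in above the spark.shuffle.mapOutput.parallelAggregationThreshold; MapStatus and MapOutputStatistics are stubbed):

```scala
final case class MapStatus(location: String, sizes: Array[Long])
final case class MapOutputStatistics(shuffleId: Int, bytesByPartitionId: Array[Long])

def getStatistics(
    shuffleId: Int,
    statuses: Array[MapStatus],
    numReducePartitions: Int): MapOutputStatistics = {
  val totalSizes = new Array[Long](numReducePartitions)
  // One pass over (number of MapStatuses x number of reduce partitions) blocks.
  for (status <- statuses; reduceId <- 0 until numReducePartitions) {
    totalSizes(reduceId) += status.sizes(reduceId)
  }
  MapOutputStatistics(shuffleId, totalSizes)
}
```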
getStatistics
is used when:
DAGScheduler
is requested to handle a successful ShuffleMapStage submission and markMapStageJobsAsFinished
Deregistering All Map Outputs of Shuffle Stage¶
unregisterAllMapOutput(
shuffleId: Int): Unit
unregisterAllMapOutput
removes all the registered map outputs of the given shuffleId (from its ShuffleStatus) and increments the epoch.
unregisterAllMapOutput
is used when DAGScheduler
is requested to handle a task completion (due to a fetch failure).
Deregistering Shuffle¶
unregisterShuffle(
shuffleId: Int): Unit
unregisterShuffle
removes the given shuffleId (and its ShuffleStatus) from the shuffleStatuses internal registry.
unregisterShuffle
is part of the MapOutputTracker abstraction.
Deregistering Shuffle Outputs Associated with Host¶
removeOutputsOnHost(
host: String): Unit
removeOutputsOnHost
removes the shuffle map outputs registered on the given host (across all registered shuffles) and increments the epoch.
removeOutputsOnHost
is used when DAGScheduler
is requested to removeExecutorAndUnregisterOutputs and handle a worker removal.
Deregistering Shuffle Outputs Associated with Executor¶
removeOutputsOnExecutor(
execId: String): Unit
removeOutputsOnExecutor
removes the shuffle map outputs registered for the given executor (across all registered shuffles) and increments the epoch.
removeOutputsOnExecutor
is used when DAGScheduler
is requested to removeExecutorAndUnregisterOutputs.
Number of Partitions with Shuffle Map Outputs Available¶
getNumAvailableOutputs(
shuffleId: Int): Int
getNumAvailableOutputs
requests the ShuffleStatus (of the given shuffleId) for the number of available map outputs (or gives 0 when the shuffle is not registered).
getNumAvailableOutputs
is used when ShuffleMapStage
is requested for the number of partitions with shuffle outputs available.
Finding Missing Partitions¶
findMissingPartitions(
shuffleId: Int): Option[Seq[Int]]
findMissingPartitions
requests the ShuffleStatus (of the given shuffleId) for the missing partitions (or gives None when the shuffle is not registered).
findMissingPartitions
is used when ShuffleMapStage
is requested for missing partitions.
Finding Locations with Blocks and Sizes¶
getMapSizesByExecutorId(
shuffleId: Int,
startPartition: Int,
endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long)])]
getMapSizesByExecutorId
is part of the MapOutputTracker abstraction.
getMapSizesByExecutorId
returns a collection of BlockManagerIds with their blocks and sizes.
When executed, getMapSizesByExecutorId
prints out the following DEBUG message to the logs:
Fetching outputs for shuffle [id], partitions [startPartition]-[endPartition]
getMapSizesByExecutorId
finds the map outputs for the input shuffleId.
Note
getMapSizesByExecutorId
gets the map outputs for all the partitions (despite the method's signature).
In the end, getMapSizesByExecutorId
converts the shuffle map outputs (as MapStatuses) into the collection of BlockManagerIds with their blocks and sizes.
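A self-contained sketch of that conversion (the real code builds ShuffleBlockIds; here a block ID is reduced to a string in the shuffle_[shuffleId]_[mapId]_[reduceId] format, and BlockManagerId to a location string):

```scala
final case class MapStatus(location: String, sizes: Array[Long])

def getMapSizesByExecutorId(
    shuffleId: Int,
    statuses: Array[MapStatus],
    startPartition: Int,
    endPartition: Int): Iterator[(String, Seq[(String, Long)])] = {
  // Group (blockId, size) pairs by the location of the map output.
  val splitsByAddress =
    scala.collection.mutable.Map.empty[String, List[(String, Long)]].withDefaultValue(Nil)
  for ((status, mapId) <- statuses.iterator.zipWithIndex;
       reduceId <- startPartition until endPartition) {
    // Name of a shuffle block: shuffle_[shuffleId]_[mapId]_[reduceId]
    val blockId = s"shuffle_${shuffleId}_${mapId}_${reduceId}"
    splitsByAddress(status.location) =
      (blockId -> status.sizes(reduceId)) :: splitsByAddress(status.location)
  }
  splitsByAddress.iterator
}
```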
Logging¶
Enable ALL
logging level for org.apache.spark.MapOutputTrackerMaster
logger to see what happens inside.
Add the following line to conf/log4j.properties
:
log4j.logger.org.apache.spark.MapOutputTrackerMaster=ALL
Refer to Logging.