MapOutputTracker
MapOutputTracker is a base abstraction of shuffle map output location registries.
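Contract
getMapSizesByExecutorId
getMapSizesByExecutorId(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
Returns the locations (BlockManagerIds) of the blocks of the given shuffle for the reduce partitions from startPartition (inclusive) to endPartition (exclusive), every block with its size (in bytes) and map index.
Used when:
- SortShuffleManager is requested for a ShuffleReader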
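getMapSizesByRange
getMapSizesByRange(
  shuffleId: Int,
  startMapIndex: Int,
  endMapIndex: Int,
  startPartition: Int,
  endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
Same as getMapSizesByExecutorId, but limited to the map outputs with map indices from startMapIndex (inclusive) to endMapIndex (exclusive).
Used when:
- SortShuffleManager is requested for a ShuffleReader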
unregisterShuffle
unregisterShuffle(
  shuffleId: Int): Unit
Deletes map output status information for the specified shuffle stage.
Used when:
- ContextCleaner is requested to doCleanupShuffle
- BlockManagerSlaveEndpoint is requested to handle a RemoveShuffle message
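To make the shape of the contract concrete, here is a minimal in-memory sketch in Scala. It is not Spark code: BlockManagerId and ShuffleBlockId are simplified, hypothetical stand-ins for the real classes, ToyMapOutputTracker and InMemoryTracker are made-up names, and the map index doubles as the map id. Every result entry pairs a block location with the (block, size in bytes, map index) triples stored there, and all ranges are end-exclusive.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-ins for Spark's BlockManagerId and ShuffleBlockId
final case class BlockManagerId(executorId: String, host: String, port: Int)
final case class ShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int)

// Toy version of the contract: (location, Seq[(block, size in bytes, map index)])
trait ToyMapOutputTracker {
  def getMapSizesByExecutorId(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(ShuffleBlockId, Long, Int)])]

  def getMapSizesByRange(
      shuffleId: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(ShuffleBlockId, Long, Int)])]

  def unregisterShuffle(shuffleId: Int): Unit
}

// Keeps, per shuffle, one (location, size per reduce partition) entry per map task
final class InMemoryTracker extends ToyMapOutputTracker {
  private val shuffles = mutable.Map.empty[Int, Array[(BlockManagerId, Array[Long])]]

  def register(shuffleId: Int, outputs: Array[(BlockManagerId, Array[Long])]): Unit =
    shuffles(shuffleId) = outputs

  def getMapSizesByExecutorId(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(ShuffleBlockId, Long, Int)])] = {
    // All map outputs of the shuffle (none if it is unknown or unregistered)
    val numMaps = shuffles.get(shuffleId).map(_.length).getOrElse(0)
    getMapSizesByRange(shuffleId, 0, numMaps, startPartition, endPartition)
  }

  def getMapSizesByRange(
      shuffleId: Int,
      startMapIndex: Int,
      endMapIndex: Int,
      startPartition: Int,
      endPartition: Int): Iterator[(BlockManagerId, Seq[(ShuffleBlockId, Long, Int)])] = {
    val outputs = shuffles.getOrElse(shuffleId, Array.empty[(BlockManagerId, Array[Long])])
    val blocks = for {
      mapIndex <- startMapIndex until endMapIndex
      (location, sizes) = outputs(mapIndex)
      part <- startPartition until endPartition
    } yield (location, (ShuffleBlockId(shuffleId, mapIndex.toLong, part), sizes(part), mapIndex))
    // Group the blocks by the block manager that stores them
    blocks.groupBy(_._1).iterator.map { case (location, entries) => (location, entries.map(_._2)) }
  }

  def unregisterShuffle(shuffleId: Int): Unit = shuffles -= shuffleId
}
```

In this toy, unregistering a shuffle simply makes later lookups return no blocks.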
Implementations
- MapOutputTrackerMaster
- MapOutputTrackerWorker
Creating Instance
MapOutputTracker takes the following to be created:
- SparkConf
Abstract Class
MapOutputTracker is an abstract class and cannot be created directly. It is created indirectly when one of the concrete MapOutputTrackers is created.
Accessing MapOutputTracker
MapOutputTracker is available through SparkEnv (on the driver and executors).
SparkEnv.get.mapOutputTracker
MapOutputTracker RPC Endpoint
trackerEndpoint is a RpcEndpointRef of the MapOutputTracker RPC endpoint.
trackerEndpoint is initialized (registered on the driver or looked up on executors) when SparkEnv is created for the driver and executors.
trackerEndpoint is used for synchronous communication with the MapOutputTracker RPC endpoint (askTracker).
trackerEndpoint is cleared (set to null) when MapOutputTrackerMaster is requested to stop.
Deregistering Map Output Status Information of Shuffle Stage
unregisterShuffle(
  shuffleId: Int): Unit
Deregisters map output status information for the given shuffle stage.
Used when:
- ContextCleaner is requested for shuffle cleanup
- BlockManagerSlaveEndpoint is requested to remove a shuffle
Stopping MapOutputTracker
stop(): Unit
stop does nothing at all.
stop is used when SparkEnv is requested to stop (and stops all the services, incl. MapOutputTracker).
Converting MapStatuses To BlockManagerIds with ShuffleBlockIds and Their Sizes
convertMapStatuses(
  shuffleId: Int,
  startPartition: Int,
  endPartition: Int,
  statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(BlockId, Long)])]
convertMapStatuses iterates over the input statuses array (MapStatus entries indexed by map id). For every MapStatus and every reduce partition from startPartition (inclusive) to endPartition (exclusive), convertMapStatuses creates a ShuffleBlockId (with the input shuffleId, the map id and the partition) and pairs it with the estimated size of the reduce block. The blocks are grouped by the BlockManagerId (location) of their MapStatus.
For any missing (null) MapStatus, convertMapStatuses prints out the following ERROR message to the logs:
Missing an output location for shuffle [id]
convertMapStatuses then throws a MetadataFetchFailedException (with the shuffleId, the startPartition, and the above error message).
convertMapStatuses is used when:
- MapOutputTrackerMaster is requested for the sizes of shuffle map outputs by executor and range
- MapOutputTrackerWorker is requested for the sizes of shuffle map outputs by executor and range
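The conversion can be sketched in plain Scala under simplifying assumptions: MapStatus, BlockManagerId and ShuffleBlockId are hypothetical minimal stand-ins (not Spark's classes), MetadataFetchFailed stands in for MetadataFetchFailedException, and System.err replaces Spark's logging. Grouping blocks by location in first-seen order is a choice of this sketch.

```scala
import scala.collection.mutable

// Hypothetical, minimal stand-ins for Spark's classes (names reused for readability only)
final case class BlockManagerId(executorId: String, host: String)
final case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int)

// A map output's location plus an estimated size (in bytes) for every reduce partition
final case class MapStatus(location: BlockManagerId, sizes: Array[Long]) {
  def getSizeForBlock(reduceId: Int): Long = sizes(reduceId)
}

// Stand-in for MetadataFetchFailedException
final class MetadataFetchFailed(val shuffleId: Int, val reduceId: Int, message: String)
    extends Exception(message)

object ConvertSketch {
  def convertMapStatuses(
      shuffleId: Int,
      startPartition: Int,
      endPartition: Int,
      statuses: Array[MapStatus]): Seq[(BlockManagerId, Seq[(ShuffleBlockId, Long)])] = {
    // Blocks grouped by location, in the order locations are first seen
    val splitsByAddress =
      mutable.LinkedHashMap.empty[BlockManagerId, mutable.ArrayBuffer[(ShuffleBlockId, Long)]]
    for ((status, mapId) <- statuses.zipWithIndex) {
      if (status == null) {
        val errorMessage = s"Missing an output location for shuffle $shuffleId"
        System.err.println(s"ERROR $errorMessage") // stands in for logError
        throw new MetadataFetchFailed(shuffleId, startPartition, errorMessage)
      }
      // One block per reduce partition in [startPartition, endPartition)
      for (part <- startPartition until endPartition) {
        splitsByAddress.getOrElseUpdate(status.location, mutable.ArrayBuffer.empty[(ShuffleBlockId, Long)]) +=
          ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
      }
    }
    splitsByAddress.toList.map { case (location, blocks) => (location, blocks.toList) }
  }
}
```

Two map outputs on the same executor end up under a single BlockManagerId, which is what lets a reducer fetch them from one location.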
Sending Blocking Messages To trackerEndpoint RpcEndpointRef
askTracker[T](message: Any): T
askTracker sends the input message to the trackerEndpoint RpcEndpointRef and blocks until a result arrives.
When an exception happens, askTracker prints out the following ERROR message to the logs and throws a SparkException.
Error communicating with MapOutputTracker
askTracker is used when MapOutputTracker is requested to fetch map outputs of a ShuffleDependency remotely and to send a one-way message (sendTracker).
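The blocking request/reply pattern can be sketched in plain Scala, assuming a hypothetical ToyEndpointRef whose ask returns a Scala Future (standing in for RpcEndpointRef) and a hypothetical ToySparkException standing in for SparkException. The caller blocks on Await.result up to a timeout, and any non-fatal failure (including a timeout) is logged and rethrown wrapped.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.util.control.NonFatal

// Hypothetical stand-in for an RpcEndpointRef: replies asynchronously
trait ToyEndpointRef {
  def ask(message: Any): Future[Any]
}

// Hypothetical stand-in for SparkException
final class ToySparkException(message: String, cause: Throwable)
    extends Exception(message, cause)

final class ToyTracker(trackerEndpoint: ToyEndpointRef, timeout: FiniteDuration) {
  def askTracker[T](message: Any): T =
    try {
      // Block the calling thread until the reply arrives or the timeout expires.
      // Unchecked cast: the caller is trusted to know the reply type.
      Await.result(trackerEndpoint.ask(message), timeout).asInstanceOf[T]
    } catch {
      case NonFatal(e) =>
        System.err.println("ERROR Error communicating with MapOutputTracker") // stands in for logError
        throw new ToySparkException("Error communicating with MapOutputTracker", e)
    }
}
```

Wrapping the original exception as the cause keeps the underlying RPC failure visible to whoever handles the error.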
Epoch
epoch starts from 0 when MapOutputTracker is created.
epoch can be updated (on MapOutputTrackerWorkers) or incremented (on the driver's MapOutputTrackerMaster).
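The epoch bookkeeping can be modelled as a lock-guarded counter. This is a toy sketch, not Spark's classes: ToyEpochTracker, ToyMaster and ToyWorker are hypothetical, cachedStatuses is a placeholder for map output statuses cached on an executor, and the rule that a worker only adopts a strictly newer epoch (and then drops its cache) is an assumption of the sketch.

```scala
import scala.collection.mutable

// Shared epoch state: starts from 0 and is guarded by a lock
abstract class ToyEpochTracker {
  protected val epochLock = new AnyRef
  protected var epoch: Long = 0L

  def getEpoch: Long = epochLock.synchronized { epoch }
}

// Driver side: only ever increments the epoch
final class ToyMaster extends ToyEpochTracker {
  def incrementEpoch(): Unit = epochLock.synchronized { epoch += 1 }
}

// Executor side: adopts a newer epoch from the driver and drops statuses cached under the old one
final class ToyWorker extends ToyEpochTracker {
  // Placeholder cache: shuffle id -> (serialized) map output statuses
  val cachedStatuses = mutable.Map.empty[Int, String]

  def updateEpoch(newEpoch: Long): Unit = epochLock.synchronized {
    if (newEpoch > epoch) {
      epoch = newEpoch
      cachedStatuses.clear()
    }
  }
}
```

Stale or repeated epochs are ignored in this sketch, so an executor's epoch never goes backwards.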
sendTracker
sendTracker(
  message: Any): Unit
sendTracker...FIXME
sendTracker is used when:
- MapOutputTrackerMaster is requested to stop
Utilities
serializeMapStatuses
serializeMapStatuses(
  statuses: Array[MapStatus],
  broadcastManager: BroadcastManager,
  isLocal: Boolean,
  minBroadcastSize: Int,
  conf: SparkConf): (Array[Byte], Broadcast[Array[Byte]])
serializeMapStatuses serializes the given array of map output locations into a compact byte format (to be sent to reduce tasks) and compresses the serialized bytes using GZIP. The bytes are expected to compress well because many map outputs are on the same host.
Internally, serializeMapStatuses creates a Java ByteArrayOutputStream.
serializeMapStatuses writes out 0 (DIRECT) as the first byte.
serializeMapStatuses creates a Java GZIPOutputStream over the ByteArrayOutputStream and writes out the given statuses array (using a Java ObjectOutputStream).
Based on the size of the resulting byte array, serializeMapStatuses returns either the byte array alone (with no broadcast variable, i.e. null) or a small byte array that refers to a broadcast variable with the serialized bytes.
If the byte array is at least the given minBroadcastSize threshold, serializeMapStatuses requests the input BroadcastManager to create a broadcast variable with the byte array.
serializeMapStatuses then resets the ByteArrayOutputStream and starts over.
serializeMapStatuses writes out 1 (BROADCAST) as the first byte.
serializeMapStatuses creates a new Java GZIPOutputStream over the same ByteArrayOutputStream and writes out the broadcast variable.
serializeMapStatuses prints out the following INFO message to the logs:
Broadcast mapstatuses size = [length], actual size = [length]
serializeMapStatuses is used when ShuffleStatus is requested to serialize shuffle map output statuses.
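The byte layout described above (a one-byte DIRECT/BROADCAST tag followed by a GZIP-compressed, Java-serialized payload) can be sketched with plain JDK streams. Assumptions: an Array[Long] stands in for Array[MapStatus], publish is a hypothetical stand-in for the BroadcastManager that returns an id instead of a Broadcast, println replaces logInfo, and readPayload is only a helper for inspecting the output, not a reproduction of deserializeMapStatuses.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object MapStatusesCodecSketch {
  val Direct: Int = 0
  val Broadcast: Int = 1

  def serialize(
      statuses: Array[Long],
      minBroadcastSize: Int,
      publish: Array[Byte] => Long): (Array[Byte], Option[Long]) = {
    val out = new ByteArrayOutputStream()
    out.write(Direct) // tag byte first
    writeGzipped(out, statuses)
    val arr = out.toByteArray // a copy, so `out` can be reused below
    if (arr.length >= minBroadcastSize) {
      val broadcastId = publish(arr) // the published bytes still start with the Direct tag
      out.reset() // start over
      out.write(Broadcast)
      writeGzipped(out, java.lang.Long.valueOf(broadcastId))
      val outArr = out.toByteArray
      println(s"Broadcast mapstatuses size = ${outArr.length}, actual size = ${arr.length}")
      (outArr, Some(broadcastId))
    } else {
      (arr, None)
    }
  }

  // Java-serializes obj through a GZIP stream; close() writes the GZIP trailer
  private def writeGzipped(out: ByteArrayOutputStream, obj: AnyRef): Unit = {
    val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
    try objOut.writeObject(obj) finally objOut.close()
  }

  // Inspection helper: skips the tag byte, then inflates and deserializes the rest
  def readPayload(bytes: Array[Byte]): AnyRef = {
    val in = new ObjectInputStream(
      new GZIPInputStream(new ByteArrayInputStream(bytes, 1, bytes.length - 1)))
    try in.readObject() finally in.close()
  }
}
```

With a large threshold the statuses travel inline; with a small one only the tag and a broadcast reference do, while the full (still DIRECT-tagged) bytes go through the broadcast.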
deserializeMapStatuses
deserializeMapStatuses(
  bytes: Array[Byte],
  conf: SparkConf): Array[MapStatus]
deserializeMapStatuses...FIXME
deserializeMapStatuses is used when:
- MapOutputTrackerWorker is requested to getStatuses