BlockStoreShuffleReader¶
BlockStoreShuffleReader is a ShuffleReader.
Creating Instance¶
BlockStoreShuffleReader takes the following to be created:
- BaseShuffleHandle
- Blocks by Address (Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])])
- TaskContext
- ShuffleReadMetricsReporter
- SerializerManager
- BlockManager
- MapOutputTracker
- shouldBatchFetch flag (default: false)
BlockStoreShuffleReader is created when:
- SortShuffleManager is requested for a ShuffleReader (for a ShuffleHandle and a range of reduce partitions)
Reading Combined Records (for Reduce Task)¶
read(): Iterator[Product2[K, C]]
read is part of the ShuffleReader abstraction.
read creates a ShuffleBlockFetcherIterator.
read...FIXME
fetchContinuousBlocksInBatch¶
fetchContinuousBlocksInBatch: Boolean
fetchContinuousBlocksInBatch...FIXME
Review Me¶
Reading Combined Records For Reduce Task
Internally, read first creates a ShuffleBlockFetcherIterator (passing in the values of <
NOTE: read uses MapOutputTracker to find the BlockManagers with the shuffle blocks and their sizes in order to create the ShuffleBlockFetcherIterator.
read creates a new SerializerInstance (using the Serializer from the ShuffleDependency).
read creates a key/value iterator by calling deserializeStream on every shuffle block stream.
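The idea of turning several serialized block streams into one key/value iterator can be sketched with plain Java serialization. This is only an illustration, not Spark's code: the object KeyValueStreamSketch and its methods serialize, keyValueIterator and readAll are hypothetical names, and the fixed (String, Int) record type is an assumption made to keep the example small.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException, InputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical sketch: each "block" is a byte array holding alternating keys and values.
object KeyValueStreamSketch {

  // Writes every record as a key object followed by a value object.
  def serialize(records: Seq[(String, Int)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    records.foreach { case (k, v) =>
      out.writeObject(k)
      out.writeObject(Int.box(v))
    }
    out.close()
    bytes.toByteArray
  }

  // Reads key, then value, repeatedly until the stream is exhausted.
  def keyValueIterator(stream: InputStream): Iterator[(String, Int)] = {
    val in = new ObjectInputStream(stream)

    def readRecord(): Option[(String, Int)] =
      try {
        val k = in.readObject().asInstanceOf[String]
        val v = in.readObject().asInstanceOf[Int]
        Some((k, v))
      } catch {
        case _: EOFException =>
          in.close()
          None
      }

    new Iterator[(String, Int)] {
      // Always holds the record that the following next() call will return.
      private var nextRecord: Option[(String, Int)] = readRecord()

      override def hasNext: Boolean = nextRecord.isDefined

      override def next(): (String, Int) = {
        val current = nextRecord.getOrElse(throw new NoSuchElementException("end of stream"))
        nextRecord = readRecord()
        current
      }
    }
  }

  // Flattens the per-block iterators into a single iterator over all records.
  def readAll(blocks: Iterator[Array[Byte]]): Iterator[(String, Int)] =
    blocks.flatMap(block => keyValueIterator(new ByteArrayInputStream(block)))
}
```

Calling KeyValueStreamSketch.readAll with two serialized blocks yields the records of the first block followed by the records of the second, lazily, one block at a time.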
read updates the context task metrics for each record read.
NOTE: read uses CompletionIterator (to count the records read) and InterruptibleIterator (to support task cancellation).
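The two wrappers mentioned in the note can be illustrated with small stand-ins. The sketch below is not Spark's implementation: CompletingIterator and CancellableIterator are hypothetical classes, the cancellation check is a plain function, and InterruptedException is used only as a placeholder for whatever the real iterator raises.

```scala
object CompletionSketch {

  // Hypothetical stand-in for a completion iterator:
  // runs onComplete exactly once, when the underlying iterator is exhausted.
  final class CompletingIterator[A](underlying: Iterator[A], onComplete: () => Unit)
      extends Iterator[A] {
    private var completed = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !completed) {
        completed = true
        onComplete()
      }
      more
    }

    override def next(): A = underlying.next()
  }

  // Hypothetical stand-in for an interruptible iterator:
  // checks a cancellation flag before reporting whether more records exist.
  final class CancellableIterator[A](underlying: Iterator[A], isCancelled: () => Boolean)
      extends Iterator[A] {
    override def hasNext: Boolean = {
      if (isCancelled()) throw new InterruptedException("task was cancelled")
      underlying.hasNext
    }

    override def next(): A = underlying.next()
  }

  // Counts records as they flow through and reports whether the completion callback ran.
  def readAll[A](records: Iterator[A]): (List[A], Long, Boolean) = {
    var recordsRead = 0L
    var flushed = false
    val counted = records.map { r => recordsRead += 1; r }
    val completing = new CompletingIterator(counted, () => flushed = true)
    val interruptible = new CancellableIterator(completing, () => false)
    (interruptible.toList, recordsRead, flushed)
  }
}
```

The layering matters: counting happens per record, the completion callback fires once at the end, and the cancellation check sits outermost so a cancelled task stops before pulling more data.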
If the ShuffleDependency has an Aggregator defined, read wraps the current iterator inside an iterator defined by Aggregator.combineCombinersByKey (for mapSideCombine enabled) or Aggregator.combineValuesByKey otherwise.
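The difference between the two combine paths is easier to see on a toy aggregator. The sketch below is an in-memory approximation under stated assumptions: SimpleAggregator is a hypothetical class that only borrows the three-function shape (createCombiner, mergeValue, mergeCombiners) and the two method names from the text. It does not spill to disk and is not Spark's Aggregator.

```scala
import scala.collection.mutable

object AggregatorSketch {

  // Hypothetical in-memory stand-in for an aggregator over keys K, values V and combiners C.
  final case class SimpleAggregator[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C) {

    // Used when records are raw values (no map-side combine happened).
    def combineValuesByKey(records: Iterator[(K, V)]): Iterator[(K, C)] = {
      val combined = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, v) =>
        combined.get(k) match {
          case Some(c) => combined.update(k, mergeValue(c, v))
          case None    => combined.update(k, createCombiner(v))
        }
      }
      combined.iterator
    }

    // Used when records are already partial combiners (map-side combine enabled).
    def combineCombinersByKey(records: Iterator[(K, C)]): Iterator[(K, C)] = {
      val combined = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, c) =>
        combined.get(k) match {
          case Some(existing) => combined.update(k, mergeCombiners(existing, c))
          case None           => combined.update(k, c)
        }
      }
      combined.iterator
    }
  }

  // Example aggregator: the combiner is a (sum, count) pair built from Int values.
  val sumAndCount: SimpleAggregator[String, Int, (Int, Int)] =
    SimpleAggregator[String, Int, (Int, Int)](
      v => (v, 1),
      (c, v) => (c._1 + v, c._2 + 1),
      (a, b) => (a._1 + b._1, a._2 + b._2))
}
```

With sumAndCount, raw values such as ("a", 1) and ("a", 3) go through combineValuesByKey, while pre-combined pairs such as ("a", (1, 1)) and ("a", (3, 1)) go through combineCombinersByKey; both paths end with the same combiner per key.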
NOTE: read reports an exception when the ShuffleDependency has no Aggregator defined but the mapSideCombine flag is enabled.
For keyOrdering defined in the ShuffleDependency, read does the following:
- Creates an ExternalSorter
- Inserts all the records into the ExternalSorter
- Updates context TaskMetrics
- Returns a CompletionIterator for the ExternalSorter
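The ordering steps above can be sketched with an in-memory sorter. This is a simplification under stated assumptions: InMemorySorter and sortIfOrdered are hypothetical names, the sorter keeps everything in a buffer instead of spilling to disk, and the metrics update and completion wrapper are left out.

```scala
import scala.collection.mutable

object SortSketch {

  // Hypothetical in-memory stand-in for an external sorter: buffers all records, then sorts by key.
  final class InMemorySorter[K, C](ordering: Ordering[K]) {
    private val buffer = mutable.ArrayBuffer.empty[(K, C)]

    def insertAll(records: Iterator[(K, C)]): Unit = {
      buffer ++= records
      ()
    }

    // Sorted view of everything inserted so far.
    def iterator: Iterator[(K, C)] = buffer.sortBy(_._1)(ordering).iterator
  }

  // Sorts only when a key ordering is defined; otherwise passes the records through untouched.
  def sortIfOrdered[K, C](
      records: Iterator[(K, C)],
      keyOrdering: Option[Ordering[K]]): Iterator[(K, C)] =
    keyOrdering match {
      case Some(ord) =>
        val sorter = new InMemorySorter[K, C](ord)
        sorter.insertAll(records)
        sorter.iterator
      case None =>
        records
    }
}
```

Note that sorting forces the whole input to be consumed before the first record is returned, which is why the real sorter needs to be able to spill.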