BlockStoreShuffleReader¶
BlockStoreShuffleReader is a ShuffleReader.
Creating Instance¶
BlockStoreShuffleReader takes the following to be created:
- BaseShuffleHandle
- Blocks by Address (Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])])
- TaskContext
- ShuffleReadMetricsReporter
- SerializerManager
- BlockManager
- MapOutputTracker
- shouldBatchFetch flag (default: false)
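To make the shape of the Blocks by Address argument concrete, the following is a minimal sketch in plain Scala. FakeBlockManagerId and FakeShuffleBlockId are hypothetical stand-ins for Spark's BlockManagerId and BlockId, and reading each tuple as (block ID, size in bytes, map index) is an assumption that is not stated above.

```scala
object BlocksByAddressSketch {
  // Hypothetical stand-ins: the real BlockManagerId and BlockId live in
  // org.apache.spark.storage and carry more behavior than these case classes.
  final case class FakeBlockManagerId(executorId: String, host: String, port: Int)
  final case class FakeShuffleBlockId(shuffleId: Int, mapId: Long, reduceId: Int)

  // Same shape as the constructor argument:
  // Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])]
  type BlocksByAddress =
    Iterator[(FakeBlockManagerId, Seq[(FakeShuffleBlockId, Long, Int)])]

  // Two block managers, each listing the shuffle blocks to fetch from it.
  // Assumption: the tuple fields are (block ID, size in bytes, map index).
  def sample(): BlocksByAddress = Iterator(
    (FakeBlockManagerId("exec-1", "host-a", 7337), Seq(
      (FakeShuffleBlockId(0, 0L, 3), 1024L, 0),
      (FakeShuffleBlockId(0, 1L, 3), 2048L, 1))),
    (FakeBlockManagerId("exec-2", "host-b", 7337), Seq(
      (FakeShuffleBlockId(0, 2L, 3), 512L, 2))))

  // Sums the block sizes per executor, consuming the iterator once.
  def bytesPerExecutor(blocks: BlocksByAddress): Map[String, Long] =
    blocks.map { case (bm, bs) => bm.executorId -> bs.map(_._2).sum }.toMap
}
```

Because the argument is an Iterator, it can be traversed only once, which is why the sketch builds a fresh one in sample().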
BlockStoreShuffleReader is created when:
- SortShuffleManager is requested for a ShuffleReader (for a ShuffleHandle and a range of reduce partitions)
Reading Combined Records (for Reduce Task)¶
read(): Iterator[Product2[K, C]]
read is part of the ShuffleReader abstraction.
read creates a ShuffleBlockFetcherIterator.
read ...FIXME
fetchContinuousBlocksInBatch¶
fetchContinuousBlocksInBatch: Boolean
fetchContinuousBlocksInBatch ...FIXME
Review Me¶
Reading Combined Records For Reduce Task¶
Internally, read first creates a ShuffleBlockFetcherIterator (passing in the values of <
NOTE: read uses MapOutputTracker (getMapSizesByExecutorId) to find the BlockManagers with the shuffle blocks and sizes to create ShuffleBlockFetcherIterator.
read creates a new SerializerInstance (using the Serializer from the ShuffleDependency).
read creates a key/value iterator by deserializing every shuffle block stream (using deserializeStream).
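The idea of turning every block stream into one key/value iterator can be sketched without Spark. The example below is an illustration only: it uses plain Java serialization as a stand-in for SerializerInstance, assumes keys and values are written as alternating objects, and all names in it (KeyValueStreamSketch, serialize, keyValueIterator, readAll) are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, EOFException}
import java.io.{ObjectInputStream, ObjectOutputStream}

object KeyValueStreamSketch {
  // Writes keys and values as alternating objects (assumed layout).
  def serialize(records: Seq[(String, Int)]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    records.foreach { case (k, v) =>
      out.writeObject(k)
      out.writeObject(Int.box(v))
    }
    out.close()
    bytes.toByteArray
  }

  // Reads alternating key and value objects until the end of the stream.
  def keyValueIterator(block: Array[Byte]): Iterator[(String, Int)] = {
    val in = new ObjectInputStream(new ByteArrayInputStream(block))
    new Iterator[(String, Int)] {
      private var nextRecord: Option[(String, Int)] = readNext()

      private def readNext(): Option[(String, Int)] =
        try {
          val k = in.readObject().asInstanceOf[String]
          val v = in.readObject().asInstanceOf[java.lang.Integer]
          Some((k, v.intValue))
        } catch {
          case _: EOFException =>
            in.close()
            None
        }

      def hasNext: Boolean = nextRecord.isDefined

      def next(): (String, Int) = {
        val record = nextRecord.getOrElse(throw new NoSuchElementException("end of stream"))
        nextRecord = readNext()
        record
      }
    }
  }

  // One lazy iterator over the records of all fetched blocks.
  def readAll(blocks: Iterator[Array[Byte]]): Iterator[(String, Int)] =
    blocks.flatMap(b => keyValueIterator(b))
}
```

flatMap keeps the result lazy, so a block is deserialized only when the consumer reaches it.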
read updates the context task metrics for each record read.
NOTE: read uses CompletionIterator (to count the records read) and InterruptibleIterator (to support task cancellation).
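The wrapping described above can be approximated with three small iterator decorators. This is a sketch under stated assumptions, not Spark's code: ReadMetrics is a hypothetical stand-in for the shuffle read metrics, and a cancellation flag plus InterruptedException stand in for whatever the TaskContext actually does on cancellation.

```scala
object MeteredIteratorSketch {
  // Hypothetical stand-in for the task's shuffle read metrics.
  final class ReadMetrics {
    var recordsRead: Long = 0L
    var completed: Boolean = false
  }

  // Counts each record as it is handed to the consumer.
  def counting[A](it: Iterator[A], metrics: ReadMetrics): Iterator[A] =
    it.map { record =>
      metrics.recordsRead += 1
      record
    }

  // Runs onComplete exactly once, when the wrapped iterator is exhausted.
  def withCompletion[A](it: Iterator[A])(onComplete: () => Unit): Iterator[A] =
    new Iterator[A] {
      private var done = false
      def hasNext: Boolean = {
        val more = it.hasNext
        if (!more && !done) {
          done = true
          onComplete()
        }
        more
      }
      def next(): A = it.next()
    }

  // Checks a cancellation flag before reporting whether more records exist.
  def interruptible[A](it: Iterator[A], isInterrupted: () => Boolean): Iterator[A] =
    new Iterator[A] {
      def hasNext: Boolean = {
        if (isInterrupted()) throw new InterruptedException("task cancelled")
        it.hasNext
      }
      def next(): A = it.next()
    }
}
```

Stacking the decorators keeps each concern (counting, completion, cancellation) independent of how the records are produced.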
If the ShuffleDependency has an Aggregator defined, read wraps the current iterator inside an iterator defined by Aggregator.combineCombinersByKey (for mapSideCombine enabled) or Aggregator.combineValuesByKey otherwise.
NOTE: read reports an exception when the ShuffleDependency has no Aggregator defined but the mapSideCombine flag is enabled.
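The choice between the two combine methods, and the failure case from the note, can be sketched as follows. SimpleAggregator is a hypothetical, in-memory simplification of an aggregator (no spilling), and the casts and the error message are assumptions about how such a dispatch could be written.

```scala
import scala.collection.mutable

object AggregationSketch {
  // Hypothetical, simplified aggregator: V is a raw value, C is a combiner.
  final case class SimpleAggregator[K, V, C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C) {

    // Input records carry raw values (no map-side combine happened).
    def combineValuesByKey(records: Iterator[(K, V)]): Iterator[(K, C)] = {
      val combined = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, v) =>
        combined(k) = combined.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v))
      }
      combined.iterator
    }

    // Input records are already combiners (map-side combine happened).
    def combineCombinersByKey(records: Iterator[(K, C)]): Iterator[(K, C)] = {
      val combined = mutable.LinkedHashMap.empty[K, C]
      records.foreach { case (k, c) =>
        combined(k) = combined.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c)
      }
      combined.iterator
    }
  }

  // Picks the combine method from the aggregator and the mapSideCombine flag.
  def aggregate[K, V, C](
      records: Iterator[(K, Any)],
      aggregator: Option[SimpleAggregator[K, V, C]],
      mapSideCombine: Boolean): Iterator[(K, Any)] =
    aggregator match {
      case Some(agg) if mapSideCombine =>
        agg.combineCombinersByKey(records.asInstanceOf[Iterator[(K, C)]])
      case Some(agg) =>
        agg.combineValuesByKey(records.asInstanceOf[Iterator[(K, V)]])
      case None =>
        // Map-side combine makes no sense without an aggregator.
        require(!mapSideCombine, "Map-side combine without Aggregator specified!")
        records
    }
}
```

With no aggregator and mapSideCombine disabled, the records pass through unchanged.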
For keyOrdering defined in the ShuffleDependency, read does the following:
- Creates an ExternalSorter
- Inserts all the records into the ExternalSorter
- Updates context TaskMetrics
- Returns a CompletionIterator for the ExternalSorter
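The sorting steps above can be sketched with an in-memory stand-in. InMemorySorter is hypothetical: unlike ExternalSorter it never spills to disk and tracks no metrics, and returning the records untouched when no ordering is defined is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object SortedReadSketch {
  // Hypothetical in-memory stand-in for ExternalSorter (no spilling, no metrics).
  final class InMemorySorter[K, C](ordering: Ordering[K]) {
    private val buffer = ArrayBuffer.empty[(K, C)]

    // Buffers every record from the input iterator.
    def insertAll(records: Iterator[(K, C)]): Unit = {
      buffer ++= records
    }

    // Returns the buffered records sorted by key.
    def iterator: Iterator[(K, C)] = buffer.sortBy(_._1)(ordering).iterator
  }

  // Sorts by key only when a key ordering is defined.
  def sortIfOrdered[K, C](
      records: Iterator[(K, C)],
      keyOrdering: Option[Ordering[K]]): Iterator[(K, C)] =
    keyOrdering match {
      case Some(ord) =>
        val sorter = new InMemorySorter[K, C](ord)
        sorter.insertAll(records)
        sorter.iterator
      case None =>
        records
    }
}
```

Sorting forces the whole input to be consumed before the first record is returned, which is why the real sorter needs to be able to spill.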