SortShuffleWriter: Fallback ShuffleWriter
SortShuffleWriter is the "fallback" ShuffleWriter. SortShuffleManager uses it when requested for a ShuffleWriter and neither of the more specialized BypassMergeSortShuffleWriter and UnsafeShuffleWriter can be used.
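The fallback can be pictured as a fixed order of checks: the bypass-merge-sort path first, then the serialized (unsafe) path, and SortShuffleWriter only when neither applies. The following is a minimal sketch of that order, not Spark's code. WriterKind, chooseWriter and its two boolean inputs are hypothetical stand-ins for the checks SortShuffleManager performs when a shuffle is registered and a ShuffleHandle is chosen.

```scala
// Hypothetical model of the order in which SortShuffleManager's options are tried.
// None of these names exist in Spark; they only illustrate the fallback.
sealed trait WriterKind
case object BypassMergeSort extends WriterKind // BypassMergeSortShuffleWriter
case object Unsafe extends WriterKind          // UnsafeShuffleWriter
case object Sort extends WriterKind            // SortShuffleWriter (the fallback)

// canBypass:        the bypass-merge-sort requirements hold for the shuffle
// canUseSerialized: the serialized (unsafe) shuffle requirements hold for the shuffle
def chooseWriter(canBypass: Boolean, canUseSerialized: Boolean): WriterKind =
  if (canBypass) BypassMergeSort
  else if (canUseSerialized) Unsafe
  else Sort
```

Only when both checks fail does the sketch land on Sort, which is the role SortShuffleWriter plays.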
SortShuffleWriter[K, V, C] is a parameterized type with K keys, V values, and C combiner values.
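To make the three type parameters concrete, here is a small sketch of how values of type V are folded into combiners of type C per key K, as in a map-side combine. SimpleAggregator and combineByKey are hypothetical names. Spark's own Aggregator[K, V, C] has a similar shape (createCombiner, mergeValue and mergeCombiners), but this sketch is not its code and leaves mergeCombiners out.

```scala
// Hypothetical simplified stand-in for an aggregator over keys K, values V, combiners C.
final case class SimpleAggregator[K, V, C](
    createCombiner: V => C,   // turns the first value seen for a key into a combiner
    mergeValue: (C, V) => C)  // folds every further value into the existing combiner

// Combines values per key in memory, turning (K, V) records into (K, C) pairs.
def combineByKey[K, V, C](records: Iterator[(K, V)], agg: SimpleAggregator[K, V, C]): Map[K, C] =
  records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
    val c = acc.get(k) match {
      case Some(existing) => agg.mergeValue(existing, v)
      case None           => agg.createCombiner(v)
    }
    acc.updated(k, c)
  }

// Example: K = String, V = Int, C = List[Int] (collects the values of each key)
val collectInts = SimpleAggregator[String, Int, List[Int]](v => List(v), (c, v) => v :: c)
```

With collectInts, the records ("a", 1), ("b", 2), ("a", 3) end up as "a" -> List(3, 1) and "b" -> List(2), so C (List[Int]) differs from V (Int).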
Creating Instance
SortShuffleWriter takes the following to be created:
SortShuffleWriter is created when:
- SortShuffleManager is requested for a ShuffleWriter (for a given ShuffleHandle)
MapStatus
SortShuffleWriter uses the mapStatus internal registry to hold the MapStatus produced after writing records.
Writing records does not return a value itself, so SortShuffleWriter reads the registry when requested to stop, which is how stop can return the MapStatus.
Writing Records (Into Shuffle Partitioned File in Disk Store)
write(records: Iterator[Product2[K, V]]): Unit
write is part of the ShuffleWriter abstraction.
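write creates an ExternalSorter based on the map-side partial aggregation flag (mapSideCombine) of the ShuffleDependency (of the BaseShuffleHandle). When the flag is enabled, the ExternalSorter is given the aggregator and key ordering of the ShuffleDependency. Otherwise, it is given neither, since keys do not have to be sorted on the map side (a sortByKey sorts them on the reduce side).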
write requests the ExternalSorter to insert all the given records.
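How the mapSideCombine flag changes what happens to the inserted records can be sketched with a purely in-memory model. insertAndPartition, partitionOf and combine are hypothetical stand-ins for the ExternalSorter, the Partitioner and the aggregator (with C = V to keep the types simple). The real ExternalSorter also spills to disk, tracks memory and serializes records, none of which is modeled here.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of inserting records into a partitioned sorter.
// mapSideCombine decides whether the combine function and key ordering are used at all.
def insertAndPartition[K, V](
    records: Iterator[(K, V)],
    numPartitions: Int,
    partitionOf: K => Int,    // stand-in for the dependency's Partitioner
    mapSideCombine: Boolean,
    combine: (V, V) => V,     // stand-in for the aggregator (here C = V)
    keyOrdering: Ordering[K]): Vector[Vector[(K, V)]] = {
  // One buffer per partition, filled in record order ("insert all records")
  val buffers = Vector.fill(numPartitions)(mutable.ArrayBuffer.empty[(K, V)])
  records.foreach { case (k, v) => buffers(partitionOf(k)) += ((k, v)) }

  buffers.map { buf =>
    if (mapSideCombine) {
      // Aggregator and ordering in use: combine values per key, then sort keys
      val combined = mutable.LinkedHashMap.empty[K, V]
      buf.foreach { case (k, v) =>
        combined.update(k, combined.get(k).map(combine(_, v)).getOrElse(v))
      }
      combined.toVector.sortBy(_._1)(keyOrdering)
    } else {
      // Neither aggregator nor ordering: records stay in insertion order
      buf.toVector
    }
  }
}
```

For example, with mapSideCombine on and summing as the combine function, the records ("b", 1), ("a", 2), ("b", 3) of one partition come out as ("a", 2), ("b", 4); with the flag off they stay exactly as inserted.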
write ...FIXME
Stopping SortShuffleWriter (and Calculating MapStatus)
stop(success: Boolean): Option[MapStatus]
stop is part of the ShuffleWriter abstraction.
If the stopping flag is already on, stop returns no MapStatus (i.e. None).
Otherwise, stop turns the stopping flag on and returns the internal mapStatus if the input success flag is enabled, or None if it is disabled.
In the end, stop requests the ExternalSorter (if there is one) to stop and increments the shuffle write time task metric by the time that took.
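The interplay of the mapStatus registry, the stopping flag and the final cleanup can be sketched as below. FakeMapStatus and StopSketch are hypothetical stand-ins (no real MapStatus, ExternalSorter or task metrics are involved), and write takes precomputed partition lengths only to show that it records its result instead of returning it.

```scala
// Hypothetical model of SortShuffleWriter's mapStatus registry and stop logic.
// FakeMapStatus and StopSketch are illustrative stand-ins, not Spark classes.
final case class FakeMapStatus(partitionLengths: Seq[Long])

final class StopSketch {
  private var mapStatus: Option[FakeMapStatus] = None // the mapStatus registry
  private var stopping = false                        // the stopping flag
  private var sorterOpen = true                       // stand-in for a live ExternalSorter
  var writeTimeNanos = 0L                             // stand-in for the shuffle write time metric

  // write returns Unit; its result is only recorded in the registry
  def write(partitionLengths: Seq[Long]): Unit = {
    mapStatus = Some(FakeMapStatus(partitionLengths))
  }

  def sorterStopped: Boolean = !sorterOpen

  def stop(success: Boolean): Option[FakeMapStatus] =
    try {
      if (stopping) None // already stopped: no MapStatus
      else {
        stopping = true
        if (success) mapStatus else None
      }
    } finally {
      // Clean up the sorter once and add the time it took to the write-time metric
      if (sorterOpen) {
        val start = System.nanoTime()
        sorterOpen = false
        writeTimeNanos += System.nanoTime() - start
      }
    }
}
```

A second call to stop returns None even after a successful first call, because the stopping flag is already on.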
Requirements of BypassMergeSortShuffleHandle (as ShuffleHandle)
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
shouldBypassMergeSort returns true when all of the following hold:
- No map-side aggregation (the mapSideCombine flag of the given ShuffleDependency is off)
- The number of partitions (of the Partitioner of the given ShuffleDependency) is not greater than the spark.shuffle.sort.bypassMergeThreshold configuration property
Otherwise, shouldBypassMergeSort returns false.
shouldBypassMergeSort is used when:
- SortShuffleManager is requested to register a shuffle (and creates a ShuffleHandle)
stopping Flag
SortShuffleWriter uses the stopping internal flag to indicate whether or not this SortShuffleWriter has been stopped.
Logging
Enable the ALL logging level for the org.apache.spark.shuffle.sort.SortShuffleWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.SortShuffleWriter=ALL
Refer to Logging.