ShuffleExternalSorter¶
ShuffleExternalSorter is a specialized cache-efficient sorter that sorts arrays of compressed record pointers and partition ids.
ShuffleExternalSorter uses only 8 bytes of space per record in the sorting array to fit more of the array into cache.
ShuffleExternalSorter is created and used by UnsafeShuffleWriter only.
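
To make the 8-bytes-per-record claim concrete, the following is a minimal sketch of how a partition id and a compressed record address can share one `long`. It assumes a layout of 24 bits for the partition id, 13 bits for the memory page number and 27 bits for the offset within the page; the class and method names (`PackedPointerSketch`, `pack`, and so on) are hypothetical and are not Spark's API.

```java
// Illustrative sketch only, not Spark source code.
// Assumed layout of one 8-byte sort entry (most significant bits first):
// [24-bit partition id][13-bit page number][27-bit offset in page]
final class PackedPointerSketch {
    static final int OFFSET_BITS = 27;
    static final int PAGE_BITS = 13;
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;
    static final long PAGE_MASK = (1L << PAGE_BITS) - 1;
    static final int MAX_PARTITION_ID = (1 << 24) - 1;

    // Packs the three fields into a single long.
    static long pack(int partitionId, int pageNumber, long offsetInPage) {
        if (partitionId < 0 || partitionId > MAX_PARTITION_ID) {
            throw new IllegalArgumentException("partition id out of range");
        }
        if (pageNumber < 0 || pageNumber > PAGE_MASK) {
            throw new IllegalArgumentException("page number out of range");
        }
        if (offsetInPage < 0 || offsetInPage > OFFSET_MASK) {
            throw new IllegalArgumentException("offset out of range");
        }
        return ((long) partitionId << (PAGE_BITS + OFFSET_BITS))
            | ((long) pageNumber << OFFSET_BITS)
            | offsetInPage;
    }

    // Unsigned shift so that a set top bit does not leak into the result.
    static int partitionId(long packed) {
        return (int) (packed >>> (PAGE_BITS + OFFSET_BITS));
    }

    static int pageNumber(long packed) {
        return (int) ((packed >>> OFFSET_BITS) & PAGE_MASK);
    }

    static long offsetInPage(long packed) {
        return packed & OFFSET_MASK;
    }
}
```

Under this assumed layout, a 27-bit offset can address at most 2^27 bytes (128 MB) within a page, and the partition id sits in the high bits of every entry.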

MemoryConsumer¶
ShuffleExternalSorter is a MemoryConsumer with a page size of 128 MB (or the page size of the TaskMemoryManager if that is smaller).
ShuffleExternalSorter can spill to disk to free up execution memory.
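
The page-size rule above amounts to taking the smaller of two values. A minimal sketch, assuming the 128 MB cap is expressed as 2^27 bytes; `PageSizeSketch` and `effectivePageSize` are hypothetical names used only for illustration:

```java
// Illustrative sketch only, not Spark source code.
final class PageSizeSketch {
    // 2^27 bytes = 128 MB, the cap described in the text
    static final long MAX_PAGE_SIZE_BYTES = 1L << 27;

    // The sorter's page size is the cap unless the TaskMemoryManager's is smaller.
    static long effectivePageSize(long taskMemoryManagerPageSizeBytes) {
        return Math.min(MAX_PAGE_SIZE_BYTES, taskMemoryManagerPageSizeBytes);
    }
}
```

For example, a TaskMemoryManager page size of 64 MB would be used as-is, while 1 GB would be capped at 128 MB.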
Configuration Properties¶
spark.shuffle.file.buffer¶
ShuffleExternalSorter uses spark.shuffle.file.buffer configuration property for...FIXME
spark.shuffle.spill.numElementsForceSpillThreshold¶
ShuffleExternalSorter uses spark.shuffle.spill.numElementsForceSpillThreshold configuration property for...FIXME
Creating Instance¶
ShuffleExternalSorter takes the following to be created:
- TaskMemoryManager
- BlockManager
- TaskContext
- Initial Size
- Number of Partitions
- SparkConf
- ShuffleWriteMetricsReporter
ShuffleExternalSorter is created when UnsafeShuffleWriter is requested to open a ShuffleExternalSorter.
ShuffleInMemorySorter¶
ShuffleExternalSorter manages a ShuffleInMemorySorter:
- ShuffleInMemorySorter is created immediately when ShuffleExternalSorter is created
- ShuffleInMemorySorter is requested to free up memory and dereferenced (nulled) when ShuffleExternalSorter is requested to cleanupResources and closeAndGetSpills
ShuffleExternalSorter uses the ShuffleInMemorySorter for the following:
Spilling To Disk¶
long spill(
long size,
MemoryConsumer trigger)
spill returns the memory bytes spilled (spill size).
spill prints out the following INFO message to the logs:
Thread [threadId] spilling sort data of [memoryUsage] to disk ([spillsSize] [time|times] so far)
spill writes the sorted file (writeSortedFile) with the isLastFile flag disabled.
spill frees up execution memory (and records the memory bytes spilled as spillSize).
spill requests the ShuffleInMemorySorter to reset.
In the end, spill requests the TaskContext for TaskMetrics to increase the memory bytes spilled.
spill is part of the MemoryConsumer abstraction.
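
The order of the steps described for spill can be sketched as follows. This is a simplified model, not the Spark implementation: `SpillFlowSketch` and its fields are hypothetical stand-ins for the sorter, the ShuffleInMemorySorter and TaskMetrics, memory usage is printed as a raw byte count, and the `time`/`times` choice is an assumption based on the message template.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: models the order of steps, not the real work.
final class SpillFlowSketch {
    final List<String> log = new ArrayList<>();   // captured INFO messages
    final List<String> steps = new ArrayList<>(); // calls made, in order
    long memoryUsage;               // bytes currently held by the sorter
    int spillCount;                 // spill files written so far
    long memoryBytesSpilledMetric;  // stand-in for the TaskMetrics counter

    SpillFlowSketch(long memoryUsage) {
        this.memoryUsage = memoryUsage;
    }

    long spill() {
        // 1. Log the INFO message
        log.add(String.format(
            "Thread %d spilling sort data of %d B to disk (%d %s so far)",
            Thread.currentThread().getId(), memoryUsage, spillCount,
            spillCount > 1 ? "times" : "time"));
        // 2. Write the sorted records with the isLastFile flag disabled
        writeSortedFile(false);
        // 3. Free execution memory and remember how much was released
        long spillSize = freeMemory();
        // 4. Reset the in-memory sorter so it can accept new records
        steps.add("inMemorySorter.reset");
        // 5. Report the spilled bytes to the task metrics
        memoryBytesSpilledMetric += spillSize;
        return spillSize;
    }

    private void writeSortedFile(boolean isLastFile) {
        steps.add("writeSortedFile(isLastFile=" + isLastFile + ")");
        spillCount++;
    }

    private long freeMemory() {
        steps.add("freeMemory");
        long freed = memoryUsage;
        memoryUsage = 0;
        return freed;
    }
}
```

Calling `new SpillFlowSketch(4096L).spill()` in this model returns the freed byte count and leaves the simulated memory usage at zero.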
closeAndGetSpills¶
SpillInfo[] closeAndGetSpills()
closeAndGetSpills...FIXME
closeAndGetSpills is used when UnsafeShuffleWriter is requested to closeAndWriteOutput.
getMemoryUsage¶
long getMemoryUsage()
getMemoryUsage...FIXME
getMemoryUsage is used when ShuffleExternalSorter is created and requested to spill and updatePeakMemoryUsed.
updatePeakMemoryUsed¶
void updatePeakMemoryUsed()
updatePeakMemoryUsed...FIXME
updatePeakMemoryUsed is used when ShuffleExternalSorter is requested to getPeakMemoryUsedBytes and freeMemory.
writeSortedFile¶
void writeSortedFile(
boolean isLastFile)
writeSortedFile...FIXME
writeSortedFile is used when ShuffleExternalSorter is requested to spill and closeAndGetSpills.
cleanupResources¶
void cleanupResources()
cleanupResources...FIXME
cleanupResources is used when UnsafeShuffleWriter is requested to write records and stop.
Inserting Serialized Record Into ShuffleInMemorySorter¶
void insertRecord(
Object recordBase,
long recordOffset,
int length,
int partitionId)
insertRecord...FIXME
insertRecord calls growPointerArrayIfNecessary.
insertRecord...FIXME
insertRecord calls acquireNewPageIfNecessary.
insertRecord...FIXME
insertRecord is used when UnsafeShuffleWriter is requested to insertRecordIntoSorter.
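
The two calls named above (growPointerArrayIfNecessary, then acquireNewPageIfNecessary) can be pictured with the following simplified model. Everything beyond those two method names is an assumption made for illustration: pages are plain `byte[]` arrays, the pointer array doubles when full, each record is stored behind a 4-byte length prefix, and the record base is a `byte[]` rather than the `Object`/`long` pair of the real signature.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative sketch only, not Spark source code.
final class InsertRecordSketch {
    static final int LENGTH_PREFIX_BYTES = 4; // assumed size of the length prefix
    final int pageSize;
    final List<byte[]> pages = new ArrayList<>();
    int pageCursor;   // next free offset in the current (last) page
    long[] pointers;  // one 8-byte entry per record
    int numRecords;

    InsertRecordSketch(int pageSize, int initialSize) {
        this.pageSize = pageSize;
        this.pointers = new long[initialSize];
    }

    // Makes room for one more entry (assumption: double the array when full).
    void growPointerArrayIfNecessary() {
        if (numRecords == pointers.length) {
            pointers = Arrays.copyOf(pointers, Math.max(1, pointers.length * 2));
        }
    }

    // Starts a new page when the current one cannot hold `required` bytes.
    void acquireNewPageIfNecessary(int required) {
        if (required > pageSize) {
            throw new IllegalArgumentException("record larger than a page");
        }
        if (pages.isEmpty() || pageCursor + required > pageSize) {
            pages.add(new byte[pageSize]);
            pageCursor = 0;
        }
    }

    void insertRecord(byte[] recordBase, int recordOffset, int length, int partitionId) {
        growPointerArrayIfNecessary();
        int required = length + LENGTH_PREFIX_BYTES;
        acquireNewPageIfNecessary(required);
        int pageIndex = pages.size() - 1;
        byte[] page = pages.get(pageIndex);
        // Assumed entry layout: partition id, page index, offset in page
        long pointer = ((long) partitionId << 40) | ((long) pageIndex << 27) | pageCursor;
        // Copy the length prefix and the record bytes into the page
        ByteBuffer.wrap(page, pageCursor, required)
            .putInt(length)
            .put(recordBase, recordOffset, length);
        pageCursor += required;
        pointers[numRecords++] = pointer;
    }
}
```

In this model, inserting three 4-byte records into a sorter with 16-byte pages and an initial pointer array of size 1 grows the array twice and allocates a second page for the third record.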
growPointerArrayIfNecessary¶
void growPointerArrayIfNecessary()
growPointerArrayIfNecessary...FIXME
acquireNewPageIfNecessary¶
void acquireNewPageIfNecessary(
int required)
acquireNewPageIfNecessary...FIXME
freeMemory¶
long freeMemory()
freeMemory...FIXME
freeMemory is used when ShuffleExternalSorter is requested to spill, cleanupResources, and closeAndGetSpills.
Peak Memory Used¶
long getPeakMemoryUsedBytes()
getPeakMemoryUsedBytes...FIXME
getPeakMemoryUsedBytes is used when UnsafeShuffleWriter is requested to updatePeakMemoryUsed.
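
The call relationships between getMemoryUsage, updatePeakMemoryUsed, getPeakMemoryUsedBytes and freeMemory described on this page can be sketched as below. The bodies are assumptions, since the page leaves them unspecified: the sketch treats the peak as a running maximum of the current memory usage, and `PeakMemorySketch` with its `currentUsage` field is a hypothetical stand-in.

```java
// Illustrative sketch only, not Spark source code.
final class PeakMemorySketch {
    long currentUsage;                 // stand-in for the memory currently held
    private long peakMemoryUsedBytes;  // highest usage observed so far

    long getMemoryUsage() {
        return currentUsage;
    }

    // Assumption: the peak is a running maximum of getMemoryUsage().
    void updatePeakMemoryUsed() {
        long mem = getMemoryUsage();
        if (mem > peakMemoryUsedBytes) {
            peakMemoryUsedBytes = mem;
        }
    }

    // Refreshes the peak before reporting it.
    long getPeakMemoryUsedBytes() {
        updatePeakMemoryUsed();
        return peakMemoryUsedBytes;
    }

    // Records the peak before releasing memory so the high-water mark is not lost.
    long freeMemory() {
        updatePeakMemoryUsed();
        long freed = currentUsage;
        currentUsage = 0;
        return freed;
    }
}
```

Updating the peak inside freeMemory, as the call list above suggests, means the high-water mark survives a spill that drops the current usage to zero.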
Logging¶
Enable ALL logging level for org.apache.spark.shuffle.sort.ShuffleExternalSorter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.shuffle.sort.ShuffleExternalSorter=ALL
Refer to Logging.