# ExternalShuffleBlockResolver

`ExternalShuffleBlockResolver` manages converting shuffle `BlockId`s into physical segments of local files (from a process outside of Executors).
## Creating Instance

`ExternalShuffleBlockResolver` takes the following to be created:

- TransportConf
- `registeredExecutor` file (Java's File)
- Directory Cleaner

`ExternalShuffleBlockResolver` is created when:

- `ExternalBlockHandler` is created
## Executors

`ExternalShuffleBlockResolver` uses a mapping of `ExecutorShuffleInfo`s by `AppExecId`.

`ExternalShuffleBlockResolver` can (re)load this mapping from a `registeredExecutor` file or simply start from scratch.

A new mapping is added when registering an executor.
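The registry is essentially a concurrent map keyed by an application-and-executor pair. The following is a minimal sketch of that data structure (class and field names are illustrative, and a plain `String[]` of local directories stands in for `ExecutorShuffleInfo`):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Conceptual sketch (not the actual Spark code): executor shuffle metadata
// kept in a concurrent map keyed by an (appId, execId) pair.
public class ExecutorRegistrySketch {

  static final class AppExecId {
    final String appId;
    final String execId;

    AppExecId(String appId, String execId) {
      this.appId = appId;
      this.execId = execId;
    }

    @Override public boolean equals(Object o) {
      return (o instanceof AppExecId)
        && appId.equals(((AppExecId) o).appId)
        && execId.equals(((AppExecId) o).execId);
    }

    @Override public int hashCode() {
      return 31 * appId.hashCode() + execId.hashCode();
    }
  }

  // In the resolver, values are ExecutorShuffleInfo (local dirs, sub-dirs
  // per dir, shuffle manager class); a String[] of local dirs stands in here.
  final ConcurrentMap<AppExecId, String[]> executors = new ConcurrentHashMap<>();
}
```

Keying by both the application ID and the executor ID lets a single shuffle service track executors of many applications at once.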
## Directory Cleaner Executor

`ExternalShuffleBlockResolver` can be given a Java Executor or uses a single-worker thread executor (with the `spark-shuffle-directory-cleaner` thread name prefix).

The `Executor` is used to schedule a thread to clean up an executor's local directories as well as non-shuffle and non-RDD files in those directories.
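A minimal sketch of how such a single-worker cleaner executor could be constructed (the daemon flag is an assumption here, not taken from the source):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Illustrative only: a single-worker executor whose thread carries the
// spark-shuffle-directory-cleaner name prefix.
public class DirectoryCleanerSketch {
  static Executor newDirectoryCleaner() {
    return Executors.newSingleThreadExecutor(runnable -> {
      Thread t = new Thread(runnable, "spark-shuffle-directory-cleaner");
      t.setDaemon(true); // assumption: cleanup should not block JVM shutdown
      return t;
    });
  }
}
```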
## spark.shuffle.service.fetch.rdd.enabled

`ExternalShuffleBlockResolver` uses the `spark.shuffle.service.fetch.rdd.enabled` configuration property to control whether or not to remove cached RDD files (alongside shuffle output files).
## Registering Executor

```java
void registerExecutor(
  String appId,
  String execId,
  ExecutorShuffleInfo executorInfo)
```

`registerExecutor`...FIXME

`registerExecutor` is used when:

- `ExternalBlockHandler` is requested to handle a RegisterExecutor message and to reregisterExecutor
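Pending the details above, here is a simplified sketch of what registration amounts to, assuming the executors registry sketched earlier; persisting the entry to the `registeredExecutor` file (so the mapping can be reloaded later) is left out:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified sketch (not the actual Spark code): registering an executor
// boils down to putting its shuffle metadata into the registry under an
// (appId, execId) key.
public class RegisterExecutorSketch {
  // "appId_execId" string key stands in for the real AppExecId key type.
  private final ConcurrentMap<String, String[]> executors = new ConcurrentHashMap<>();

  void registerExecutor(String appId, String execId, String[] localDirs) {
    String fullId = appId + "_" + execId;
    // localDirs stands in for the full ExecutorShuffleInfo
    // (local dirs, sub-dirs per dir, shuffle manager class).
    executors.put(fullId, localDirs);
  }
}
```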
## Cleaning Up Local Directories for Removed Executor

```java
void executorRemoved(
  String executorId,
  String appId)
```

`executorRemoved` prints out the following INFO message to the logs:

```text
Clean up non-shuffle and non-RDD files associated with the finished executor [executorId]
```

`executorRemoved` looks up the executor in the executors internal registry.

When found, `executorRemoved` prints out the following INFO message to the logs and requests the Directory Cleaner Executor to execute asynchronous deletion of the executor's local directories (on a separate thread):

```text
Cleaning up non-shuffle and non-RDD files in executor [AppExecId]'s [localDirs] local dirs
```

When not found, `executorRemoved` prints out the following INFO message to the logs:

```text
Executor is not registered (appId=[appId], execId=[executorId])
```

`executorRemoved` is used when:

- `ExternalBlockHandler` is requested to executorRemoved
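A simplified sketch of the lookup-then-schedule flow described above (the string-keyed map and `System.out` stand in for the real registry and logger):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Simplified sketch (not the actual Spark code): look the executor up in the
// registry and, when found, hand the cleanup of its local dirs over to the
// directory cleaner thread.
public class ExecutorRemovedSketch {
  private final Map<String, String[]> executors = new ConcurrentHashMap<>();
  private final Executor directoryCleaner = Executors.newSingleThreadExecutor();

  void executorRemoved(String executorId, String appId) {
    String[] localDirs = executors.get(appId + "_" + executorId);
    if (localDirs == null) {
      System.out.printf("Executor is not registered (appId=%s, execId=%s)%n", appId, executorId);
      return;
    }
    // Asynchronous deletion of non-shuffle and non-RDD files on a separate thread.
    directoryCleaner.execute(() -> deleteNonShuffleServiceServedFiles(localDirs));
  }

  private void deleteNonShuffleServiceServedFiles(String[] dirs) {
    // See the deleteNonShuffleServiceServedFiles section below.
  }
}
```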
## deleteNonShuffleServiceServedFiles

```java
void deleteNonShuffleServiceServedFiles(
  String[] dirs)
```

`deleteNonShuffleServiceServedFiles` creates a Java FilenameFilter for files that meet all of the following:

- The file name does not end with `.index` or `.data`
- When rddFetchEnabled is enabled, the file name does not start with the `rdd_` prefix

`deleteNonShuffleServiceServedFiles` deletes files and directories (based on the `FilenameFilter`) in every directory (in the input `dirs`).

`deleteNonShuffleServiceServedFiles` prints out the following DEBUG message to the logs:

```text
Successfully cleaned up files not served by shuffle service in directory: [localDir]
```

In case of any exceptions, `deleteNonShuffleServiceServedFiles` prints out the following ERROR message to the logs:

```text
Failed to delete files not served by shuffle service in directory: [localDir]
```
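A minimal sketch of the filter and the per-directory deletion described above (the recursive delete helper and error handling are illustrative simplifications):

```java
import java.io.File;
import java.io.FilenameFilter;

// Simplified sketch (not the actual Spark code): shuffle index/data files are
// always kept, and rdd_* files are kept only when rddFetchEnabled is true
// (i.e. when cached RDD blocks are served by the external shuffle service).
public class NonShuffleServiceServedFilesSketch {
  private final boolean rddFetchEnabled;

  NonShuffleServiceServedFilesSketch(boolean rddFetchEnabled) {
    this.rddFetchEnabled = rddFetchEnabled;
  }

  void deleteNonShuffleServiceServedFiles(String[] dirs) {
    FilenameFilter filter = (dir, name) ->
      !name.endsWith(".index") && !name.endsWith(".data")
        && (!rddFetchEnabled || !name.startsWith("rdd_"));

    for (String localDir : dirs) {
      File[] matches = new File(localDir).listFiles(filter);
      if (matches == null) {
        continue; // not a directory or an I/O error
      }
      for (File file : matches) {
        deleteRecursively(file);
      }
    }
  }

  private void deleteRecursively(File file) {
    File[] children = file.listFiles();
    if (children != null) {
      for (File child : children) {
        deleteRecursively(child);
      }
    }
    file.delete();
  }
}
```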
## Application Removed Notification

```java
void applicationRemoved(
  String appId,
  boolean cleanupLocalDirs)
```

`applicationRemoved`...FIXME

`applicationRemoved` is used when:

- `ExternalBlockHandler` is requested to applicationRemoved
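As a rough sketch of what the notification likely involves (assuming the executors registry keyed by an appId-execId pair as sketched earlier): every executor registered for the application is dropped and, when `cleanupLocalDirs` is enabled, its local directories are scheduled for asynchronous deletion on the Directory Cleaner Executor.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Simplified sketch (not the actual Spark code): drop all executors of the
// removed application and, when requested, clean up their local directories.
public class ApplicationRemovedSketch {
  private final Map<String, String[]> executors = new ConcurrentHashMap<>();
  private final Executor directoryCleaner = Executors.newSingleThreadExecutor();

  void applicationRemoved(String appId, boolean cleanupLocalDirs) {
    Iterator<Map.Entry<String, String[]>> it = executors.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, String[]> entry = it.next();
      if (entry.getKey().startsWith(appId + "_")) {
        it.remove();
        if (cleanupLocalDirs) {
          String[] localDirs = entry.getValue();
          directoryCleaner.execute(() -> deleteExecutorDirs(localDirs));
        }
      }
    }
  }

  private void deleteExecutorDirs(String[] dirs) {
    // See the deleteExecutorDirs section below.
  }
}
```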
## deleteExecutorDirs

```java
void deleteExecutorDirs(
  String[] dirs)
```

`deleteExecutorDirs`...FIXME
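A minimal sketch of recursively deleting executor-local directories (java.nio is used here for brevity; Spark's own recursive-delete utilities and log messages may differ):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Simplified sketch (not the actual Spark code): each executor-local directory
// is deleted recursively; a failure is reported without interrupting the
// cleanup of the remaining directories.
public class DeleteExecutorDirsSketch {
  void deleteExecutorDirs(String[] dirs) {
    for (String localDir : dirs) {
      try (Stream<Path> paths = Files.walk(new File(localDir).toPath())) {
        // Delete children before their parent directories.
        paths.sorted(Comparator.reverseOrder()).map(Path::toFile).forEach(File::delete);
        System.out.println("Successfully cleaned up directory: " + localDir);
      } catch (IOException e) {
        System.err.println("Failed to delete directory: " + localDir);
      }
    }
  }
}
```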
## Fetching Block Data

```java
ManagedBuffer getBlockData(
  String appId,
  String execId,
  int shuffleId,
  long mapId,
  int reduceId)
```

`getBlockData`...FIXME

`getBlockData` is used when:

- `ManagedBufferIterator` is created
- `ShuffleManagedBufferIterator` is requested for the next ManagedBuffer
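A simplified sketch of resolving a shuffle block to a file segment, assuming sort-based shuffle file naming (`shuffle_[shuffleId]_[mapId]_0.index` / `.data`) and ignoring how file names are hashed into the executor's local directories; the real method returns a ManagedBuffer over the resolved segment:

```java
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified sketch (not the actual Spark code): the shuffle index file of the
// map output is consulted to find the [offset, offset + length) segment of the
// data file that holds the requested reduce partition.
public class GetBlockDataSketch {
  static final class FileSegment {
    final File file;
    final long offset;
    final long length;
    FileSegment(File file, long offset, long length) {
      this.file = file;
      this.offset = offset;
      this.length = length;
    }
  }

  // Single directory per executor stands in for the real local-dir layout.
  private final Map<String, File> executorShuffleDirs = new ConcurrentHashMap<>();

  FileSegment getBlockData(String appId, String execId, int shuffleId, long mapId, int reduceId)
      throws IOException {
    File shuffleDir = executorShuffleDirs.get(appId + "_" + execId);
    if (shuffleDir == null) {
      throw new RuntimeException(
        "Executor is not registered (appId=" + appId + ", execId=" + execId + ")");
    }
    String baseName = "shuffle_" + shuffleId + "_" + mapId + "_0";
    File indexFile = new File(shuffleDir, baseName + ".index");
    File dataFile = new File(shuffleDir, baseName + ".data");
    try (DataInputStream in = new DataInputStream(new FileInputStream(indexFile))) {
      // The index file stores (numPartitions + 1) longs; the segment for a
      // reduce partition is delimited by two consecutive offsets.
      in.skipBytes(reduceId * 8);
      long offset = in.readLong();
      long nextOffset = in.readLong();
      return new FileSegment(dataFile, offset, nextOffset - offset);
    }
  }
}
```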
## Logging

Enable `ALL` logging level for the `org.apache.spark.network.shuffle.ExternalShuffleBlockResolver` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.network.shuffle.ExternalShuffleBlockResolver=ALL
```

Refer to Logging.