ExternalShuffleService¶
ExternalShuffleService is a Spark service that can serve RDD and shuffle blocks.
ExternalShuffleService manages shuffle output files so they are available to executors. Because the shuffle output files are managed externally to the executors, it offers uninterrupted access to them even when executors are killed or down (especially with Dynamic Allocation of Executors).
ExternalShuffleService can be launched from the command line.
ExternalShuffleService is enabled on the driver and executors using the spark.shuffle.service.enabled configuration property.
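As a minimal sketch of the application side, the property can be set on a SparkConf before the SparkContext is created. The sketch assumes spark-core is on the classpath; the object name ShuffleServiceConfSketch is hypothetical and used only for illustration.

```scala
import org.apache.spark.SparkConf

object ShuffleServiceConfSketch {
  // loadDefaults = false keeps the sketch independent of JVM system properties.
  val conf: SparkConf = new SparkConf(false)
    .set("spark.shuffle.service.enabled", "true")

  def main(args: Array[String]): Unit =
    // Read the flag back, falling back to false when it is not set.
    println(conf.getBoolean("spark.shuffle.service.enabled", false))
}
```

The same property can also be passed with --conf on spark-submit or placed in conf/spark-defaults.conf.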
Note
Spark on YARN uses a custom external shuffle service (YarnShuffleService).
Launching ExternalShuffleService¶
ExternalShuffleService can be launched as a standalone application using spark-class.
spark-class org.apache.spark.deploy.ExternalShuffleService
main Entry Point¶
main(
args: Array[String]): Unit
main is the entry point of the ExternalShuffleService standalone application.
main prints out the following INFO message to the logs:
Started daemon with process name: [name]
main registers signal handlers for the TERM, HUP, and INT signals.
main loads the default Spark properties.
main creates a SecurityManager.
main sets spark.shuffle.service.enabled to true explicitly (since this service is started from the command line for a reason).
main creates an ExternalShuffleService and starts it.
main prints out the following DEBUG message to the logs:
Adding shutdown hook
main registers a shutdown hook. When triggered, the shutdown hook prints out the following INFO message to the logs and requests the ExternalShuffleService to stop.
Shutting down shuffle service.
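The start-then-register-a-hook part of the steps above can be sketched in plain Scala. This is an illustration only: the Service trait and the object name are hypothetical, and sys.addShutdownHook from the Scala standard library is a stand-in for whatever hook mechanism Spark itself uses.

```scala
object ShuffleServiceMainSketch {
  // Hypothetical stand-in for the service; only start and stop matter here.
  trait Service {
    def start(): Unit
    def stop(): Unit
  }

  // The work the shutdown hook performs: log the message, then stop the service.
  def shutdownAction(service: Service): () => Unit = () => {
    println("Shutting down shuffle service.")
    service.stop()
  }

  // Start the service first, then register the hook so it runs on JVM exit.
  def run(service: Service): Unit = {
    service.start()
    sys.addShutdownHook(shutdownAction(service)())
  }
}
```

Keeping the hook body in a separate function makes it possible to exercise the stop logic without terminating the JVM.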
Creating Instance¶
ExternalShuffleService takes the following to be created:
- SparkConf
- SecurityManager
ExternalShuffleService is created when:
- ExternalShuffleService standalone application is started
- Worker (Spark Standalone) is created (and initializes an ExternalShuffleService)
TransportServer¶
server: TransportServer
ExternalShuffleService uses an internal reference to a TransportServer that is created when ExternalShuffleService is started.
ExternalShuffleService uses an ExternalBlockHandler to handle RPC messages (and serve RDD blocks and shuffle blocks).
TransportServer is closed when ExternalShuffleService is requested to stop.
TransportServer is used for metrics.
Port¶
ExternalShuffleService uses the spark.shuffle.service.port configuration property for the port to listen on when started.
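A minimal sketch of resolving the port from a SparkConf follows. It assumes spark-core is on the classpath and relies on 7337 being the documented default of spark.shuffle.service.port; the object and method names are hypothetical.

```scala
import org.apache.spark.SparkConf

object ShuffleServicePortSketch {
  // Falls back to 7337, the documented default, when the property is not set.
  def port(conf: SparkConf): Int =
    conf.getInt("spark.shuffle.service.port", 7337)
}
```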
spark.shuffle.service.enabled¶
ExternalShuffleService uses the spark.shuffle.service.enabled configuration property to control whether it is enabled (and should be started when requested).
ExternalBlockHandler¶
blockHandler: ExternalBlockHandler
ExternalShuffleService creates an ExternalBlockHandler when created.
With spark.shuffle.service.db.enabled and spark.shuffle.service.enabled configuration properties enabled, the ExternalBlockHandler is given a local directory with a registeredExecutors.ldb file.
blockHandler is used to create a TransportContext that creates the TransportServer.
blockHandler is used when:
- ExternalShuffleService is requested to start, executorRemoved, and applicationRemoved
findRegisteredExecutorsDBFile¶
findRegisteredExecutorsDBFile(
dbName: String): File
findRegisteredExecutorsDBFile returns one of the local directories (defined using the spark.local.dir configuration property) with the input dbName file, or null when no directories are defined.
findRegisteredExecutorsDBFile searches the local directories (defined using the spark.local.dir configuration property) for the input dbName file. If the file is not found, findRegisteredExecutorsDBFile takes the first local directory.
With no local directories defined in the spark.local.dir configuration property, findRegisteredExecutorsDBFile prints out the following WARN message to the logs and returns null.
'spark.local.dir' should be set first when we use db in ExternalShuffleService. Note that this only affects standalone mode.
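The lookup described above can be sketched in plain Scala without any Spark dependency. The names below are hypothetical: localDirs stands in for the entries of spark.local.dir, and the sketch returns an Option instead of null, which is a deliberate deviation from the behavior described in this section.

```scala
import java.io.File

object RegisteredExecutorsDbSketch {
  // localDirs stands in for the directories listed in spark.local.dir.
  // Returns None when no directories are defined (the text above describes null).
  def findDbFile(localDirs: Seq[String], dbName: String): Option[File] =
    localDirs.headOption.map { firstDir =>
      // Prefer a directory that already holds the file; otherwise use the first one.
      val dir = localDirs
        .find(d => new File(d, dbName).exists())
        .getOrElse(firstDir)
      new File(dir, dbName)
    }
}
```

Preferring a directory that already contains the file lets a restarted service pick up the same database even if the order of the directories has changed.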
Starting ExternalShuffleService¶
start(): Unit
start prints out the following INFO message to the logs:
Starting shuffle service on port [port] (auth enabled = [authEnabled])
start creates an AuthServerBootstrap when authentication is enabled (using the SecurityManager).
start creates a TransportContext (with the ExternalBlockHandler) and requests it to create a server (on the port).
start ...FIXME
start is used when:
- ExternalShuffleService is requested to startIfEnabled and when it is launched (as a command-line application)
startIfEnabled¶
startIfEnabled(): Unit
startIfEnabled starts the external shuffle service if enabled.
startIfEnabled is used when:
- Worker (Spark Standalone) is requested to startExternalShuffleService
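The guard that startIfEnabled applies can be modeled with a small, self-contained class. This is a hypothetical model, not Spark's code: the enabled flag mirrors spark.shuffle.service.enabled, and start only flips a flag instead of creating a server.

```scala
object StartIfEnabledSketch {
  // Hypothetical model of the service lifecycle.
  // `enabled` mirrors the spark.shuffle.service.enabled property.
  final class Service(enabled: Boolean) {
    private var running = false

    def isRunning: Boolean = running

    def start(): Unit = { running = true }

    def stop(): Unit = { running = false }

    // Starts the service only when it has been enabled; otherwise a no-op.
    def startIfEnabled(): Unit = if (enabled) start()
  }
}
```

A caller such as a worker can therefore invoke startIfEnabled unconditionally and leave the decision to the configuration.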
Executor Removed Notification¶
executorRemoved(
executorId: String,
appId: String): Unit
executorRemoved requests the ExternalBlockHandler to executorRemoved.
executorRemoved is used when:
- Worker (Spark Standalone) is requested to handleExecutorStateChanged
Application Finished Notification¶
applicationRemoved(
appId: String): Unit
applicationRemoved requests the ExternalBlockHandler to applicationRemoved (with the cleanupLocalDirs flag enabled).
applicationRemoved is used when:
- Worker (Spark Standalone) is requested to handle a WorkDirCleanup message and to maybeCleanupApplication
Logging¶
Enable ALL logging level for the org.apache.spark.deploy.ExternalShuffleService logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.deploy.ExternalShuffleService=ALL
Refer to Logging.