Worker¶
Worker is a logical worker node in a Spark Standalone cluster.
Worker can be launched from the command line.
Worker RPC Endpoint¶
Worker is a ThreadSafeRpcEndpoint and is registered under the Worker name (when launched as a command-line application and requested to set up an RPC environment).
Launching Standalone Worker¶
Worker can be launched as a standalone application using spark-class.
./bin/spark-class org.apache.spark.deploy.worker.Worker
Note
At least one master URL is required.
main Entry Point¶
main(
args: Array[String]): Unit
main is the entry point of the Worker standalone application.
main prints out the following INFO message to the logs:
Started daemon with process name: [processName]
main registers signal handlers for TERM, HUP and INT signals.
main parses command-line options (using WorkerArguments) and initializes an RpcEnv.
main asserts that at least one of the following holds:
- External shuffle service is not used (based on the spark.shuffle.service.enabled configuration property)
- Number of worker instances is 1 (based on the SPARK_WORKER_INSTANCES environment variable)
main throws an IllegalArgumentException when the above does not hold:
Starting multiple workers on one host is failed because we may launch no more than one external shuffle service on each host, please set spark.shuffle.service.enabled to false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.
In the end, main requests the RpcEnv to wait until terminated.
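The following is a simplified, condensed sketch of the above flow in Scala. It is illustrative only (not the actual Spark sources) and assumes access to Spark's internal Utils, Logging and WorkerArguments as well as the startRpcEnvAndEndpoint utility described below.
import org.apache.spark.SparkConf

// Simplified, illustrative sketch of Worker.main (not the actual Spark sources)
def main(argStrings: Array[String]): Unit = {
  // prints out the "Started daemon with process name" INFO message and
  // registers signal handlers for TERM, HUP, INT (Utils.initDaemon in Spark)
  Utils.initDaemon(log)

  // parses command-line options
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)

  // asserts that either the external shuffle service is disabled or there is a
  // single worker instance on this host; require throws an IllegalArgumentException
  // with the message quoted above otherwise
  val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", defaultValue = false)
  val workerInstances = sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
  require(!shuffleServiceEnabled || workerInstances <= 1, "see the error message quoted above")

  // sets up the RPC environment with the Worker RPC endpoint and waits until terminated
  val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  rpcEnv.awaitTermination()
}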
Command-Line Options¶
Worker supports the following command-line options.
Usage: Worker [options] <master>
Master must be a URL of the form spark://hostname:port
Options:
-c CORES, --cores CORES Number of cores to use
-m MEM, --memory MEM Amount of memory to use (e.g. 1000M, 2G)
-d DIR, --work-dir DIR Directory to run apps in (default: SPARK_HOME/work)
-i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)
-h HOST, --host HOST Hostname to listen on
-p PORT, --port PORT Port to listen on (default: random)
--webui-port PORT Port for web UI (default: 8081)
--properties-file FILE Path to a custom Spark properties file.
Default is conf/spark-defaults.conf.
cores¶
host¶
ip¶
Master URLs¶
(required) Comma-separated standalone Master URLs in the form:
spark://host1:port1,host2:port2,...
memory¶
port¶
properties-file¶
webui-port¶
work-dir¶
Creating Instance¶
Worker takes the following to be created:
- RpcEnv
- web UI's Port
- Number of CPU cores
- Memory
- RpcAddresses of Masters
- Endpoint Name
- Work Dir Path (default: null)
- SparkConf
- SecurityManager
- Optional Resource File (default: (undefined))
- Supplier of ExternalShuffleService (default: null)
Worker is created when:
- Worker utility is requested to startRpcEnvAndEndpoint
ExternalShuffleService¶
Worker initializes an ExternalShuffleService (directly or indirectly using a Supplier if given; see the sketch below).
ExternalShuffleService is started when Worker is requested to startExternalShuffleService.
ExternalShuffleService is used as follows:
- Informed about an application removed when Worker handles a WorkDirCleanup message or maybeCleanupApplication
- Informed about an executor removed when Worker is requested to handleExecutorStateChanged
ExternalShuffleService is stopped when Worker is requested to stop.
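A minimal Scala sketch of that initialization, assuming the externalShuffleServiceSupplier, conf and securityMgr names from the constructor parameters listed under Creating Instance (illustrative, not the exact Spark sources):
import java.util.function.Supplier

// Illustrative: obtain the ExternalShuffleService from the Supplier (if given)
// or create it directly from the SparkConf and SecurityManager
private val shuffleService: ExternalShuffleService =
  if (externalShuffleServiceSupplier != null) {
    externalShuffleServiceSupplier.get()
  } else {
    new ExternalShuffleService(conf, securityMgr)
  }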
Starting Up RPC Environment¶
startRpcEnvAndEndpoint(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
masterUrls: Array[String],
workDir: String,
workerNumber: Option[Int] = None,
conf: SparkConf = new SparkConf,
resourceFileOpt: Option[String] = None): RpcEnv
startRpcEnvAndEndpoint creates an RpcEnv with the name sparkWorker and the given host and port.
startRpcEnvAndEndpoint translates the given masterUrls to RpcAddresses.
startRpcEnvAndEndpoint creates a Worker and requests the RpcEnv to set it up as an RPC endpoint under the Worker name.
startRpcEnvAndEndpoint is used when:
- Worker standalone application is launched (main)
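A condensed Scala sketch of the steps above (illustrative only; the Worker constructor call mirrors the parameters listed under Creating Instance):
// Illustrative sketch of startRpcEnvAndEndpoint (not the exact Spark sources)
val securityMgr = new SecurityManager(conf)
// creates the RPC environment named sparkWorker on the given host and port
val rpcEnv = RpcEnv.create("sparkWorker", host, port, conf, securityMgr)
// translates the given master URLs (spark://host:port) into RpcAddresses
val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL)
// creates the Worker and registers it as an RPC endpoint under the Worker name
rpcEnv.setupEndpoint("Worker", new Worker(rpcEnv, webUiPort, cores, memory,
  masterAddresses, "Worker", workDir, conf, securityMgr, resourceFileOpt))
rpcEnv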
onStart¶
onStart(): Unit
onStart is part of the RpcEndpoint abstraction.
onStart...FIXME
Creating Work Directory¶
createWorkDir(): Unit
createWorkDir sets workDir to be the given Work Dir Path (if defined) or the work subdirectory of the Spark home directory.
In the end, createWorkDir creates the workDir directory (including any necessary but nonexistent parent directories).
createWorkDir reports...FIXME
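A minimal Scala sketch of this behavior, assuming the workDirPath constructor argument, a sparkHome directory and Spark's Logging trait for logError (illustrative, not the exact Spark sources):
import java.io.File

// Illustrative: use the configured work directory if given, otherwise
// default to the work subdirectory of the Spark home directory
private def createWorkDir(): Unit = {
  workDir = Option(workDirPath).map(new File(_)).getOrElse(new File(sparkHome, "work"))
  // creates the directory (including missing parent directories) and gives up on failure
  if (!workDir.mkdirs() && !workDir.isDirectory) {
    logError(s"Failed to create work directory $workDir")
    System.exit(1)
  }
}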
Messages¶
ApplicationFinished¶
DriverStateChanged¶
ExecutorStateChanged¶
ExecutorStateChanged(
appId: String,
execId: Int,
state: ExecutorState,
message: Option[String],
exitStatus: Option[Int])
Message Handler: handleExecutorStateChanged
Posted when:
- ExecutorRunner is requested to killProcess and fetchAndRunExecutor
KillDriver¶
KillExecutor¶
LaunchDriver¶
LaunchExecutor¶
MasterChanged¶
ReconnectWorker¶
RegisterWorkerResponse¶
ReregisterWithMaster¶
RequestWorkerState¶
SendHeartbeat¶
WorkDirCleanup¶
handleExecutorStateChanged¶
handleExecutorStateChanged(
executorStateChanged: ExecutorStateChanged): Unit
handleExecutorStateChanged...FIXME
handleExecutorStateChanged is used when:
- Worker is requested to handle an ExecutorStateChanged message
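As a rough, unofficial Scala sketch of what handling the state change typically involves (assumptions: a sendToMaster helper, the executors and finishedExecutors registries, and the coresUsed and memoryUsed counters of this Worker; not the actual Spark sources):
// Rough, unofficial sketch only
private[worker] def handleExecutorStateChanged(event: ExecutorStateChanged): Unit = {
  // forwards the state change to the active master
  sendToMaster(event)
  if (ExecutorState.isFinished(event.state)) {
    val fullId = s"${event.appId}/${event.execId}"
    // forgets the finished executor and releases its cores and memory
    executors.remove(fullId).foreach { executor =>
      finishedExecutors(fullId) = executor
      coresUsed -= executor.cores
      memoryUsed -= executor.memory
    }
    // possibly cleans up the application's work directories (see maybeCleanupApplication)
    maybeCleanupApplication(event.appId)
  }
}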
maybeCleanupApplication¶
maybeCleanupApplication(
id: String): Unit
maybeCleanupApplication...FIXME
maybeCleanupApplication is used when:
- Worker is requested to handle an ApplicationFinished message and handleExecutorStateChanged
Logging¶
Enable ALL logging level for the org.apache.spark.deploy.worker.Worker logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.deploy.worker.Worker=ALL
Refer to Logging.