RpcEnv¶
RpcEnv is an abstraction of RPC environments.
Contract¶
address¶
address: RpcAddress
RpcAddress of this RPC environments
asyncSetupEndpointRefByURI¶
asyncSetupEndpointRefByURI(
uri: String): Future[RpcEndpointRef]
Looking up a RpcEndpointRef of the RPC endpoint by URI (asynchronously)
Used when:
WorkerWatcheris createdCoarseGrainedExecutorBackendis requested to onStartRpcEnvis requested to setupEndpointRefByURI
awaitTermination¶
awaitTermination(): Unit
Blocks the current thread till the RPC environment terminates
Used when:
SparkEnvis requested to stopClientAppis requested to startLocalSparkClusteris requested to stop- Master and Worker are launched
CoarseGrainedExecutorBackendis requested to run
deserialize¶
deserialize[T](
deserializationAction: () => T): T
Used when:
PersistenceEngineis requested toreadPersistedDataNettyRpcEnvis requested to deserialize
endpointRef¶
endpointRef(
endpoint: RpcEndpoint): RpcEndpointRef
Used when:
RpcEndpointis requested for the RpcEndpointRef to itself
RpcEnvFileServer¶
fileServer: RpcEnvFileServer
RpcEnvFileServer of this RPC environment
Used when:
SparkContextis requested to addFile, addJar and is created (and registers the REPL's output directory)
openChannel¶
openChannel(
uri: String): ReadableByteChannel
Opens a channel to download a file at the given URI
Used when:
Utilsutility is used to doFetchFileExecutorClassLoaderis requested togetClassFileInputStreamFromSparkRPC
setupEndpoint¶
setupEndpoint(
name: String,
endpoint: RpcEndpoint): RpcEndpointRef
shutdown¶
shutdown(): Unit
Shuts down this RPC environment asynchronously (and to make sure this RpcEnv exits successfully, use awaitTermination)
Used when:
SparkEnvis requested to stopLocalSparkClusteris requested to stopDriverWrapperis launchedCoarseGrainedExecutorBackendis launchedNettyRpcEnvFactoryis requested to create an RpcEnv (in server mode and failed to assign a port)
Stopping RpcEndpointRef¶
stop(
endpoint: RpcEndpointRef): Unit
Used when:
SparkContextis requested to stopRpcEndpointis requested to stopBlockManageris requested to stop- in Spark SQL
Implementations¶
Creating Instance¶
RpcEnv takes the following to be created:
RpcEnv is created using RpcEnv.create utility.
Abstract Class
RpcEnv is an abstract class and cannot be created directly. It is created indirectly for the concrete RpcEnvs.
Creating RpcEnv¶
create(
name: String,
host: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
clientMode: Boolean = false): RpcEnv // (1)
create(
name: String,
bindAddress: String,
advertiseAddress: String,
port: Int,
conf: SparkConf,
securityManager: SecurityManager,
numUsableCores: Int,
clientMode: Boolean): RpcEnv
- Uses
0fornumUsableCores
create creates a NettyRpcEnvFactory and requests it to create an RpcEnv (with a new RpcEnvConfig with all the given arguments).
create is used when:
SparkEnvutility is requested to create a SparkEnv (clientModeflag is turned on for executors and off for the driver)-
With
clientModeflagtrue:CoarseGrainedExecutorBackendis requested to runClientAppis requested to start- Spark Standalone's
Masteris requested to startRpcEnvAndEndpoint - Spark Standalone's
Workeris requested to startRpcEnvAndEndpoint DriverWrapperis launchedApplicationMaster(Spark on YARN) is requested torunExecutorLauncher(in client deploy mode)
Default Endpoint Lookup Timeout¶
RpcEnv uses the default lookup timeout for...FIXME
When a remote endpoint is resolved, a local RPC environment connects to the remote one (endpoint lookup). To configure the time needed for the endpoint lookup you can use the following settings.
It is a prioritized list of lookup timeout properties (the higher on the list, the more important):