NettyBlockTransferService¶
NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.

Creating Instance¶
NettyBlockTransferService takes the following to be created:
- SparkConf
- SecurityManager
- Bind Address
- Host Name
- Port
- Number of CPU Cores
- Driver RpcEndpointRef
NettyBlockTransferService is created when:
SparkEnv utility is used to create a SparkEnv (for the driver and executors) and creates a BlockManager
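For illustration only, the following is a minimal sketch of constructing a NettyBlockTransferService the way Spark-internal code (e.g. SparkEnv) would. Note that NettyBlockTransferService and SecurityManager are private[spark], and the bind address, host name, port and core count below are assumptions for the sketch.

```scala
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.network.netty.NettyBlockTransferService

val conf = new SparkConf().setAppName("demo").setMaster("local[*]")
val securityManager = new SecurityManager(conf)

val blockTransferService = new NettyBlockTransferService(
  conf,
  securityManager,
  "localhost",  // bind address (assumption)
  "localhost",  // host name advertised to peers (assumption)
  0,            // port (assumption; 0 lets the server pick a free port)
  1,            // number of CPU cores
  null)         // driver RpcEndpointRef (none in this sketch)
```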
Initializing¶
init(
blockDataManager: BlockDataManager): Unit
init is part of the BlockTransferService abstraction.
init creates a NettyBlockRpcServer (with the application ID, a JavaSerializer and the given BlockDataManager).
init creates a TransportContext (with the NettyBlockRpcServer just created) and requests it for a TransportClientFactory.
init then creates a TransportServer (createServer).
In the end, init prints out the following INFO message to the logs:
Server created on [hostName]:[port]
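Continuing the construction sketch above, a minimal illustration of the init contract: the node's BlockManager (a BlockDataManager) is handed to init, which wires up the NettyBlockRpcServer, TransportContext, TransportClientFactory and TransportServer described in this section. Using SparkEnv.get.blockManager as the BlockDataManager is an assumption for illustration.

```scala
import org.apache.spark.SparkEnv
import org.apache.spark.network.BlockDataManager

// The BlockManager serves the blocks that are fetched and uploaded (assumption: taken from SparkEnv).
val blockDataManager: BlockDataManager = SparkEnv.get.blockManager

blockTransferService.init(blockDataManager)
// INFO Server created on [hostName]:[port]
```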
Fetching Blocks¶
fetchBlocks(
host: String,
port: Int,
execId: String,
blockIds: Array[String],
listener: BlockFetchingListener,
tempFileManager: DownloadFileManager): Unit
fetchBlocks is part of the BlockStoreClient abstraction.
fetchBlocks prints out the following TRACE message to the logs:
Fetch blocks from [host]:[port] (executor id [execId])
fetchBlocks creates a BlockFetchStarter.
fetchBlocks requests the TransportConf for the maxIORetries.
With the maxIORetries above zero, fetchBlocks creates a RetryingBlockFetcher (with the BlockFetchStarter, the blockIds and the BlockFetchingListener) and starts it.
Otherwise, fetchBlocks requests the BlockFetchStarter to createAndStart (with the blockIds and the BlockFetchingListener).
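A simplified sketch of that retry decision (not a verbatim copy of the implementation), wrapped in a standalone function whose parameters stand for the values fetchBlocks already has in scope:

```scala
import org.apache.spark.network.shuffle.{BlockFetchingListener, RetryingBlockFetcher}
import org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter
import org.apache.spark.network.util.TransportConf

// Wrap the fetch in a RetryingBlockFetcher only when retries are enabled (maxIORetries > 0).
def startFetch(
    transportConf: TransportConf,
    blockFetchStarter: BlockFetchStarter,
    blockIds: Array[String],
    listener: BlockFetchingListener): Unit = {
  val maxRetries = transportConf.maxIORetries()
  if (maxRetries > 0) {
    new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, listener).start()
  } else {
    blockFetchStarter.createAndStart(blockIds, listener)
  }
}
```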
In case of any Exception, fetchBlocks prints out the following ERROR message to the logs and notifies the given BlockFetchingListener of the failure:
Exception while beginning fetchBlocks
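For illustration, a minimal caller-side sketch of fetchBlocks (continuing the earlier sketches): the BlockFetchingListener receives one callback per block, on success or failure. The remote host, port, executor id and block id are assumptions.

```scala
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.BlockFetchingListener

// Callbacks invoked by the transfer service for every requested block.
val listener = new BlockFetchingListener {
  override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit =
    println(s"Fetched $blockId (${data.size} bytes)")
  override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit =
    println(s"Failed to fetch $blockId: ${exception.getMessage}")
}

blockTransferService.fetchBlocks(
  "remote-host", 7337, "exec-1",  // remote host, port and executor id (assumptions)
  Array("rdd_0_0"),               // block to fetch (assumption)
  listener,
  null)                           // no DownloadFileManager in this sketch
```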
Uploading Block¶
uploadBlock(
hostname: String,
port: Int,
execId: String,
blockId: BlockId,
blockData: ManagedBuffer,
level: StorageLevel,
classTag: ClassTag[_]): Future[Unit]
uploadBlock is part of the BlockTransferService abstraction.
uploadBlock creates a TransportClient (with the given hostname and port).
uploadBlock serializes the given StorageLevel and ClassTag (using a JavaSerializer).
uploadBlock uses a stream to transfer the block when one of the following holds:
- The size of the block data (ManagedBuffer) is above the spark.network.maxRemoteBlockSizeFetchToMem configuration property
- The given BlockId is a shuffle block
For stream transfer, uploadBlock requests the TransportClient to uploadStream. Otherwise, uploadBlock requests the TransportClient to sendRpc an UploadBlock message.
Note
UploadBlock message is processed by NettyBlockRpcServer.
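A simplified sketch of just the stream-or-RPC decision described above (not the actual implementation); the default value passed to getSizeAsBytes is an assumption.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.BlockId

// Stream the block when it is a shuffle block or too large to be materialized in memory.
def uploadsAsStream(conf: SparkConf, blockId: BlockId, blockData: ManagedBuffer): Boolean = {
  val maxRemoteBlockSizeFetchToMem =
    conf.getSizeAsBytes("spark.network.maxRemoteBlockSizeFetchToMem", "200m") // default is an assumption
  blockData.size() > maxRemoteBlockSizeFetchToMem || blockId.isShuffle
}
```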
When the upload is successful, uploadBlock prints out the following TRACE message to the logs:
Successfully uploaded block [blockId] [as stream]
When the upload fails, uploadBlock prints out the following ERROR message to the logs:
Error while uploading block [blockId] [as stream]
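For illustration, a minimal caller-side sketch of uploadBlock (continuing the earlier sketches), uploading a small in-memory RDD block; the remote host, port, executor id, block and timeout are assumptions.

```scala
import java.nio.ByteBuffer
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.reflect.ClassTag
import org.apache.spark.network.buffer.NioManagedBuffer
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

val blockId = RDDBlockId(0, 0)
val blockData = new NioManagedBuffer(ByteBuffer.wrap("hello".getBytes("UTF-8")))

val upload = blockTransferService.uploadBlock(
  "remote-host", 7337, "exec-1",  // remote host, port and executor id (assumptions)
  blockId, blockData,
  StorageLevel.MEMORY_ONLY,
  ClassTag(classOf[Array[Byte]]))

// uploadBlock is asynchronous; block until the Future[Unit] completes.
Await.result(upload, 30.seconds)
```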
Logging¶
Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL
Refer to Logging.