
NettyBlockTransferService

NettyBlockTransferService is a BlockTransferService that uses Netty for uploading and fetching blocks of data.

Figure: NettyBlockTransferService, SparkEnv and BlockManager

Creating Instance

NettyBlockTransferService takes the following to be created:

  * SparkConf
  * SecurityManager
  * Bind address
  * Host name
  * Port
  * Number of CPU cores
  * Driver RpcEndpointRef

NettyBlockTransferService is created when SparkEnv is created (for the driver and executors).

Initializing

init(
  blockDataManager: BlockDataManager): Unit

init is part of the BlockTransferService abstraction.

init creates a NettyBlockRpcServer (with the application ID, a JavaSerializer and the given BlockDataManager).

init creates a TransportContext (with the NettyBlockRpcServer just created) and requests it for a TransportClientFactory.

init creates a TransportServer (createServer).

In the end, init prints out the following INFO message to the logs:

Server created on [hostName]:[port]
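The wiring order above can be sketched as follows (a minimal sketch with hypothetical stand-in types, not Spark's actual classes in org.apache.spark.network.*):

```scala
// Minimal sketch of init's wiring order (hypothetical stand-in types).
case class NettyBlockRpcServer(appId: String)

case class TransportContext(rpcHandler: NettyBlockRpcServer) {
  // The real TransportContext builds a Netty-backed client factory.
  def createClientFactory(): String = s"clientFactory(${rpcHandler.appId})"
  // The real createServer binds a Netty server socket.
  def createServer(host: String, port: Int): String = s"$host:$port"
}

def init(appId: String, host: String, port: Int): String = {
  val rpcServer     = NettyBlockRpcServer(appId)      // serves block RPCs
  val context       = TransportContext(rpcServer)
  val clientFactory = context.createClientFactory()   // outgoing connections
  val server        = context.createServer(host, port)
  s"Server created on $server"                        // the INFO message above
}
```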

Fetching Blocks

fetchBlocks(
  host: String,
  port: Int,
  execId: String,
  blockIds: Array[String],
  listener: BlockFetchingListener,
  tempFileManager: DownloadFileManager): Unit

fetchBlocks is part of the BlockStoreClient abstraction.

fetchBlocks prints out the following TRACE message to the logs:

Fetch blocks from [host]:[port] (executor id [execId])

fetchBlocks creates a BlockFetchStarter.

fetchBlocks requests the TransportConf for the maxIORetries (spark.shuffle.io.maxRetries configuration property).

With maxIORetries above zero, fetchBlocks creates a RetryingBlockFetcher (with the BlockFetchStarter, the blockIds and the BlockFetchingListener) and starts it.

Otherwise, fetchBlocks requests the BlockFetchStarter to createAndStart (with the blockIds and the BlockFetchingListener).

In case of any Exception, fetchBlocks prints out the following ERROR message to the logs and notifies the given BlockFetchingListener (onBlockFetchFailure) for every block:

Exception while beginning fetchBlocks
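The retry decision above can be sketched like this (simplified; the trait below stands in for the real BlockFetchStarter nested in RetryingBlockFetcher):

```scala
// Sketch of fetchBlocks' retry decision (simplified, hypothetical types).
trait BlockFetchStarter {
  def createAndStart(blockIds: Array[String]): Unit
}

// Returns which path was taken, for illustration only.
def startFetch(
    maxIORetries: Int,
    blockIds: Array[String],
    starter: BlockFetchStarter): String = {
  if (maxIORetries > 0) {
    // The real code wraps the starter in a RetryingBlockFetcher and starts it,
    // so transient IO failures are retried up to maxIORetries times.
    "retrying"
  } else {
    starter.createAndStart(blockIds)  // single attempt, no retries
    "direct"
  }
}
```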

Uploading Block

uploadBlock(
  hostname: String,
  port: Int,
  execId: String,
  blockId: BlockId,
  blockData: ManagedBuffer,
  level: StorageLevel,
  classTag: ClassTag[_]): Future[Unit]

uploadBlock is part of the BlockTransferService abstraction.

uploadBlock creates a TransportClient (with the given hostname and port).

uploadBlock serializes the given StorageLevel and ClassTag (using a JavaSerializer).

uploadBlock uses a stream to transfer shuffle blocks when one of the following holds:

  1. The size of the block data (ManagedBuffer) is above spark.network.maxRemoteBlockSizeFetchToMem configuration property
  2. The given BlockId is a shuffle block

For stream transfer, uploadBlock requests the TransportClient to uploadStream. Otherwise, uploadBlock requests the TransportClient to sendRpc an UploadBlock message.

Note

UploadBlock message is processed by NettyBlockRpcServer.
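The two conditions above boil down to a simple predicate (a sketch; the real check uses ManagedBuffer.size and BlockId.isShuffle):

```scala
// Sketch of uploadBlock's transfer-mode decision (simplified).
// maxRemoteBlockSizeFetchToMem is the value of the
// spark.network.maxRemoteBlockSizeFetchToMem configuration property.
def uploadAsStream(
    blockSize: Long,
    maxRemoteBlockSizeFetchToMem: Long,
    isShuffleBlock: Boolean): Boolean =
  blockSize > maxRemoteBlockSizeFetchToMem || isShuffleBlock
```

When the predicate holds, uploadBlock streams the data (uploadStream); otherwise the whole block travels inside a single UploadBlock RPC message.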

When the upload succeeds, uploadBlock prints out the following TRACE message to the logs:

Successfully uploaded block [blockId] [as stream]

If the upload fails, uploadBlock prints out the following ERROR message to the logs:

Error while uploading block [blockId] [as stream]

Logging

Enable ALL logging level for org.apache.spark.network.netty.NettyBlockTransferService logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.network.netty.NettyBlockTransferService=ALL

Refer to Logging.
