TorrentBroadcast

TorrentBroadcast is a Broadcast that uses a BitTorrent-like protocol for broadcast blocks distribution.

TorrentBroadcast -- Broadcasting using BitTorrent

Creating Instance

TorrentBroadcast takes the following to be created:

  • Broadcast Value (of type T)
  • Identifier

TorrentBroadcast is created when:

  • TorrentBroadcastFactory is requested for a new broadcast variable

BroadcastBlockId

TorrentBroadcast creates a BroadcastBlockId (with the id) when created.

Number of Block Chunks

TorrentBroadcast uses numBlocks for the number of blocks (chunks) that the broadcast variable was blockified into when TorrentBroadcast was created (the result of writeBlocks).

Transient Lazy Broadcast Value

_value: T

TorrentBroadcast uses the _value transient registry for the broadcast value, which is computed on demand (and cached afterwards).

_value is a @transient private lazy val, which relies on the following Scala language features:

  1. @transient: it is not serialized when the TorrentBroadcast is serialized to be sent over the wire to executors (and has to be re-computed afterwards).
  2. lazy val: it is instantiated when first requested and cached afterwards.
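
The two features above can be observed outside Spark. The following is a minimal sketch, assuming plain Java serialization; Holder, computations and roundTrip are hypothetical names used for illustration only and are not part of TorrentBroadcast.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientLazyDemo {
  // Hypothetical stand-in for a broadcast variable (not Spark's class).
  class Holder(val id: Long) extends Serializable {
    // Counts how many times the lazy value was computed on this instance.
    // It is transient too, so a deserialized copy starts again from 0.
    @transient var computations: Int = 0

    // Skipped by serialization and computed on first access only.
    @transient private lazy val _value: String = {
      computations += 1
      s"value-of-$id"
    }

    def getValue: String = _value
  }

  // Serializes and deserializes an object in memory (like sending it over the wire).
  def roundTrip[A <: java.io.Serializable](a: A): A = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(a) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[A] finally in.close()
  }
}
```

Calling getValue twice on the same Holder computes the value once. A copy produced by roundTrip starts with nothing cached and computes the value again on its first getValue.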

Value

getValue(): T

getValue returns the value from the _value transient registry if available (non-null).

Otherwise, getValue reads the broadcast block using readBroadcastBlock (which checks the local BroadcastManager, then the local BlockManager, and finally falls back to readBlocks).

getValue saves the object in the _value registry and returns it.


getValue is part of the Broadcast abstraction.
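
The check-then-read-then-save flow of getValue can be sketched as follows. This is an illustration only: CachedValue, read and reads are hypothetical names, and read stands in for reading the broadcast block.

```scala
object GetValueSketch {
  // Hypothetical holder: `read` plays the role of reading the broadcast block.
  final class CachedValue[T <: AnyRef](read: () => T) {
    // null until the value is read for the first time
    private var _value: T = null.asInstanceOf[T]

    // Number of times `read` was actually invoked.
    var reads: Int = 0

    def getValue(): T = {
      if (_value == null) {
        reads += 1
        _value = read() // save the object for later calls
      }
      _value
    }
  }
}
```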

Reading Broadcast Block

readBroadcastBlock(): T

readBroadcastBlock looks up the BroadcastBlockId in (the cache of) BroadcastManager and returns the value if found.

Otherwise, readBroadcastBlock initializes the configuration (setConf) and requests the BlockManager for the locally-stored broadcast data.

If the broadcast block is found locally, readBroadcastBlock requests the BroadcastManager to cache it and returns the value.

If not found locally, readBroadcastBlock multiplies the numBlocks by the blockSize for an estimated size of the broadcast block. readBroadcastBlock prints out the following INFO message to the logs:

Started reading broadcast variable [id] with [numBlocks] pieces (estimated total size [estimatedTotalSize])
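
The estimate is an upper bound, since the last chunk is usually smaller than blockSize. A minimal sketch of the arithmetic follows, with hypothetical helper names. Widening to Long before multiplying is a design choice of this sketch: it keeps large broadcasts from overflowing Int.

```scala
object EstimatedSizeSketch {
  // Estimated size in bytes: number of chunks times the chunk size.
  def estimatedTotalSize(numBlocks: Int, blockSize: Int): Long =
    numBlocks.toLong * blockSize

  // Counter-example: the same product computed in Int arithmetic can overflow.
  def intProduct(numBlocks: Int, blockSize: Int): Int =
    numBlocks * blockSize
}
```

For example, 3 chunks of 4 MiB (4194304 bytes, an example value only) give an estimate of 12582912 bytes.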

readBroadcastBlock reads the block chunks (readBlocks) and prints out the following INFO message to the logs:

Reading broadcast variable [id] took [time] ms

readBroadcastBlock unblockifies the block chunks into an object (using the Serializer and the CompressionCodec).

readBroadcastBlock requests the BlockManager to store the merged copy (so other tasks on this executor don't need to re-fetch it). readBroadcastBlock uses MEMORY_AND_DISK storage level and the tellMaster flag off.

readBroadcastBlock requests the BroadcastManager to cache it and returns the value.
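
The lookup order described above (cache first, then the local store, then the pieces) can be sketched with plain maps. Everything in the sketch is a hypothetical stand-in: cachedValues plays the role of the BroadcastManager cache, localBlocks the role of the local BlockManager, and readAndMergePieces the role of readBlocks followed by unblockifying.

```scala
import scala.collection.mutable

object ReadBroadcastBlockSketch {
  final class ExecutorSide {
    val cachedValues = mutable.Map.empty[Long, String] // stand-in for the BroadcastManager cache
    val localBlocks = mutable.Map.empty[Long, String]  // stand-in for the local BlockManager
    var pieceReads = 0                                 // how often the pieces had to be fetched

    // Pretends to fetch all pieces and merge them into a value.
    private def readAndMergePieces(id: Long): String = {
      pieceReads += 1
      s"value-$id"
    }

    def readBroadcastBlock(id: Long): String =
      cachedValues.getOrElse(id, {
        val value = localBlocks.getOrElse(id, {
          val merged = readAndMergePieces(id)
          localBlocks(id) = merged // store the merged copy for other tasks
          merged
        })
        cachedValues(id) = value   // cache whatever was found or merged
        value
      })
  }
}
```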

Unblockifying Broadcast Value

unBlockifyObject(
  blocks: Array[InputStream],
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): T

unBlockifyObject...FIXME

Reading Broadcast Block Chunks

readBlocks(): Array[BlockData]

readBlocks creates a collection of BlockDatas for numBlocks block chunks.

readBlocks processes the block chunks in random order (a random permutation of the piece IDs from 0 until numBlocks). For every chunk, readBlocks creates a BroadcastBlockId for the id (of the broadcast variable) and the chunk (identified by the piece prefix followed by the piece ID).

readBlocks prints out the following DEBUG message to the logs:

Reading piece [pieceId] of [broadcastId]

readBlocks first tries to look up the piece locally by requesting the BlockManager to getLocalBytes and, if found, stores the reference in the local block array (for the piece ID).

If not found in the local BlockManager, readBlocks requests the BlockManager to getRemoteBytes.

With checksumEnabled, readBlocks...FIXME

readBlocks requests the BlockManager to store the chunk (so other tasks on this executor don't need to re-fetch it) using MEMORY_AND_DISK_SER storage level and reporting to the driver (so other executors can pull these chunks from this executor as well).

readBlocks creates a ByteBufferBlockData for the chunk (and stores it in the blocks array).


readBlocks throws a SparkException for blocks neither available locally nor remotely:

Failed to get [pieceId] of [broadcastId]
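
The steps of readBlocks can be sketched without Spark as follows. The sketch is an approximation under stated assumptions: the local and remote maps are hypothetical stand-ins for the local and remote BlockManagers, the broadcast_[id]_piece[n] block names are assumed, IllegalStateException replaces SparkException, and checksums and storage levels are left out.

```scala
import scala.collection.mutable
import scala.util.Random

object ReadBlocksSketch {
  def readBlocks(
      broadcastId: Long,
      numBlocks: Int,
      local: mutable.Map[String, Array[Byte]],
      remote: Map[String, Array[Byte]]): Array[Array[Byte]] = {
    val blocks = new Array[Array[Byte]](numBlocks)
    // Visit the pieces in random order so executors do not all ask for the same piece first.
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = s"broadcast_${broadcastId}_piece$pid"
      val bytes = local.get(pieceId)
        .orElse(remote.get(pieceId).map { fetched =>
          local(pieceId) = fetched // keep a local copy so it is not fetched again
          fetched
        })
        .getOrElse(throw new IllegalStateException(
          s"Failed to get $pieceId of broadcast_$broadcastId"))
      blocks(pid) = bytes
    }
    blocks
  }
}
```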

CompressionCodec

compressionCodec: Option[CompressionCodec]

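TorrentBroadcast uses the spark.broadcast.compress configuration property to decide whether to compress broadcast blocks. When enabled, compressionCodec holds the CompressionCodec that writeBlocks and readBroadcastBlock use. Otherwise, compressionCodec is empty (None).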

Broadcast Block Chunk Size

TorrentBroadcast uses the spark.broadcast.blockSize configuration property for the size of the chunks (pieces) of a broadcast block.

TorrentBroadcast uses the size for writeBlocks and readBroadcastBlock.

Persisting Broadcast (to BlockManager)

writeBlocks(
  value: T): Int

writeBlocks returns the number of blocks (chunks) that the broadcast variable was blockified into.

The whole broadcast value is stored in the local BlockManager with MEMORY_AND_DISK storage level, while the block chunks are stored with MEMORY_AND_DISK_SER storage level.

writeBlocks is used when:

  • TorrentBroadcast is created (that happens on the driver only)

writeBlocks requests the BlockManager to store the given broadcast value (to be identified as the broadcastId and with the MEMORY_AND_DISK storage level).

writeBlocks blockifies the object (into chunks of the block size, using the Serializer and the optional compressionCodec).

With checksumEnabled writeBlocks...FIXME

For every block, writeBlocks creates a BroadcastBlockId for the id and piece[index] identifier, and requests the BlockManager to store the chunk bytes (with MEMORY_AND_DISK_SER storage level and reporting to the driver).

Blockifying Broadcast Variable

blockifyObject(
  obj: T,
  blockSize: Int,
  serializer: Serializer,
  compressionCodec: Option[CompressionCodec]): Array[ByteBuffer]

blockifyObject divides (blockifies) the input obj broadcast value into blocks (ByteBuffer chunks). blockifyObject uses the given Serializer to write the value in a serialized format to a ChunkedByteBufferOutputStream of the given blockSize size with the optional CompressionCodec.
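
The following sketch illustrates blockifying and the reverse step (unblockifying) outside Spark. It is not Spark's implementation: Java serialization stands in for the Serializer, GZIP for the CompressionCodec, and a plain byte array for ChunkedByteBufferOutputStream. The compress flag is a hypothetical simplification of the optional codec.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream, SequenceInputStream}
import java.nio.ByteBuffer
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object BlockifySketch {
  // Serializes obj (optionally compressed) and cuts the bytes into chunks of at most blockSize.
  def blockifyObject(obj: java.io.Serializable, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    val bytes = new ByteArrayOutputStream()
    val sink: OutputStream = if (compress) new GZIPOutputStream(bytes) else bytes
    val out = new ObjectOutputStream(sink)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // Chains the chunks back into one stream, then decompresses and deserializes it.
  def unBlockifyObject[T](blocks: Array[ByteBuffer], compress: Boolean): T = {
    val streams = new java.util.Vector[InputStream]()
    blocks.foreach(b => streams.add(new ByteArrayInputStream(b.array())))
    val merged: InputStream = new SequenceInputStream(streams.elements())
    val source: InputStream = if (compress) new GZIPInputStream(merged) else merged
    val in = new ObjectInputStream(source)
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Every chunk except possibly the last one has exactly blockSize bytes, so the number of chunks is the serialized size divided by blockSize, rounded up.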

Error Handling

In case of any error, writeBlocks prints out the following ERROR message to the logs and requests the local BlockManager to remove the broadcast.

Store broadcast [broadcastId] fail, remove all pieces of the broadcast

In case of an error while storing the value itself, writeBlocks throws a SparkException:

Failed to store [broadcastId] in BlockManager

In case of an error while storing the chunks of the blockified value, writeBlocks throws a SparkException:

Failed to store [pieceId] of [broadcastId] in local BlockManager
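
The store-then-clean-up flow of writeBlocks can be sketched as follows. TinyStore is a hypothetical in-memory stand-in for the local BlockManager (with a capacity limit to provoke failures), the block names are assumed, IllegalStateException replaces SparkException, and serialization, storage levels and logging are left out.

```scala
import scala.collection.mutable

object WriteBlocksSketch {
  // Hypothetical block store that rejects new blocks once it holds `capacity` blocks.
  final class TinyStore(capacity: Int) {
    val blocks = mutable.LinkedHashMap.empty[String, Array[Byte]]

    def put(id: String, bytes: Array[Byte]): Boolean =
      if (blocks.size >= capacity) false
      else { blocks(id) = bytes; true }

    // Removes the whole value and all pieces of the given broadcast.
    def removeBroadcast(broadcastId: String): Unit = {
      val doomed = blocks.keys.filter(k => k == broadcastId || k.startsWith(broadcastId + "_")).toList
      doomed.foreach(k => blocks.remove(k))
    }
  }

  def writeBlocks(store: TinyStore, id: Long, value: Array[Byte], blockSize: Int): Int = {
    val broadcastId = s"broadcast_$id"
    if (!store.put(broadcastId, value))
      throw new IllegalStateException(s"Failed to store $broadcastId in BlockManager")
    try {
      val chunks = value.grouped(blockSize).toArray
      chunks.zipWithIndex.foreach { case (chunk, index) =>
        val pieceId = s"${broadcastId}_piece$index"
        if (!store.put(pieceId, chunk))
          throw new IllegalStateException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
      chunks.length
    } catch {
      case t: Throwable =>
        store.removeBroadcast(broadcastId) // leave no partial broadcast behind
        throw t
    }
  }
}
```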

Destroying Variable

doDestroy(
  blocking: Boolean): Unit

doDestroy removes the persisted state (associated with the broadcast variable) on all the nodes in a Spark application (the driver and executors).

doDestroy is part of the Broadcast abstraction.

Unpersisting Variable

doUnpersist(
  blocking: Boolean): Unit

doUnpersist removes the persisted state (associated with the broadcast variable) on executors only.

doUnpersist is part of the Broadcast abstraction.

Removing Persisted State (Broadcast Blocks) of Broadcast Variable

unpersist(
  id: Long,
  removeFromDriver: Boolean,
  blocking: Boolean): Unit

unpersist prints out the following DEBUG message to the logs:

Unpersisting TorrentBroadcast [id]

In the end, unpersist requests the BlockManagerMaster to remove the blocks of the given broadcast.
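
The effect of the removeFromDriver flag can be sketched with a toy cluster. Cluster and blocksByNode are hypothetical names, the node called "driver" is an assumption of the sketch, and the blocking parameter is omitted because everything here runs synchronously.

```scala
import scala.collection.mutable

object UnpersistSketch {
  // Hypothetical cluster: every node holds a set of broadcast ids.
  final class Cluster(nodeNames: Seq[String]) {
    val blocksByNode: Map[String, mutable.Set[Long]] =
      nodeNames.map(name => name -> mutable.Set.empty[Long]).toMap

    // Places the broadcast on every node.
    def broadcast(id: Long): Unit = blocksByNode.values.foreach(_ += id)

    // Removes the broadcast from the executors, and from the driver only if requested.
    def unpersist(id: Long, removeFromDriver: Boolean): Unit =
      for ((node, held) <- blocksByNode if removeFromDriver || node != "driver") held -= id

    def doUnpersist(id: Long): Unit = unpersist(id, removeFromDriver = false)
    def doDestroy(id: Long): Unit = unpersist(id, removeFromDriver = true)
  }
}
```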

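unpersist is used when:

  • TorrentBroadcast is requested to doUnpersist (with the removeFromDriver flag off) and doDestroy (with the removeFromDriver flag on)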

setConf

setConf(
  conf: SparkConf): Unit

setConf uses the given SparkConf to initialize the compressionCodec, the blockSize and the checksumEnabled.
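
What setConf derives from the configuration can be sketched with a plain Map in place of SparkConf. The sketch rests on assumptions that should be checked against the Spark version in use: the property name spark.broadcast.checksum, the default values (true, 4m, true), and the simplified sizeAsKb parser, which handles only bare numbers (taken as KiB) and the k and m suffixes.

```scala
object SetConfSketch {
  final case class BroadcastSettings(compress: Boolean, blockSize: Int, checksumEnabled: Boolean)

  // Simplified size parser (assumption): "4m" -> 4096 KiB, "512k" -> 512 KiB, "64" -> 64 KiB.
  def sizeAsKb(size: String): Long = {
    val s = size.trim.toLowerCase
    if (s.endsWith("m")) s.dropRight(1).toLong * 1024
    else if (s.endsWith("k")) s.dropRight(1).toLong
    else s.toLong
  }

  // Reads the three settings, falling back to the assumed defaults.
  def setConf(conf: Map[String, String]): BroadcastSettings =
    BroadcastSettings(
      compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean,
      blockSize = (sizeAsKb(conf.getOrElse("spark.broadcast.blockSize", "4m")) * 1024).toInt,
      checksumEnabled = conf.getOrElse("spark.broadcast.checksum", "true").toBoolean)
}
```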

setConf is used when:

  • TorrentBroadcast is created
  • TorrentBroadcast is requested to readBroadcastBlock

Logging

Enable the ALL logging level for the org.apache.spark.broadcast.TorrentBroadcast logger to see what happens inside.

Add the following line to conf/log4j.properties:

log4j.logger.org.apache.spark.broadcast.TorrentBroadcast=ALL

Refer to Logging.
