
RDD — Description of Distributed Computation

RDD[T] is an abstraction of a resilient distributed dataset: a fault-tolerant, distributed collection of records (of type T). An RDD is a mere description of a distributed computation, not the computed data itself.
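A quick way to see this: transformations only build a description, while an action triggers the actual distributed computation. A minimal sketch (assuming a SparkContext available as sc, e.g. in spark-shell):

import org.apache.spark.rdd.RDD

val doubled: RDD[Int] = sc.parallelize(1 to 10).map(_ * 2)  // merely a description, nothing runs yet
doubled.collect()  // an action finally submits a job and computes the records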

Contract

Computing Partition

compute(
  split: Partition,
  context: TaskContext): Iterator[T]

Computes the input Partition (with the TaskContext) to produce values (of type T).

Used when RDD is requested to computeOrReadCheckpoint.

Partitions

getPartitions: Array[Partition]

Returns the Partitions of this RDD.

Used when RDD is requested for the partitions (the very first time only, as the result is cached afterwards).
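The two contract methods are all a custom RDD has to provide. Below is a minimal, hypothetical NumbersRDD (the name and splitting logic are illustrative only, assuming a SparkContext sc) that produces the numbers 0 until n across a given number of partitions:

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

case class RangePartition(index: Int, from: Int, until: Int) extends Partition

class NumbersRDD(sc: SparkContext, n: Int, numPartitions: Int)
  extends RDD[Int](sc, Nil) {  // Nil: no parent dependencies

  // The Partitions of this RDD (a partition's index must match its position)
  override def getPartitions: Array[Partition] = {
    val step = math.ceil(n.toDouble / numPartitions).toInt
    (0 until numPartitions).map { i =>
      RangePartition(i, i * step, math.min((i + 1) * step, n))
    }.toArray[Partition]
  }

  // Produces the records of the given partition
  override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
    val p = split.asInstanceOf[RangePartition]
    (p.from until p.until).iterator
  }
}

new NumbersRDD(sc, 10, 3).collect()  // Array(0, 1, ..., 9)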

Implementations

Creating Instance

RDD takes the following to be created:

SparkContext
Parent Dependencies (Seq[Dependency[_]])

Abstract Class

RDD is an abstract class and cannot be created directly. It is created indirectly through one of the concrete implementations.
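For illustration (assuming a SparkContext sc), concrete RDDs come from SparkContext methods and transformations rather than from constructors:

val nums = sc.parallelize(1 to 10)    // ParallelCollectionRDD
val doubled = nums.map(_ * 2)         // MapPartitionsRDD
val lines = sc.textFile("README.md")  // backed by a HadoopRDD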

partitions

partitions: Array[Partition]

partitions returns the Partitions of this RDD: from the CheckpointRDD if the RDD is checkpointed, or by computing them (using getPartitions) and caching the result so that getPartitions is only executed once per RDD.

partitions is used when:

Recursive Dependencies

toDebugString: String

toDebugString returns a multi-line description of this RDD and its recursive dependencies (the RDD lineage graph) for debugging purposes.
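For example (assuming a SparkContext sc), the lineage of a shuffled RDD can be inspected as follows; the output is an indented tree with one line per RDD and shuffle boundaries marked by indentation changes:

val rdd = sc.parallelize(1 to 100).map(_ % 10).repartition(4)
println(rdd.toDebugString)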

doCheckpoint

doCheckpoint(): Unit

doCheckpoint performs the actual checkpointing of this RDD (when checkpoint data is defined) or requests the parent RDDs to doCheckpoint themselves. doCheckpoint runs at most once per RDD.

doCheckpoint is used when SparkContext is requested to run a job (right after the job has successfully finished).

iterator

iterator(
  split: Partition,
  context: TaskContext): Iterator[T]

iterator gets or computes the split partition: when the RDD has a StorageLevel other than NONE, iterator gets the partition from a cache or computes and persists it (getOrCompute); otherwise, iterator computes the partition or reads it from a checkpoint (computeOrReadCheckpoint).

Final Method

iterator is a final method and may not be overridden in subclasses. See 5.2.6 final in the Scala Language Specification.
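The gist of iterator, paraphrased from the Spark source, is a dispatch on the RDD's StorageLevel:

final def iterator(split: Partition, context: TaskContext): Iterator[T] =
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)  // possibly served from the block manager
  } else {
    computeOrReadCheckpoint(split, context)
  }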

getOrCompute

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]

getOrCompute serves the partition from the block manager if already persisted, or computes it and persists it per the RDD's StorageLevel (described in detail in Getting Or Computing RDD Partition below).

computeOrReadCheckpoint

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]

computeOrReadCheckpoint computes the split partition or, when the RDD is already checkpointed and materialized, reads it from the checkpoint (described in detail in Computing Partition or Reading From Checkpoint below).

Implicit Methods

rddToOrderedRDDFunctions

rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](
  rdd: RDD[(K, V)]): OrderedRDDFunctions[K, V, (K, V)]

rddToOrderedRDDFunctions is a Scala implicit method that creates an OrderedRDDFunctions (for an RDD of key-value pairs with an Ordering on the keys).

rddToOrderedRDDFunctions is used (implicitly) when the sortByKey, repartitionAndSortWithinPartitions and filterByRange operators are used on an RDD of key-value pairs.
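For example (assuming a SparkContext sc), sortByKey is not defined on RDD itself; the implicit conversion kicks in for an RDD of key-value pairs:

val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))

// rddToOrderedRDDFunctions converts pairs to OrderedRDDFunctions,
// which is where sortByKey is actually defined.
val sorted = pairs.sortByKey()
sorted.collect()  // Array((a,1), (b,2), (c,3))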

Review Me

== [[extensions]][[implementations]] (Subset of) Available RDDs

[cols="30,70",options="header",width="100%"] |=== | RDD | Description

| CoGroupedRDD | [[CoGroupedRDD]]

| CoalescedRDD | [[CoalescedRDD]] Result of spark-rdd-partitions.md#repartition[repartition] or spark-rdd-partitions.md#coalesce[coalesce] transformations

| HadoopRDD.md[HadoopRDD] | [[HadoopRDD]] Allows for reading data stored in HDFS using the older MapReduce API. The most notable use case is the RDD returned by SparkContext.textFile.

| MapPartitionsRDD.md[MapPartitionsRDD] | [[MapPartitionsRDD]] Result of calling map-like operations (e.g. map, flatMap, filter, spark-rdd-transformations.md#mapPartitions[mapPartitions])

| ParallelCollectionRDD.md[ParallelCollectionRDD] | [[ParallelCollectionRDD]]

| ShuffledRDD.md[ShuffledRDD] | [[ShuffledRDD]] Result of "shuffle" transformations (e.g. spark-rdd-partitions.md#repartition[repartition] or spark-rdd-partitions.md#coalesce[coalesce] with shuffling enabled)

|===

== [[storageLevel]][[getStorageLevel]] StorageLevel

RDD can have a storage:StorageLevel.md[StorageLevel] specified. The default StorageLevel is storage:StorageLevel.md#NONE[NONE].

storageLevel can be specified using the <<persist, persist>> method.

storageLevel becomes NONE again after <<unpersist, unpersist>>.

The current StorageLevel is available using getStorageLevel method.

[source, scala]

getStorageLevel: StorageLevel
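For example (assuming a SparkContext sc):

[source, scala]

import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 100)
nums.getStorageLevel                       // NONE (the default)
nums.persist(StorageLevel.MEMORY_AND_DISK)
nums.getStorageLevel                       // MEMORY_AND_DISK
nums.unpersist()
nums.getStorageLevel                       // NONE again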

== [[id]] Unique Identifier

[source, scala]

id: Int

id is a unique identifier (aka RDD ID) within the owning <<_sc, SparkContext>>.

id requests the <<_sc, SparkContext>> for a SparkContext.md#newRddId[new RDD ID] right when RDD is created.
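For example (assuming a SparkContext sc), IDs are assigned in increasing order as RDDs are created:

[source, scala]

val a = sc.parallelize(1 to 5)
val b = a.map(_ + 1)
a.id  // e.g. 0
b.id  // e.g. 1 (a new, higher ID per newly-created RDD)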

== [[isBarrier_]][[isBarrier]] Barrier Stage

An RDD can be part of a spark-barrier-execution-mode.md#barrier-stage[barrier stage]. By default, the isBarrier flag is enabled (true) when:

. There are no ShuffleDependencies among the <<dependencies, RDD dependencies>>

. There is at least one parent RDD that has the flag enabled

ShuffledRDD.md[ShuffledRDD] has the flag always disabled.

MapPartitionsRDD.md[MapPartitionsRDD] is the only RDD that can have the flag enabled.
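For illustration, a barrier stage is requested through RDD.barrier (assuming a SparkContext sc and enough executor slots to run all the tasks of the stage concurrently):

[source, scala]

import org.apache.spark.BarrierTaskContext

val rdd = sc.parallelize(1 to 100, 4)

// mapPartitions on an RDDBarrier gives a MapPartitionsRDD
// with the isBarrier flag enabled.
val barrierRdd = rdd.barrier().mapPartitions { it =>
  val ctx = BarrierTaskContext.get()
  ctx.barrier()  // all 4 tasks wait for one another here
  it
}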

== [[getOrCompute]] Getting Or Computing RDD Partition

[source, scala]

getOrCompute(
  partition: Partition,
  context: TaskContext): Iterator[T]


getOrCompute creates a storage:BlockId.md#RDDBlockId[RDDBlockId] for the <<id, RDD id>> and the partition index.

getOrCompute requests the BlockManager to storage:BlockManager.md#getOrElseUpdate[getOrElseUpdate] for the block ID (with the <<storageLevel, StorageLevel>> and the makeIterator function).

NOTE: getOrCompute uses core:SparkEnv.md#get[SparkEnv] to access the current core:SparkEnv.md#blockManager[BlockManager].

[[getOrCompute-readCachedBlock]] getOrCompute uses an internal readCachedBlock flag to record whether the block was read from the cache. The flag starts enabled and the makeIterator function turns it off when the partition actually has to be computed.

getOrCompute branches off per the response from the storage:BlockManager.md#getOrElseUpdate[BlockManager] and whether the internal readCachedBlock flag is now on or still off. In either case, getOrCompute creates an spark-InterruptibleIterator.md[InterruptibleIterator].

NOTE: spark-InterruptibleIterator.md[InterruptibleIterator] simply delegates to a wrapped internal Iterator, but allows for task killing functionality.

For a BlockResult available and the readCachedBlock flag on, getOrCompute updates the task's input metrics (with the bytes and records read) and wraps the BlockResult's data iterator in an InterruptibleIterator.

For a BlockResult available and the readCachedBlock flag off, getOrCompute wraps the BlockResult's data iterator in an InterruptibleIterator without touching the input metrics (the partition has just been computed, not read).

NOTE: The BlockResult could be found in a local block manager or fetched from a remote block manager. It may also have been stored (persisted) just now. In either case, the BlockResult is available (and storage:BlockManager.md#getOrElseUpdate[BlockManager.getOrElseUpdate] gives a Left value with the BlockResult).

For a Right(iter) result (regardless of the value of the readCachedBlock flag), getOrCompute wraps the iterator in an InterruptibleIterator.

NOTE: storage:BlockManager.md#getOrElseUpdate[BlockManager.getOrElseUpdate] gives a Right(iter) value when the block could not be stored (and the data is handed back to the caller as an iterator instead).

NOTE: getOrCompute is used on Spark executors.

NOTE: getOrCompute is used exclusively when RDD is requested for the <<iterator, iterator>> of a partition (with a StorageLevel other than NONE).

== [[dependencies]] RDD Dependencies

[source, scala]

dependencies: Seq[Dependency[_]]

dependencies returns the dependencies of an RDD.

NOTE: dependencies is a final method that no class in Spark can ever override.

Internally, dependencies checks out whether the RDD is checkpointed and acts accordingly.

For a checkpointed RDD, dependencies returns a single-element collection with a OneToOneDependency on the CheckpointRDD.

For a non-checkpointed RDD, the dependencies collection is computed (once) using the <<getDependencies, getDependencies>> method and cached afterwards.

NOTE: getDependencies method is an abstract method that custom RDDs are required to provide.
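For example (assuming a SparkContext sc), narrow and shuffle dependencies can be observed directly:

[source, scala]

val pairs = sc.parallelize(1 to 10).map(n => (n % 2, n))  // MapPartitionsRDD
val grouped = pairs.groupByKey()                          // ShuffledRDD

pairs.dependencies    // List(OneToOneDependency)
grouped.dependencies  // List(ShuffleDependency)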

== [[checkpointRDD]] Getting CheckpointRDD

[source, scala]

checkpointRDD: Option[CheckpointRDD[T]]

checkpointRDD gives the CheckpointRDD from the checkpointData internal registry if available (if the RDD was checkpointed).

checkpointRDD is used when RDD is requested for the <<dependencies, dependencies>>, <<partitions, partitions>> and <<preferredLocations, preferred locations>>.

== [[isCheckpointedAndMaterialized]] isCheckpointedAndMaterialized Method

[source, scala]

isCheckpointedAndMaterialized: Boolean

isCheckpointedAndMaterialized requests the checkpointData registry (if defined) whether checkpointing has already been materialized (completed).

isCheckpointedAndMaterialized is used when RDD is requested for isCheckpointed and to <<computeOrReadCheckpoint, computeOrReadCheckpoint>>.

== [[getNarrowAncestors]] getNarrowAncestors Method

[source, scala]

getNarrowAncestors: Seq[RDD[_]]

getNarrowAncestors traverses the RDD lineage and returns all the ancestor RDDs reachable through narrow dependencies only (the traversal stops at ShuffleDependencies).

getNarrowAncestors is used when StageInfo is requested to fromStage.

== [[persist]] Persisting RDD

[source, scala]

persist(): this.type
persist(
  newLevel: StorageLevel): this.type


Refer to spark-rdd-caching.md#persist[Persisting RDD].

== [[persist-internal]] persist Internal Method

[source, scala]

persist(
  newLevel: StorageLevel,
  allowOverride: Boolean): this.type


persist asserts that the StorageLevel is not re-assigned for an RDD that already has one (unless allowOverride is enabled), registers the RDD for cleanup and with the SparkContext (when a StorageLevel is assigned for the first time), and records the new StorageLevel in <<storageLevel, storageLevel>>.

persist (private) is used when RDD is requested to <<persist, persist>> and localCheckpoint.

== [[computeOrReadCheckpoint]] Computing Partition or Reading From Checkpoint

[source, scala]

computeOrReadCheckpoint(
  split: Partition,
  context: TaskContext): Iterator[T]


computeOrReadCheckpoint reads the split partition from a checkpoint (if the RDD is <<isCheckpointedAndMaterialized, checkpointed and materialized>>) or <<compute, computes>> it.

computeOrReadCheckpoint is used when RDD is requested for the <<iterator, iterator>> of a partition or to <<getOrCompute, getOrCompute>>.
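The gist of computeOrReadCheckpoint, paraphrased from the Spark source:

[source, scala]

private[spark] def computeOrReadCheckpoint(
    split: Partition,
    context: TaskContext): Iterator[T] =
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)  // the CheckpointRDD is the (first) parent now
  } else {
    compute(split, context)
  }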

== [[preferredLocations]] Defining Placement Preferences of RDD Partition

[source, scala]

preferredLocations(
  split: Partition): Seq[String]


preferredLocations requests the CheckpointRDD for the placement preferences (if the RDD is checkpointed) or <<getPreferredLocations, getPreferredLocations>>.

preferredLocations is a template method that uses <<getPreferredLocations, getPreferredLocations>>, which custom RDDs can override to specify placement preferences for a partition. getPreferredLocations defines no placement preferences by default.

preferredLocations is mainly used when DAGScheduler is requested to scheduler:DAGScheduler.md#getPreferredLocs[compute the preferred locations for missing partitions].
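For illustration, a hypothetical PinnedRDD (the names are illustrative only) that overrides getPreferredLocations to pin every partition to a given host:

[source, scala]

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

case class HostPartition(index: Int, host: String) extends Partition

class PinnedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[Int](sc, Nil) {

  override def getPartitions: Array[Partition] =
    hosts.zipWithIndex.map { case (h, i) => HostPartition(i, h) }.toArray[Partition]

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)

  // Placement preference the DAGScheduler consults per partition
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[HostPartition].host)
}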

== [[partitions]] Accessing RDD Partitions

[source, scala]

partitions: Array[Partition]

partitions returns the spark-rdd-partitions.md[Partitions] of an RDD.

partitions requests the CheckpointRDD for the partitions (if the RDD is checkpointed) or finds them itself (using <<getPartitions, getPartitions>>) and caches the result (in the partitions_ internal registry that is used next time).

Partitions have the property that their internal index should be equal to their position in the owning RDD.
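That index property can be verified directly (assuming a SparkContext sc):

[source, scala]

val rdd = sc.parallelize(1 to 100, 8)
assert(rdd.partitions.zipWithIndex.forall { case (p, i) => p.index == i })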

== [[checkpoint]] Reliable Checkpointing -- checkpoint Method

[source, scala]

checkpoint(): Unit

checkpoint marks the RDD for reliable checkpointing: it requires that a checkpoint directory has been set (using SparkContext.setCheckpointDir) and assigns a new ReliableRDDCheckpointData to the RDD's checkpoint data. The RDD is materialized (saved to files in the checkpoint directory) right after the first job that uses this RDD finishes.

checkpoint is part of the public RDD API and is meant to be called by Spark applications directly (before any job has been executed on this RDD).
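A typical usage (assuming a SparkContext sc; the checkpoint directory is an example path):

[source, scala]

sc.setCheckpointDir("/tmp/checkpoints")  // required before checkpoint()

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()     // merely marks the RDD for checkpointing

rdd.count()          // the first job also materializes the checkpoint
rdd.isCheckpointed   // true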
