SparkContext¶
SparkContext is the entry point to all the components of Apache Spark (the execution engine) and, as such, the heart of a Spark application. In fact, you can consider an application a Spark application only when it uses a SparkContext (directly or indirectly).

Important
There should be at most one active SparkContext per JVM, and Spark developers should use the SparkContext.getOrCreate utility to share it (e.g. across threads).
Creating Instance¶
SparkContext takes the following to be created:

- SparkConf

SparkContext is created (directly or indirectly using the getOrCreate utility).
While being created, SparkContext sets up core services and establishes a connection to a Spark execution environment.
Local Properties¶
localProperties: InheritableThreadLocal[Properties]
SparkContext uses an InheritableThreadLocal (Java) of key-value pairs of thread-local properties to pass extra information from a parent thread (on the driver) to child threads.
localProperties is meant to be used by developers using SparkContext.setLocalProperty and SparkContext.getLocalProperty.
Local Properties are available using TaskContext.getLocalProperty.
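As a quick sketch (the property name my.custom.key is made up), a local property set on the driver thread is visible to tasks through TaskContext:

```scala
import org.apache.spark.{SparkContext, TaskContext}

val sc = SparkContext.getOrCreate()

// Set a thread-local property on the driver (my.custom.key is a made-up name)
sc.setLocalProperty("my.custom.key", "demo-value")
assert(sc.getLocalProperty("my.custom.key") == "demo-value")

// Tasks submitted from this thread can read the property via TaskContext
sc.parallelize(0 until 2).foreach { _ =>
  val v = TaskContext.get().getLocalProperty("my.custom.key")
  // v should be "demo-value" for jobs submitted from the thread above
}

// Setting a property to null removes it
sc.setLocalProperty("my.custom.key", null)
```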
Local Properties are available to SparkListeners using the following events:
localProperties are passed down when SparkContext is requested for the following:
- Running Job (that in turn makes the local properties available to the DAGScheduler to run a job)
- Running Approximate Job
- Submitting Job
- Submitting MapStage
DAGScheduler passes down local properties when scheduling:
Spark (Core) defines the following local properties.
| Name | Default Value | Setter |
|---|---|---|
| callSite.long | | |
| callSite.short | | SparkContext.setCallSite |
| spark.job.description | callSite.short | SparkContext.setJobDescription (SparkContext.setJobGroup) |
| spark.job.interruptOnCancel | | SparkContext.setJobGroup |
| spark.jobGroup.id | | SparkContext.setJobGroup |
| spark.scheduler.pool | | |
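For example, SparkContext.setJobGroup and SparkContext.setJobDescription populate some of these local properties (a minimal sketch; the group ID and description below are arbitrary):

```scala
// Sets spark.jobGroup.id, spark.job.description and spark.job.interruptOnCancel
sc.setJobGroup("nightly-etl", "Nightly ETL jobs", interruptOnCancel = true)
assert(sc.getLocalProperty("spark.jobGroup.id") == "nightly-etl")
assert(sc.getLocalProperty("spark.job.description") == "Nightly ETL jobs")

// Jobs submitted from this thread belong to the group and can be cancelled together
sc.parallelize(1 to 10).count()
sc.cancelJobGroup("nightly-etl")

// Clear the group (and description) for subsequent jobs
sc.clearJobGroup()
```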
Services¶
- ExecutorAllocationManager (optional)
- others
ShuffleDriverComponents¶
SparkContext creates a ShuffleDriverComponents when created.
SparkContext loads the ShuffleDataIO that is in turn requested for the ShuffleDriverComponents. SparkContext requests the ShuffleDriverComponents to initialize.
The ShuffleDriverComponents is used when:
- ShuffleDependency is created
- SparkContext creates the ContextCleaner (if enabled)
SparkContext requests the ShuffleDriverComponents to clean up when stopping.
Static Files¶
addFile¶
addFile(
path: String,
recursive: Boolean): Unit
// recursive = false
addFile(
path: String): Unit
Firstly, addFile validates the scheme of the given path. For a path with no scheme, addFile converts it to a canonical form. For a path with the local scheme, addFile prints out the following WARN message to the logs and returns.
File with 'local' scheme is not supported to add to file server, since it is already available on every node.
addFile creates a Hadoop Path from the given path. addFile validates the URL when the path is an HTTP, HTTPS or FTP URI.

addFile throws a SparkException with the following message when the path is a local directory and the master is not local:

addFile does not support local directories when not running local mode.

addFile throws a SparkException with the following message when the path is a directory and the recursive flag is not turned on:
Added file $hadoopPath is a directory and recursive is not turned on.
In the end, addFile adds the file to the addedFiles internal registry (with the current timestamp):
- For new files, addFile prints out the following INFO message to the logs, fetches the file (to the root directory and without using the cache) and postEnvironmentUpdate.

    Added file [path] at [key] with timestamp [timestamp]

- For files that were already added, addFile prints out the following WARN message to the logs:

    The path [path] has been added already. Overwriting of added paths is not supported in the current version.
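A usage sketch, assuming a local file /tmp/lookup.txt exists; executors resolve an added file with SparkFiles.get:

```scala
import org.apache.spark.SparkFiles

// /tmp/lookup.txt is a made-up local file
sc.addFile("/tmp/lookup.txt")

sc.parallelize(0 until 2).foreach { _ =>
  // On executors, the file has already been fetched to a local directory
  val localPath = SparkFiles.get("lookup.txt")
  val lineCount = scala.io.Source.fromFile(localPath).getLines().size
  println(s"$localPath has $lineCount lines")
}

// The added file is also visible on the driver
assert(sc.listFiles().exists(_.contains("lookup.txt")))
```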
addFile is used when:
- SparkContext is created
listFiles¶
listFiles(): Seq[String]
listFiles returns the files added so far (the keys of the addedFiles internal registry).
addedFiles Internal Registry¶
addedFiles: Map[String, Long]
addedFiles is a collection of static files with the timestamp they were added at.
addedFiles is used when:
- SparkContext is requested to postEnvironmentUpdate and listFiles
- TaskSetManager is created (and resourceOffer)
files¶
files: Seq[String]
files is a collection of file paths defined by spark.files configuration property.
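As a sketch, the same effect can be achieved declaratively with spark.files at startup (the paths below are assumptions):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// /tmp/data.csv and /tmp/lookup.json are made-up paths
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("spark.files demo")
  .set("spark.files", "/tmp/data.csv,/tmp/lookup.json")

val sc = new SparkContext(conf)
// The files are added at startup and show up in listFiles
sc.listFiles().foreach(println)
```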
Posting SparkListenerEnvironmentUpdate Event¶
postEnvironmentUpdate(): Unit
postEnvironmentUpdate...FIXME
postEnvironmentUpdate is used when:
getOrCreate Utility¶
getOrCreate(): SparkContext
getOrCreate(
config: SparkConf): SparkContext
getOrCreate...FIXME
PluginContainer¶
SparkContext creates a PluginContainer when created.
PluginContainer is created (for the driver where SparkContext lives) using PluginContainer.apply utility.
PluginContainer is then requested to registerMetrics with the applicationId.
PluginContainer is requested to shutdown when SparkContext is requested to stop.
Creating SchedulerBackend and TaskScheduler¶
createTaskScheduler(
sc: SparkContext,
master: String,
deployMode: String): (SchedulerBackend, TaskScheduler)
createTaskScheduler creates a SchedulerBackend and a TaskScheduler for the given master URL and deployment mode.

Internally, createTaskScheduler branches off per the given master URL to select the requested implementations.
createTaskScheduler accepts the following master URLs:
- local - local mode with 1 thread only
- local[n] or local[*] - local mode with n threads
- local[n, m] or local[*, m] - local mode with n threads and m number of failures
- spark://hostname:port for Spark Standalone
- local-cluster[n, m, z] - local cluster with n workers, m cores per worker, and z memory per worker
- Other URLs are simply handed over to getClusterManager to load an external cluster manager if available
createTaskScheduler is used when SparkContext is created.
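For illustration, the master URL is usually given via SparkConf.setMaster (or spark-submit --master); a few example values:

```scala
import org.apache.spark.SparkConf

// Local mode with as many threads as CPU cores and up to 3 task failures
val localConf = new SparkConf()
  .setMaster("local[*, 3]")
  .setAppName("local demo")

// Spark Standalone (hostname and port are assumptions)
val standaloneConf = new SparkConf()
  .setMaster("spark://master.example.com:7077")
  .setAppName("standalone demo")

// Local cluster: 2 workers, 2 cores per worker, 1024 MB of memory per worker
val localClusterConf = new SparkConf()
  .setMaster("local-cluster[2, 2, 1024]")
  .setAppName("local-cluster demo")
```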
Loading ExternalClusterManager¶
getClusterManager(
url: String): Option[ExternalClusterManager]
getClusterManager uses Java's ServiceLoader to find and load an ExternalClusterManager that supports the given master URL.
ExternalClusterManager Service Discovery
For ServiceLoader to find ExternalClusterManagers, they have to be registered using the following file:
META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
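A minimal sketch of what such a registration could look like (MyClusterManager and the myscheme:// prefix are made up; note that ExternalClusterManager is private[spark], so real implementations live in an org.apache.spark subpackage):

```scala
// The service file
//   META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
// contains a single line with the implementation class name, e.g.:
//   org.apache.spark.scheduler.custom.MyClusterManager

package org.apache.spark.scheduler.custom

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler}

class MyClusterManager extends ExternalClusterManager {

  // Claim master URLs with a custom prefix (an assumption for this sketch)
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("myscheme://")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    ??? // create your TaskScheduler here

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    ??? // create your SchedulerBackend here

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = ()
}
```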
getClusterManager throws a SparkException when multiple cluster managers were found:
Multiple external cluster managers registered for the url [url]: [serviceLoaders]
getClusterManager is used when SparkContext is requested for a SchedulerBackend and TaskScheduler.
Running Job Synchronously¶
runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U): Array[U]
runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int]): Array[U]
runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U): Array[U]
runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit): Unit
runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int]): Array[U]

runJob finds the call site and cleans up the given func function.
runJob prints out the following INFO message to the logs:
Starting job: [callSite]
With spark.logLineage enabled, runJob requests the given RDD for the recursive dependencies and prints out the following INFO message to the logs:
RDD's recursive dependencies:
[toDebugString]
runJob requests the DAGScheduler to run a job.
runJob requests the ConsoleProgressBar to finishAll if defined.
In the end, runJob requests the given RDD to doCheckpoint.
runJob throws an IllegalStateException when SparkContext is stopped:
SparkContext has been shutdown
Demo¶
runJob essentially executes a func function on all or a subset of partitions of an RDD and returns the result as an array (with elements being the results per partition).
sc.setLocalProperty("callSite.short", "runJob Demo")
val partitionsNumber = 4
val rdd = sc.parallelize(
Seq("hello world", "nice to see you"),
numSlices = partitionsNumber)
import org.apache.spark.TaskContext
val func = (t: TaskContext, ss: Iterator[String]) => 1
val result = sc.runJob(rdd, func)
assert(result.length == partitionsNumber)
sc.clearCallSite()
Call Site¶
getCallSite(): CallSite
getCallSite...FIXME
getCallSite is used when:
- SparkContext is requested to broadcast, runJob, runApproximateJob, submitJob and submitMapStage
- AsyncRDDActions is requested to takeAsync
- RDD is created
Closure Cleaning¶
clean(
f: F,
checkSerializable: Boolean = true): F
clean cleans up the given f closure (using ClosureCleaner.clean utility).
Tip
Enable DEBUG logging level for org.apache.spark.util.ClosureCleaner logger to see what happens inside the class.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.util.ClosureCleaner=DEBUG
Refer to Logging.
With DEBUG logging level you should see the following messages in the logs:
+++ Cleaning closure [func] ([func.getClass.getName]) +++
+ declared fields: [declaredFields.size]
[field]
...
+++ closure [func] ([func.getClass.getName]) is now cleaned +++
Logging¶
Enable ALL logging level for org.apache.spark.SparkContext logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.SparkContext=ALL
Refer to Logging.
To Be Reviewed¶
SparkContext offers the following functions:

- Getting the current status of a Spark application (incl. the default level of parallelism that specifies the number of spark-rdd-partitions.md[partitions] in RDDs when they are created without specifying the number explicitly by a user)
- Setting Configuration (incl. Local Properties)
- Creating Distributed Entities (RDDs, accumulators, broadcast variables)
- Accessing services, e.g. scheduler:LiveListenerBus.md[], storage:BlockManager.md[BlockManager], scheduler:SchedulerBackend.md[SchedulerBackends], shuffle:ShuffleManager.md[ShuffleManager] and others
TIP: Read the scaladoc of http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext[org.apache.spark.SparkContext].
Removing RDD Blocks from BlockManagerMaster¶
unpersistRDD(
rddId: Int,
blocking: Boolean = true): Unit
unpersistRDD requests BlockManagerMaster to storage:BlockManagerMaster.md#removeRdd[remove the blocks for the RDD] (given rddId).
NOTE: unpersistRDD uses SparkEnv core:SparkEnv.md#blockManager[to access the current BlockManager] that is in turn used to storage:BlockManager.md#master[access the current BlockManagerMaster].
unpersistRDD removes rddId from the <<persistentRdds, persistentRdds>> internal registry.
In the end, unpersistRDD posts a SparkListener.md#SparkListenerUnpersistRDD[SparkListenerUnpersistRDD] (with rddId) to the <<listenerBus, LiveListenerBus>>.
unpersistRDD is used when:

- ContextCleaner does core:ContextCleaner.md#doCleanupRDD[doCleanupRDD]
- SparkContext <<unpersist, unpersists an RDD>> (i.e. marks an RDD as non-persistent)
== [[applicationId]] Unique Identifier of Spark Application -- applicationId Method
CAUTION: FIXME
== [[postApplicationStart]] postApplicationStart Internal Method
[source, scala]¶
postApplicationStart(): Unit¶
postApplicationStart...FIXME
postApplicationStart is used exclusively when SparkContext is created.
== [[postApplicationEnd]] postApplicationEnd Method
CAUTION: FIXME
== [[clearActiveContext]] clearActiveContext Method
CAUTION: FIXME
== [[getPersistentRDDs]] Accessing persistent RDDs -- getPersistentRDDs Method
[source, scala]¶
getPersistentRDDs: Map[Int, RDD[_]]¶
getPersistentRDDs returns the collection of RDDs that have marked themselves as persistent via spark-rdd-caching.md#cache[cache].
Internally, getPersistentRDDs returns the <<persistentRdds, persistentRdds>> internal registry.
== [[cancelJob]] Cancelling Job -- cancelJob Method
[source, scala]¶
cancelJob(jobId: Int)¶
cancelJob requests DAGScheduler scheduler:DAGScheduler.md#cancelJob[to cancel a Spark job].
== [[cancelStage]] Cancelling Stage -- cancelStage Methods
[source, scala]¶
cancelStage(stageId: Int): Unit
cancelStage(stageId: Int, reason: String): Unit
cancelStage simply requests DAGScheduler scheduler:DAGScheduler.md#cancelStage[to cancel a Spark stage] (with an optional reason).
NOTE: cancelStage is used when StagesTab spark-webui-StagesTab.md#handleKillRequest[handles a kill request] (from a user in web UI).
Programmable Dynamic Allocation¶
SparkContext offers the following methods as the developer API for Dynamic Allocation of Executors:
- <<requestExecutors, requestExecutors>>
- <<killExecutors, killExecutors>>
- <<requestTotalExecutors, requestTotalExecutors>>
- (private!) getExecutorIds
=== [[requestExecutors]] Requesting New Executors -- requestExecutors Method
[source, scala]¶
requestExecutors(numAdditionalExecutors: Int): Boolean¶
requestExecutors requests numAdditionalExecutors executors from scheduler:CoarseGrainedSchedulerBackend.md[CoarseGrainedSchedulerBackend].
=== [[killExecutors]] Requesting to Kill Executors -- killExecutors Method
[source, scala]¶
killExecutors(executorIds: Seq[String]): Boolean¶
CAUTION: FIXME
=== [[requestTotalExecutors]] Requesting Total Executors -- requestTotalExecutors Method
[source, scala]¶
requestTotalExecutors(
  numExecutors: Int,
  localityAwareTasks: Int,
  hostToLocalTaskCount: Map[String, Int]): Boolean
requestTotalExecutors is a private[spark] method that scheduler:CoarseGrainedSchedulerBackend.md#requestTotalExecutors[requests the exact number of executors from a coarse-grained scheduler backend].
NOTE: It works for scheduler:CoarseGrainedSchedulerBackend.md[coarse-grained scheduler backends] only.
When called for other scheduler backends you should see the following WARN message in the logs:
Requesting executors is only supported in coarse-grained mode
Executor IDs¶
getExecutorIds is a private[spark] method that is part of ExecutorAllocationClient contract. It simply passes the call on to the current coarse-grained scheduler backend, i.e. calls getExecutorIds.
Important
It works for coarse-grained scheduler backends only.
When called for other scheduler backends you should see the following WARN message in the logs:
Requesting executors is only supported in coarse-grained mode
CAUTION: FIXME Why does SparkContext implement the method for coarse-grained scheduler backends? Why doesn't SparkContext throw an exception when the method is called? Nobody seems to be using it (!)
=== [[getOrCreate]] Getting Existing or Creating New SparkContext -- getOrCreate Methods
[source, scala]¶
getOrCreate(): SparkContext
getOrCreate(conf: SparkConf): SparkContext
getOrCreate methods allow you to get the existing SparkContext or create a new one.
[source, scala]¶
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()

// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")
val sc = SparkContext.getOrCreate(conf)
The no-param getOrCreate method requires that the two mandatory Spark settings - <<master, master>> and <<appName, application name>> - are specified using spark-submit.
=== [[constructors]] Constructors
[source, scala]¶
SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(
  master: String,
  appName: String,
  sparkHome: String = null,
  jars: Seq[String] = Nil,
  environment: Map[String, String] = Map())
You can create a SparkContext instance using the four constructors.
[source, scala]¶
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("SparkMe App")

import org.apache.spark.SparkContext
val sc = new SparkContext(conf)
When a Spark context starts up you should see the following INFO in the logs (amongst the other messages that come from the Spark services):
Running Spark version 2.0.0-SNAPSHOT
NOTE: Only one SparkContext may be running in a single JVM (check out https://issues.apache.org/jira/browse/SPARK-2243[SPARK-2243 Support multiple SparkContexts in the same JVM]). Sharing access to a SparkContext in the JVM is the solution to share data within Spark (without relying on other means of data sharing using external data stores).
== [[env]] Accessing Current SparkEnv -- env Method
CAUTION: FIXME
== [[getConf]] Getting Current SparkConf -- getConf Method
[source, scala]¶
getConf: SparkConf¶
getConf returns the current SparkConf.md[SparkConf].
NOTE: Changing the SparkConf object does not change the current configuration (as the method returns a copy).
== [[master]][[master-url]] Deployment Environment -- master Method
[source, scala]¶
master: String¶
master method returns the current value of configuration-properties.md#spark.master[spark.master] which is the spark-deployment-environments.md[deployment environment] in use.
== [[appName]] Application Name -- appName Method
[source, scala]¶
appName: String¶
appName gives the value of the mandatory SparkConf.md#spark.app.name[spark.app.name] setting.
NOTE: appName is used when spark-standalone.md#SparkDeploySchedulerBackend[SparkDeploySchedulerBackend starts], spark-webui-SparkUI.md#createLiveUI[SparkUI creates a web UI], when postApplicationStart is executed, and for Mesos and checkpointing in Spark Streaming.
== [[applicationAttemptId]] Unique Identifier of Execution Attempt -- applicationAttemptId Method
[source, scala]¶
applicationAttemptId: Option[String]¶
applicationAttemptId gives the unique identifier of the execution attempt of a Spark application.
[NOTE]¶
applicationAttemptId is used when:
- scheduler:ShuffleMapTask.md#creating-instance[ShuffleMapTask] and scheduler:ResultTask.md#creating-instance[ResultTask] are created
- SparkContext is requested to <<postApplicationStart, postApplicationStart>>
== [[getExecutorStorageStatus]] Storage Status (of All BlockManagers) -- getExecutorStorageStatus Method
[source, scala]¶
getExecutorStorageStatus: Array[StorageStatus]¶
getExecutorStorageStatus storage:BlockManagerMaster.md#getStorageStatus[requests BlockManagerMaster for storage status] (of all storage:BlockManager.md[BlockManagers]).
NOTE: getExecutorStorageStatus is a developer API.
getExecutorStorageStatus is used when:
- SparkContext is requested for storage status of cached RDDs
- SparkStatusTracker is requested for known executors
== [[deployMode]] Deploy Mode -- deployMode Method
[source,scala]¶
deployMode: String¶
deployMode returns the current value of spark-deploy-mode.md[spark.submit.deployMode] setting or client if not set.
== [[getSchedulingMode]] Scheduling Mode -- getSchedulingMode Method
[source, scala]¶
getSchedulingMode: SchedulingMode.SchedulingMode¶
getSchedulingMode returns the current spark-scheduler-SchedulingMode.md[Scheduling Mode].
== [[getPoolForName]] Schedulable (Pool) by Name -- getPoolForName Method
[source, scala]¶
getPoolForName(pool: String): Option[Schedulable]¶
getPoolForName returns a spark-scheduler-Schedulable.md[Schedulable] by the pool name, if one exists.
NOTE: getPoolForName is part of the Developer's API and may change in the future.
Internally, it requests the scheduler:TaskScheduler.md#rootPool[TaskScheduler for the root pool] and spark-scheduler-Pool.md#schedulableNameToSchedulable[looks up the Schedulable by the pool name].
It is exclusively used to spark-webui-PoolPage.md[show pool details in web UI (for a stage)].
== [[getAllPools]] All Schedulable Pools -- getAllPools Method
[source, scala]¶
getAllPools: Seq[Schedulable]¶
getAllPools collects the spark-scheduler-Pool.md[Pools] in scheduler:TaskScheduler.md#contract[TaskScheduler.rootPool].
NOTE: TaskScheduler.rootPool is part of the scheduler:TaskScheduler.md#contract[TaskScheduler Contract].
NOTE: getAllPools is part of the Developer's API.
CAUTION: FIXME Where is the method used?
NOTE: getAllPools is used to calculate pool names for spark-webui-AllStagesPage.md#pool-names[Stages tab in web UI] with FAIR scheduling mode used.
== [[defaultParallelism]] Default Level of Parallelism
[source, scala]¶
defaultParallelism: Int¶
defaultParallelism requests the <<taskScheduler, TaskScheduler>> for the default level of parallelism.
NOTE: Default level of parallelism specifies the number of spark-rdd-partitions.md[partitions] in RDDs when created without specifying them explicitly by a user.
[NOTE]¶
defaultParallelism is used in <<parallelize, parallelize>>, SparkContext.range and <<makeRDD, makeRDD>> (as well as Spark Streaming's DStream.countByValue and DStream.countByValueAndWindow et al.).
defaultParallelism is also used to instantiate rdd:HashPartitioner.md[HashPartitioner] and for the minimum number of partitions in rdd:HadoopRDD.md[HadoopRDDs].¶
== [[taskScheduler]] Current Spark Scheduler (aka TaskScheduler) -- taskScheduler Property
[source, scala]¶
taskScheduler: TaskScheduler
taskScheduler_=(ts: TaskScheduler): Unit
taskScheduler manages (i.e. reads or writes) <<_taskScheduler, _taskScheduler>> internal property.
== [[version]] Getting Spark Version -- version Property
[source, scala]¶
version: String¶
version returns the Spark version this SparkContext uses.
== [[makeRDD]] makeRDD Method
CAUTION: FIXME
== [[submitJob]] Submitting Jobs Asynchronously -- submitJob Method
[source, scala]¶
submitJob[T, U, R](
  rdd: RDD[T],
  processPartition: Iterator[T] => U,
  partitions: Seq[Int],
  resultHandler: (Int, U) => Unit,
  resultFunc: => R): SimpleFutureAction[R]
submitJob submits a job in an asynchronous, non-blocking way to scheduler:DAGScheduler.md#submitJob[DAGScheduler].
It cleans the processPartition input function argument and returns an instance of spark-rdd-actions.md#FutureAction[SimpleFutureAction] that holds the JobWaiter instance.
CAUTION: FIXME What are resultFunc?
It is used in:
- spark-rdd-actions.md#AsyncRDDActions[AsyncRDDActions] methods
- spark-streaming/spark-streaming.md[Spark Streaming] for spark-streaming/spark-streaming-receivertracker.md#ReceiverTrackerEndpoint-startReceiver[ReceiverTrackerEndpoint.startReceiver]
== [[spark-configuration]] Spark Configuration
CAUTION: FIXME
== [[sparkcontext-and-rdd]] SparkContext and RDDs
You use a Spark context to create RDDs (see <<creating-rdds, Creating RDD>>).
When an RDD is created, it belongs to and is completely owned by the Spark context it originated from. RDDs can't by design be shared between SparkContexts.
.A Spark context creates a living space for RDDs.
image::diagrams/sparkcontext-rdds.png[]
== [[creating-rdds]][[parallelize]] Creating RDD -- parallelize Method
SparkContext allows you to create many different RDDs from input sources like:
- Scala's collections, i.e. sc.parallelize(0 to 100)
- local or remote filesystems, i.e. sc.textFile("README.md")
- any Hadoop InputSource using sc.newAPIHadoopFile
Read rdd:index.md#creating-rdds[Creating RDDs] in rdd:index.md[RDD - Resilient Distributed Dataset].
== [[unpersist]] Unpersisting RDD (Marking RDD as Non-Persistent) -- unpersist Method
CAUTION: FIXME
unpersist removes an RDD from the master's storage:BlockManager.md[Block Manager] (calls removeRdd(rddId: Int, blocking: Boolean)) and the internal <<persistentRdds, persistentRdds>> registry.
It finally posts SparkListener.md#SparkListenerUnpersistRDD[SparkListenerUnpersistRDD] message to listenerBus.
== [[setCheckpointDir]] Setting Checkpoint Directory -- setCheckpointDir Method
[source, scala]¶
setCheckpointDir(directory: String)¶
setCheckpointDir method is used to set up the checkpoint directory...FIXME
CAUTION: FIXME
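A brief usage sketch (the directory below is an assumption; on a cluster it should be a path on a distributed filesystem such as HDFS):

```scala
// /tmp/spark-checkpoints is a made-up local directory
sc.setCheckpointDir("/tmp/spark-checkpoints")

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.checkpoint()              // materialized by the next action
assert(rdd.count() == 100)
assert(rdd.isCheckpointed)
```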
== [[register]] Registering Accumulator -- register Methods
[source, scala]¶
register(acc: AccumulatorV2[_, _]): Unit
register(acc: AccumulatorV2[_, _], name: String): Unit
register registers the acc accumulator. You can optionally give an accumulator a name.
TIP: You can create built-in accumulators for longs, doubles, and collection types using the <<creating-accumulators, built-in methods>> (longAccumulator, doubleAccumulator or collectionAccumulator).
Internally, register registers acc accumulator (with the current SparkContext).
== [[creating-accumulators]][[longAccumulator]][[doubleAccumulator]][[collectionAccumulator]] Creating Built-In Accumulators
[source, scala]¶
longAccumulator: LongAccumulator
longAccumulator(name: String): LongAccumulator
doubleAccumulator: DoubleAccumulator
doubleAccumulator(name: String): DoubleAccumulator
collectionAccumulator[T]: CollectionAccumulator[T]
collectionAccumulator[T](name: String): CollectionAccumulator[T]
You can use longAccumulator, doubleAccumulator or collectionAccumulator to create and register accumulators for simple and collection values.
longAccumulator returns LongAccumulator with the zero value 0.
doubleAccumulator returns DoubleAccumulator with the zero value 0.0.
collectionAccumulator returns CollectionAccumulator with the zero value being an empty java.util.List[T].
scala> val acc = sc.longAccumulator
acc: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: None, value: 0)
scala> val counter = sc.longAccumulator("counter")
counter: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 1, name: Some(counter), value: 0)
scala> counter.value
res0: Long = 0
scala> sc.parallelize(0 to 9).foreach(n => counter.add(n))
scala> counter.value
res3: Long = 45
The name input parameter allows you to give a name to an accumulator and have it displayed in spark-webui-StagePage.md#accumulators[Spark UI] (under Stages tab for a given stage).

Tip
You can register custom accumulators using register methods.
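A sketch of a custom accumulator registered with register (MaxAccumulator is a made-up example):

```scala
import org.apache.spark.util.AccumulatorV2

// Tracks the maximum Long seen across tasks (illustrative only)
class MaxAccumulator extends AccumulatorV2[Long, Long] {
  private var _max = Long.MinValue
  override def isZero: Boolean = _max == Long.MinValue
  override def copy(): MaxAccumulator = {
    val acc = new MaxAccumulator
    acc._max = _max
    acc
  }
  override def reset(): Unit = _max = Long.MinValue
  override def add(v: Long): Unit = _max = math.max(_max, v)
  override def merge(other: AccumulatorV2[Long, Long]): Unit =
    _max = math.max(_max, other.value)
  override def value: Long = _max
}

val maxAcc = new MaxAccumulator
sc.register(maxAcc, "max seen")

sc.parallelize(Seq(3L, 14L, 7L)).foreach(maxAcc.add)
assert(maxAcc.value == 14L)
```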
== [[broadcast]] Creating Broadcast Variable -- broadcast Method
[source, scala]¶
broadcast[T: ClassTag](value: T): Broadcast[T]
broadcast method creates a Broadcast.md[broadcast variable]. It is a shared memory with the value (as broadcast blocks) on the driver and later on all Spark executors.
[source,plaintext]¶
val sc: SparkContext = ???

scala> val hello = sc.broadcast("hello")
hello: org.apache.spark.broadcast.Broadcast[String] = Broadcast(0)
Spark transfers the value to Spark executors once, and tasks can share it without incurring repetitive network transmissions when the broadcast variable is used multiple times.

Internally, broadcast requests BroadcastManager for a new broadcast variable.
NOTE: The current BroadcastManager is available using the core:SparkEnv.md#broadcastManager[SparkEnv.broadcastManager] attribute and is always BroadcastManager (with a few internal configuration changes to reflect where it runs, i.e. inside the driver or executors).
You should see the following INFO message in the logs:
Created broadcast [id] from [callSite]
If ContextCleaner is defined, the core:ContextCleaner.md#[new broadcast variable is registered for cleanup].
[NOTE]¶
Spark does not support broadcasting RDDs.
scala> sc.broadcast(sc.range(0, 10))
java.lang.IllegalArgumentException: requirement failed: Can not directly broadcast RDDs; instead, call collect() and broadcast the result.
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1392)
... 48 elided
Once created, the broadcast variable (and other blocks) are displayed per executor and the driver in web UI.

== [[jars]] Distribute JARs to workers
The jar you specify with SparkContext.addJar will be copied to all the worker nodes.
The configuration setting spark.jars is a comma-separated list of jar paths to be included in all tasks executed from this SparkContext. A path can either be a local file, a file in HDFS (or other Hadoop-supported filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
scala> sc.addJar("build.sbt")
15/11/11 21:54:54 Added JAR build.sbt at http://192.168.1.4:49427/jars/build.sbt with timestamp 1447275294457
CAUTION: FIXME Why is HttpFileServer used for addJar?
=== SparkContext as Application-Wide Counter
SparkContext keeps track of:
[[nextShuffleId]] * shuffle ids using nextShuffleId internal counter for scheduler:ShuffleMapStage.md[registering shuffle dependencies] to shuffle:ShuffleManager.md[Shuffle Service].
== [[stop]][[stopping]] Stopping SparkContext -- stop Method
[source, scala]¶
stop(): Unit¶
stop stops the SparkContext.
Internally, stop enables stopped internal flag. If already stopped, you should see the following INFO message in the logs:
SparkContext already stopped.
stop then does the following:
- Removes _shutdownHookRef from ShutdownHookManager
- Posts a SparkListenerApplicationEnd (to the <<listenerBus, LiveListenerBus>>)
- spark-webui-SparkUI.md#stop[Stops web UI]
- Requests MetricsSystem to report metrics (from all registered sinks)
- core:ContextCleaner.md#stop[Stops ContextCleaner]
- Requests ExecutorAllocationManager to stop
- If LiveListenerBus was started, scheduler:LiveListenerBus.md#stop[requests LiveListenerBus to stop]
- Requests spark-history-server:EventLoggingListener.md#stop[EventLoggingListener to stop]
- Requests scheduler:DAGScheduler.md#stop[DAGScheduler to stop]
- Requests rpc:index.md#stop[RpcEnv to stop HeartbeatReceiver endpoint]
- Requests ConsoleProgressBar to stop
- Clears the reference to TaskScheduler, i.e. _taskScheduler is null
- Requests core:SparkEnv.md#stop[SparkEnv to stop] and clears SparkEnv
- Clears yarn/spark-yarn-client.md#SPARK_YARN_MODE[SPARK_YARN_MODE flag]
- <<clearActiveContext, Clears an active SparkContext>>
Ultimately, you should see the following INFO message in the logs:
Successfully stopped SparkContext
Registering SparkListener¶
addSparkListener(
listener: SparkListenerInterface): Unit
addSparkListener registers a custom SparkListenerInterface.
Note
Custom listeners can also be registered declaratively using spark.extraListeners configuration property.
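A minimal sketch of a custom listener (JobLoggingListener is a made-up name); it can be registered programmatically with addSparkListener or declaratively with spark.extraListeners:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Logs job lifecycle events to stdout (illustrative only)
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobLoggingListener)

// Alternatively, register declaratively at startup:
//   --conf spark.extraListeners=com.example.JobLoggingListener
```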
== [[custom-schedulers]] Custom SchedulerBackend, TaskScheduler and DAGScheduler
By default, SparkContext uses (private[spark] class) org.apache.spark.scheduler.DAGScheduler, but you can develop your own custom DAGScheduler implementation, and use (private[spark]) SparkContext.dagScheduler_=(ds: DAGScheduler) method to assign yours.
It is also applicable to SchedulerBackend and TaskScheduler using schedulerBackend_=(sb: SchedulerBackend) and taskScheduler_=(ts: TaskScheduler) methods, respectively.
CAUTION: FIXME Make it an advanced exercise.
== [[events]] Events
When a Spark context starts, it triggers SparkListener.md#SparkListenerEnvironmentUpdate[SparkListenerEnvironmentUpdate] and SparkListener.md#SparkListenerApplicationStart[SparkListenerApplicationStart] messages.
Refer to the section <
== [[setLogLevel]][[setting-default-log-level]] Setting Default Logging Level -- setLogLevel Method
[source, scala]¶
setLogLevel(logLevel: String)¶
setLogLevel allows you to set the root logging level in a Spark application, e.g. spark-shell.md[Spark shell].
Internally, setLogLevel uses ++http://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/Level.html#toLevel(java.lang.String)++[org.apache.log4j.Level.toLevel(logLevel)] to resolve the logging level and then sets it using ++http://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/LogManager.html#getRootLogger()++[org.apache.log4j.LogManager.getRootLogger().setLevel(level)].
[TIP]¶
You can directly set the logging level using ++http://logging.apache.org/log4j/2.x/log4j-api/apidocs/org/apache/logging/log4j/LogManager.html#getLogger()++[org.apache.log4j.LogManager.getLogger()].
[source, scala]¶
LogManager.getLogger("org").setLevel(Level.OFF)¶
====
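For comparison, a typical call from a Spark shell (WARN is just an example level):

```scala
// Valid log levels include ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
sc.setLogLevel("WARN")
```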
== [[hadoopConfiguration]] Hadoop Configuration
While a SparkContext is created, so is a Hadoop configuration (as an instance of Configuration that is available as _hadoopConfiguration).
NOTE: spark-SparkHadoopUtil.md#newConfiguration[SparkHadoopUtil.get.newConfiguration] is used.
If a SparkConf is provided it is used to build the configuration as described. Otherwise, the default Configuration object is returned.
If AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY are both available, the following settings are set for the Hadoop configuration:
- fs.s3.awsAccessKeyId, fs.s3n.awsAccessKeyId, fs.s3a.access.key are set to the value of AWS_ACCESS_KEY_ID
- fs.s3.awsSecretAccessKey, fs.s3n.awsSecretAccessKey, and fs.s3a.secret.key are set to the value of AWS_SECRET_ACCESS_KEY
Every spark.hadoop. setting becomes a setting of the configuration with the prefix spark.hadoop. removed for the key.
The value of spark.buffer.size (default: 65536) is used as the value of io.file.buffer.size.
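A sketch of how a spark.hadoop.-prefixed setting and spark.buffer.size surface in the Hadoop configuration (the split-size value below is arbitrary):

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")
  .setAppName("hadoopConfiguration demo")
  .set("spark.hadoop.mapreduce.input.fileinputformat.split.minsize", "134217728")
  .set("spark.buffer.size", "131072")

val sc = new SparkContext(conf)

// The spark.hadoop. prefix is stripped for the Hadoop key
assert(sc.hadoopConfiguration.get("mapreduce.input.fileinputformat.split.minsize") == "134217728")
// spark.buffer.size becomes io.file.buffer.size
assert(sc.hadoopConfiguration.get("io.file.buffer.size") == "131072")
```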
== [[listenerBus]] listenerBus -- LiveListenerBus Event Bus
listenerBus is a scheduler:LiveListenerBus.md[] object that acts as a mechanism to announce events to other services on the spark-driver.md[driver].
LiveListenerBus is created and started when SparkContext is created and, since it is a single-JVM event bus, is exclusively used on the driver.
== [[startTime]] Time when SparkContext was Created -- startTime Property
[source, scala]¶
startTime: Long¶
startTime is the time in milliseconds when SparkContext was created.
[source, scala]¶
scala> sc.startTime
res0: Long = 1464425605653
== [[submitMapStage]] Submitting ShuffleDependency for Execution -- submitMapStage Internal Method
[source, scala]¶
submitMapStage[K, V, C](
  dependency: ShuffleDependency[K, V, C]): SimpleFutureAction[MapOutputStatistics]
submitMapStage scheduler:DAGScheduler.md#submitMapStage[submits the input ShuffleDependency to DAGScheduler for execution] and returns a SimpleFutureAction.
Internally, submitMapStage <<getCallSite, calculates the call site>> first and submits it with localProperties.
NOTE: Interestingly, submitMapStage is used exclusively when Spark SQL's spark-sql-SparkPlan-ShuffleExchange.md[ShuffleExchange] physical operator is executed.
NOTE: submitMapStage seems related to scheduler:DAGScheduler.md#adaptive-query-planning[Adaptive Query Planning / Adaptive Scheduling].
== [[cancelJobGroup]] Cancelling Job Group -- cancelJobGroup Method
[source, scala]¶
cancelJobGroup(groupId: String)¶
cancelJobGroup requests DAGScheduler scheduler:DAGScheduler.md#cancelJobGroup[to cancel a group of active Spark jobs].
NOTE: cancelJobGroup is used exclusively when SparkExecuteStatementOperation does cancel.
== [[cancelAllJobs]] Cancelling All Running and Scheduled Jobs -- cancelAllJobs Method
CAUTION: FIXME
NOTE: cancelAllJobs is used when spark-shell.md[spark-shell] is terminated (e.g. using Ctrl+C, so it can in turn terminate all active Spark jobs) or SparkSQLCLIDriver is terminated.
== [[cleaner]] ContextCleaner
[source, scala]¶
cleaner: Option[ContextCleaner]¶
SparkContext may have a core:ContextCleaner.md[ContextCleaner] defined.
ContextCleaner is created when SparkContext is created with configuration-properties.md#spark.cleaner.referenceTracking[spark.cleaner.referenceTracking] configuration property enabled.
== [[getPreferredLocs]] Finding Preferred Locations (Placement Preferences) for RDD Partition
[source, scala]¶
getPreferredLocs(
  rdd: RDD[_],
  partition: Int): Seq[TaskLocation]
getPreferredLocs simply scheduler:DAGScheduler.md#getPreferredLocs[requests DAGScheduler for the preferred locations for partition].
NOTE: Preferred locations of a partition of a RDD are also called placement preferences or locality preferences.
getPreferredLocs is used in CoalescedRDDPartition, DefaultPartitionCoalescer and PartitionerAwareUnionRDD.
== [[persistRDD]] Registering RDD in persistentRdds Internal Registry -- persistRDD Internal Method
[source, scala]¶
persistRDD(rdd: RDD[_]): Unit¶
persistRDD registers rdd in the <<persistentRdds, persistentRdds>> internal registry.
NOTE: persistRDD is used exclusively when RDD is rdd:index.md#persist-internal[persisted or locally checkpointed].
== [[getRDDStorageInfo]] Getting Storage Status of Cached RDDs (as RDDInfos) -- getRDDStorageInfo Methods
[source, scala]¶
getRDDStorageInfo: Array[RDDInfo]                             // <1>
getRDDStorageInfo(filter: RDD[_] => Boolean): Array[RDDInfo]  // <2>

<1> Part of Spark's Developer API that uses <2> with a filter that accepts all RDDs

getRDDStorageInfo takes all the RDDs (from the <<persistentRdds, persistentRdds>> registry) that match the given filter and creates a collection of storage:RDDInfo.md[RDDInfo] instances.
getRDDStorageInfo...FIXME
In the end, getRDDStorageInfo gives only the RDD that are cached (i.e. the sum of memory and disk sizes as well as the number of partitions cached are greater than 0).
NOTE: getRDDStorageInfo is used when RDD spark-rdd-lineage.md#toDebugString[is requested for RDD lineage graph].
== [[statusStore]] Accessing AppStatusStore
[source, scala]¶
statusStore: AppStatusStore¶
statusStore gives the current core:AppStatusStore.md[].
statusStore is used when:
- SparkContext is requested to <<getRDDStorageInfo, getRDDStorageInfo>>
- ConsoleProgressBar is requested to refresh
- SharedState (Spark SQL) is requested for a SQLAppStatusStore
== [[uiWebUrl]] Requesting URL of web UI -- uiWebUrl Method
[source, scala]¶
uiWebUrl: Option[String]¶
uiWebUrl requests the SparkUI for webUrl.
== [[maxNumConcurrentTasks]] maxNumConcurrentTasks Method
[source, scala]¶
maxNumConcurrentTasks(): Int¶
maxNumConcurrentTasks simply requests the SchedulerBackend for the maximum number of tasks that can be launched concurrently.
NOTE: maxNumConcurrentTasks is used exclusively when DAGScheduler is requested to scheduler:DAGScheduler.md#checkBarrierStageWithNumSlots[checkBarrierStageWithNumSlots].
== [[environment-variables]] Environment Variables
| Environment Variable | Default Value | Description |
|---|---|---|
| SPARK_EXECUTOR_MEMORY | 1024 | Amount of memory to allocate for a Spark executor in MB. See executor:Executor.md#memory[Executor Memory]. |
| SPARK_USER | | The user who is running SparkContext. Available later as sparkUser. |
== [[addJar-internals]] addJar Method
[source, scala]¶
addJar(path: String): Unit¶
addJar...FIXME
NOTE: addJar is used when...FIXME
== [[runApproximateJob]] Running Approximate Job
[source, scala]¶
runApproximateJob[T, U, R](
  rdd: RDD[T],
  func: (TaskContext, Iterator[T]) => U,
  evaluator: ApproximateEvaluator[U, R],
  timeout: Long): PartialResult[R]
runApproximateJob...FIXME
runApproximateJob is used when:
- DoubleRDDFunctions is requested to meanApprox and sumApprox
- RDD is requested to countApprox and countByValueApprox
== [[killTaskAttempt]] Killing Task
[source, scala]¶
killTaskAttempt(
  taskId: Long,
  interruptThread: Boolean = true,
  reason: String = "killed via SparkContext.killTaskAttempt"): Boolean
killTaskAttempt requests the DAGScheduler to kill the task attempt.
== [[checkpointFile]] checkpointFile Internal Method
[source, scala]¶
checkpointFile[T: ClassTag](path: String): RDD[T]
checkpointFile...FIXME
== [[logging]] Logging
Enable ALL logging level for org.apache.spark.SparkContext logger to see what happens inside.
Add the following line to conf/log4j.properties:
[source,plaintext]¶
log4j.logger.org.apache.spark.SparkContext=ALL¶
Refer to spark-logging.md[Logging].
== [[internal-properties]] Internal Properties
=== [[checkpointDir]] Checkpoint Directory
[source,scala]¶
checkpointDir: Option[String] = None¶
checkpointDir is...FIXME
=== [[persistentRdds]] persistentRdds Lookup Table
Lookup table of persistent/cached RDDs per their ids.
Used when SparkContext is requested to:
- <<persistRDD, persistRDD>>
- <<getRDDStorageInfo, getRDDStorageInfo>>
- <<getPersistentRDDs, getPersistentRDDs>>
- <<unpersistRDD, unpersistRDD>>
Creating SparkEnv for Driver¶
createSparkEnv(
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv
createSparkEnv uses the SparkEnv utility to create a SparkEnv for the driver (with the arguments and numDriverCores).
numDriverCores¶
numDriverCores(
master: String,
conf: SparkConf): Int
numDriverCores...FIXME