OutputCommitCoordinator¶
From the scaladoc (it is a private[spark] class, so the scaladoc cannot be found anywhere but in the source code):
Authority that decides whether tasks can commit output to HDFS. Uses a "first committer wins" policy.
OutputCommitCoordinator is instantiated in both the drivers and executors. On executors, it is configured with a reference to the driver's OutputCommitCoordinatorEndpoint, so requests to commit output will be forwarded to the driver's OutputCommitCoordinator.
This class was introduced in SPARK-4879; see that JIRA issue (and the associated pull requests) for an extensive design discussion.
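The "first committer wins" policy and the driver/executor split quoted above can be illustrated with a small in-memory model. CommitAuthority, DriverAuthority and ExecutorAuthority are hypothetical names used only for this sketch. It replaces the RPC hop with a direct method call and does not attempt to handle task failures or stage retries.

```scala
import scala.collection.mutable

// Hypothetical interface for this sketch (not Spark's API).
trait CommitAuthority {
  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean
}

// Driver side: the only place where commit decisions are made.
// "First committer wins": the first attempt asking for a (stage, partition)
// pair is authorized and every later request for the same pair is denied.
final class DriverAuthority extends CommitAuthority {
  // (stage, partition) -> (stageAttempt, attemptNumber) of the winning attempt
  private val winners = mutable.Map.empty[(Int, Int), (Int, Int)]

  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean =
    synchronized {
      val key = (stage, partition)
      if (winners.contains(key)) false
      else {
        winners(key) = (stageAttempt, attemptNumber)
        true
      }
    }
}

// Executor side: makes no decisions, every request is forwarded to the driver.
final class ExecutorAuthority(driver: CommitAuthority) extends CommitAuthority {
  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean =
    driver.canCommit(stage, stageAttempt, partition, attemptNumber)
}
```

With two ExecutorAuthority instances sharing one DriverAuthority, the first attempt that asks for partition 7 of stage 3 is authorized and a second attempt for the same partition is denied, regardless of which executor it runs on.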
Creating Instance¶
OutputCommitCoordinator takes the following to be created:

- SparkConf
- isDriver flag
OutputCommitCoordinator is created when:

- SparkEnv utility is used to create a SparkEnv (on the driver and on executors alike, as the scaladoc above states)
OutputCommitCoordinator RPC Endpoint¶
coordinatorRef: Option[RpcEndpointRef]
OutputCommitCoordinator is registered under the name OutputCommitCoordinator (as an OutputCommitCoordinatorEndpoint RPC Endpoint) in the RPC Environment on the driver (when SparkEnv utility is used to create the "base" SparkEnv). Executors hold an RpcEndpointRef to this endpoint on the driver.
coordinatorRef is used to post an AskPermissionToCommitOutput message (by executors) to the OutputCommitCoordinator (in canCommit).
coordinatorRef is used to stop the OutputCommitCoordinator on the driver (in stop).
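The register-on-the-driver, look-up-on-executors arrangement and the role of coordinatorRef in stop can be sketched with a toy, in-process registry. ToyEndpointRef, ToyRpcEnv, registerOrLookup and ToyCoordinator are hypothetical names, and messages are plain strings; none of this is Spark's RPC API.

```scala
import scala.collection.concurrent.TrieMap

// Hypothetical stand-in for an RPC endpoint reference (not Spark's RpcEndpointRef).
trait ToyEndpointRef {
  def send(msg: String): Unit // fire-and-forget message
}

// Hypothetical in-process registry: the driver registers an endpoint under a name,
// executors look the same name up and get a reference to the driver's endpoint.
final class ToyRpcEnv {
  private val endpoints = TrieMap.empty[String, ToyEndpointRef]

  def registerOrLookup(name: String, isDriver: Boolean, create: => ToyEndpointRef): ToyEndpointRef =
    if (isDriver) {
      val ref = create // only the driver creates the endpoint
      endpoints.put(name, ref)
      ref
    } else {
      endpoints.getOrElse(name, throw new NoSuchElementException(s"Endpoint $name not registered"))
    }
}

final class ToyCoordinator(isDriver: Boolean) {
  // Same shape as the coordinatorRef field documented above.
  var coordinatorRef: Option[ToyEndpointRef] = None

  // Only the driver stops the endpoint; afterwards the reference is dropped.
  def stop(): Unit = if (isDriver) {
    coordinatorRef.foreach(_.send("Stop"))
    coordinatorRef = None
  }
}
```

In this model an executor's reference is the very object the driver registered, so anything an executor posts through it reaches the driver-side endpoint, while calling stop on an executor is a no-op.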
canCommit¶
canCommit(
  stage: Int,
  stageAttempt: Int,
  partition: Int,
  attemptNumber: Int): Boolean
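canCommit creates an AskPermissionToCommitOutput message, sends it (asynchronously) to the OutputCommitCoordinator RPC Endpoint and waits for the reply, which becomes the Boolean result (whether the task attempt is allowed to commit its output).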
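The "send asynchronously, then wait for the answer" shape of canCommit can be sketched with Scala Futures. AskToCommit, AskingRef and CommitClient are hypothetical names, and denying the commit when no endpoint reference is available is an assumption of this sketch rather than a statement about Spark.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical request and endpoint reference for this sketch (not Spark's API).
final case class AskToCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int)

trait AskingRef {
  // Sends the request and returns immediately with a Future of the answer.
  def ask(msg: AskToCommit): Future[Boolean]
}

final class CommitClient(coordinatorRef: Option[AskingRef], timeout: FiniteDuration) {
  def canCommit(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int): Boolean = {
    val msg = AskToCommit(stage, stageAttempt, partition, attemptNumber)
    coordinatorRef match {
      // Asynchronous send, then block the calling task until the answer arrives
      // (Await.result throws a TimeoutException if it does not arrive in time).
      case Some(ref) => Await.result(ref.ask(msg), timeout)
      // Assumption of this sketch: without an endpoint the commit is denied.
      case None => false
    }
  }
}
```

Blocking is what lets canCommit return a plain Boolean: the task cannot proceed with (or skip) its commit until the driver has decided.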
canCommit is used when:

- SparkHadoopMapRedUtil is requested to commitTask (with spark.hadoop.outputCommitCoordination.enabled configuration property enabled)
- DataWritingSparkTask (Spark SQL) utility is used to run
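Where canCommit fits into a task commit guarded by spark.hadoop.outputCommitCoordination.enabled can be modeled as a small decision function. This is a simplified, hypothetical flow, not the code of SparkHadoopMapRedUtil: CommitFlow, CommitOutcome and its three cases are made-up names, needsTaskCommit is a plain Boolean standing in for the output committer's answer, askDriver stands in for a call to canCommit, and a missing property is treated as enabled.

```scala
// Hypothetical outcomes of a task commit in this sketch.
sealed trait CommitOutcome
case object NothingToCommit extends CommitOutcome
case object Committed extends CommitOutcome
case object Denied extends CommitOutcome

object CommitFlow {
  val CoordinationKey = "spark.hadoop.outputCommitCoordination.enabled"

  // Assumption of the sketch: a missing property means coordination is enabled.
  def coordinationEnabled(settings: Map[String, String]): Boolean =
    settings.get(CoordinationKey).forall(_.trim.toBoolean)

  def commitTask(
      needsTaskCommit: Boolean,
      coordinate: Boolean,
      askDriver: () => Boolean): CommitOutcome =
    if (!needsTaskCommit) NothingToCommit // the committer reports nothing to commit
    else if (!coordinate) Committed       // commit without asking the driver
    else if (askDriver()) Committed       // the driver authorized this attempt
    else Denied                           // another attempt won; this one must not commit
}
```

The driver is only consulted when there is something to commit and coordination is enabled, so turning the property off removes the round trip (and the protection) entirely.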
handleAskPermissionToCommit¶
handleAskPermissionToCommit(
  stage: Int,
  stageAttempt: Int,
  partition: Int,
  attemptNumber: Int): Boolean
handleAskPermissionToCommit ...FIXME
handleAskPermissionToCommit is used when:

- OutputCommitCoordinatorEndpoint is requested to handle an AskPermissionToCommitOutput message (that happens after it was sent out in canCommit)
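The request/reply hand-off from the endpoint to handleAskPermissionToCommit can be sketched as a partial function over incoming messages, loosely modeled on the receiveAndReply style of Spark RPC endpoints. AskPermission, ToyCoordinatorEndpoint and reply are hypothetical names; the handler is passed in as a function, so any policy (for example first committer wins) can be plugged in.

```scala
// Hypothetical request message; only the field list follows the signature above.
final case class AskPermission(stage: Int, stageAttempt: Int, partition: Int, attemptNumber: Int)

// Hypothetical driver-side endpoint: it unpacks a request and replies with the
// decision of the handler it wraps. Not Spark's OutputCommitCoordinatorEndpoint.
final class ToyCoordinatorEndpoint(handle: (Int, Int, Int, Int) => Boolean) {
  // Covers only the messages this endpoint understands.
  val receiveAndReply: PartialFunction[Any, Boolean] = {
    case AskPermission(stage, stageAttempt, partition, attemptNumber) =>
      handle(stage, stageAttempt, partition, attemptNumber)
  }

  // Replies with the handler's decision, or reports an unsupported message.
  def reply(msg: Any): Either[String, Boolean] =
    receiveAndReply.lift(msg).toRight(s"Unsupported message: $msg")
}
```

Keeping the endpoint a thin dispatcher means the commit policy lives in one method on the coordinator, which is also what makes it easy to test without any RPC in place.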
Logging¶
Enable ALL logging level for the org.apache.spark.scheduler.OutputCommitCoordinator logger to see what happens inside.

Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.scheduler.OutputCommitCoordinator=ALL
Refer to Logging.