# FileCommitProtocol

`FileCommitProtocol` is an abstraction of file committers that can set up, commit, or abort a Spark job or task (while writing out a key-value RDD and its partitions).
`FileCommitProtocol` is used for the `RDD.saveAsNewAPIHadoopDataset` and `RDD.saveAsHadoopDataset` actions (which use the `SparkHadoopWriter` utility to write a key-value RDD out).
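For example, a key-value RDD write like the following goes through `SparkHadoopWriter`, and hence through a `FileCommitProtocol` committer, behind the scenes (a hedged sketch; the output path and the key/value types are illustrative only, and `sc` is assumed to be an active `SparkContext`):

```scala
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}

// A key-value RDD to be written out with the new Hadoop API
val pairs = sc.parallelize(Seq((1, "one"), (2, "two")))
  .map { case (k, v) => (new IntWritable(k), new Text(v)) }

val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[IntWritable])
job.setOutputValueClass(classOf[Text])
job.setOutputFormatClass(classOf[TextOutputFormat[IntWritable, Text]])
FileOutputFormat.setOutputPath(job, new Path("/tmp/pairs"))

// Triggers SparkHadoopWriter, which drives a FileCommitProtocol committer
pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
```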
`FileCommitProtocol` is created using the `FileCommitProtocol.instantiate` utility.
## Contract
### Aborting Job

```scala
abortJob(
  jobContext: JobContext): Unit
```

Aborts a job

Used when:

- `SparkHadoopWriter` utility is used to write a key-value RDD (and writing fails)
- (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query (and writing fails)
- (Spark SQL) `FileBatchWrite` is requested to abort
### Aborting Task

```scala
abortTask(
  taskContext: TaskAttemptContext): Unit
```

Aborts a task

Used when:

- `SparkHadoopWriter` utility is used to write an RDD partition
- (Spark SQL) `FileFormatDataWriter` is requested to abort
### Committing Job

```scala
commitJob(
  jobContext: JobContext,
  taskCommits: Seq[TaskCommitMessage]): Unit
```

Commits a job after the writes succeed

Used when:

- `SparkHadoopWriter` utility is used to write a key-value RDD
- (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query
- (Spark SQL) `FileBatchWrite` is requested to commit
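The job-level calls typically bracket the whole write. The following is a minimal, hedged sketch of a driver-side flow in the spirit of `SparkHadoopWriter`; `runTasks` is a hypothetical helper that runs the write tasks and collects their commit messages:

```scala
import org.apache.hadoop.mapreduce.JobContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Sketch only: set the job up, run the write tasks, then either commit the job
// with all task commit messages or abort it if anything went wrong
def writeJob(
    committer: FileCommitProtocol,
    jobContext: JobContext,
    runTasks: () => Seq[TaskCommitMessage]): Unit = {
  committer.setupJob(jobContext)
  try {
    val taskCommits = runTasks()
    committer.commitJob(jobContext, taskCommits)
  } catch {
    case e: Throwable =>
      committer.abortJob(jobContext)
      throw e
  }
}
```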
### Committing Task

```scala
commitTask(
  taskContext: TaskAttemptContext): TaskCommitMessage
```

Used when:

- `SparkHadoopWriter` utility is used to write an RDD partition
- (Spark SQL) `FileFormatDataWriter` is requested to commit
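On the executor side, the task-level calls follow the same pattern. A hedged sketch (names are illustrative), where `writePartition` stands for whatever writes the partition's records out:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage

// Sketch only: set the task up, write the partition, then commit the task
// (returning a TaskCommitMessage for the driver) or abort it on failure
def writeTask(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext)(writePartition: => Unit): TaskCommitMessage = {
  committer.setupTask(taskContext)
  try {
    writePartition
    committer.commitTask(taskContext)
  } catch {
    case e: Throwable =>
      committer.abortTask(taskContext)
      throw e
  }
}
```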
### Deleting Path with Job

```scala
deleteWithJob(
  fs: FileSystem,
  path: Path,
  recursive: Boolean): Boolean
```

`deleteWithJob` requests the given Hadoop `FileSystem` to delete a `path` directory.

Used when the `InsertIntoHadoopFsRelationCommand` logical command (Spark SQL) is executed
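A hedged sketch of an overwrite-style write that asks the committer to delete the existing output directory through `deleteWithJob` (the helper name and the way the Hadoop `Configuration` is obtained are illustrative only):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: delete a pre-existing output directory via the committer
// (rather than calling the FileSystem directly)
def deleteExistingOutput(
    committer: FileCommitProtocol,
    hadoopConf: Configuration,
    outputPath: Path): Boolean = {
  val fs = outputPath.getFileSystem(hadoopConf)
  if (fs.exists(outputPath)) {
    committer.deleteWithJob(fs, outputPath, recursive = true)
  } else {
    true
  }
}
```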
### newTaskTempFile

```scala
newTaskTempFile(
  taskContext: TaskAttemptContext,
  dir: Option[String],
  ext: String): String
```

Used when:

- (Spark SQL) `SingleDirectoryDataWriter` and `DynamicPartitionDataWriter` are requested to write (and in turn `newOutputWriter`)
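A hedged sketch of the two typical ways a data writer asks for a temp file: with no partition subdirectory (single-directory writes) and with a dynamic partition directory. The partition directory and file extension below are illustrative only:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.spark.internal.io.FileCommitProtocol

// Sketch only: request task-local temp file paths from the committer
def tempFilePaths(
    committer: FileCommitProtocol,
    taskContext: TaskAttemptContext): (String, String) = {
  // Single-directory write: no partition subdirectory
  val flat = committer.newTaskTempFile(taskContext, dir = None, ext = ".parquet")
  // Dynamic-partition write: the partition directory is passed in as dir
  val partitioned = committer.newTaskTempFile(taskContext, dir = Some("year=2020/month=1"), ext = ".parquet")
  (flat, partitioned)
}
```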
### newTaskTempFileAbsPath

```scala
newTaskTempFileAbsPath(
  taskContext: TaskAttemptContext,
  absoluteDir: String,
  ext: String): String
```

Used when:

- (Spark SQL) `DynamicPartitionDataWriter` is requested to write
### On Task Committed

```scala
onTaskCommit(
  taskCommit: TaskCommitMessage): Unit
```

Used when:

- (Spark SQL) `FileFormatWriter` is requested to write
### Setting Up Job

```scala
setupJob(
  jobContext: JobContext): Unit
```

Used when:

- `SparkHadoopWriter` utility is used to write a key-value RDD
- (Spark SQL) `FileFormatWriter` utility is used to write a result of a structured query
- (Spark SQL) `FileWriteBuilder` is requested to buildForBatch
### Setting Up Task

```scala
setupTask(
  taskContext: TaskAttemptContext): Unit
```

Sets up the task with the Hadoop `TaskAttemptContext`

Used when:

- `SparkHadoopWriter` utility is used to write an RDD partition (while writing out a key-value RDD)
- (Spark SQL) `FileFormatWriter` utility is used to write out an RDD partition (while writing out a result of a structured query)
- (Spark SQL) `FileWriterFactory` is requested to createWriter
## Implementations

- `HadoopMapReduceCommitProtocol`
- `ManifestFileCommitProtocol` (Spark Structured Streaming)
## Instantiating FileCommitProtocol Committer

```scala
instantiate(
  className: String,
  jobId: String,
  outputPath: String,
  dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol
```

`instantiate` prints out the following DEBUG message to the logs:

```text
Creating committer [className]; job [jobId]; output=[outputPath]; dynamic=[dynamicPartitionOverwrite]
```
`instantiate` tries to find a constructor that takes three arguments (two of type `String` and one `Boolean`) for the given `jobId`, `outputPath` and `dynamicPartitionOverwrite` flag. If found, `instantiate` prints out the following DEBUG message to the logs:

```text
Using (String, String, Boolean) constructor
```

In case of `NoSuchMethodException`, `instantiate` prints out the following DEBUG message to the logs:

```text
Falling back to (String, String) constructor
```
`instantiate` then tries to find a constructor that takes two arguments (both of type `String`) for the given `jobId` and `outputPath`.

With the two-`String` constructor, `instantiate` requires that the given `dynamicPartitionOverwrite` flag is disabled (`false`) or throws an `IllegalArgumentException`:

```text
requirement failed: Dynamic Partition Overwrite is enabled but the committer [className] does not have the appropriate constructor
```
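The constructor-selection logic above can be summarized with the following reflection sketch (condensed and illustrative; the real code is `FileCommitProtocol.instantiate` and also emits the DEBUG messages quoted above):

```scala
import org.apache.spark.internal.io.FileCommitProtocol

// Condensed sketch of the constructor lookup described above
def instantiateSketch(
    className: String,
    jobId: String,
    outputPath: String,
    dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
  val clazz = Class.forName(className).asInstanceOf[Class[FileCommitProtocol]]
  try {
    // Preferred: (jobId: String, outputPath: String, dynamicPartitionOverwrite: Boolean)
    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
    ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
  } catch {
    case _: NoSuchMethodException =>
      // Fallback: (jobId: String, outputPath: String); only allowed when
      // dynamic partition overwrite is disabled
      require(!dynamicPartitionOverwrite,
        s"Dynamic Partition Overwrite is enabled but the committer " +
          s"$className does not have the appropriate constructor")
      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
      ctor.newInstance(jobId, outputPath)
  }
}
```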
`instantiate` is used when:

- `HadoopMapRedWriteConfigUtil` and `HadoopMapReduceWriteConfigUtil` are requested to create a `HadoopMapReduceCommitProtocol` committer
- (Spark SQL) `InsertIntoHadoopFsRelationCommand`, `InsertIntoHiveDirCommand`, and `InsertIntoHiveTable` logical commands are executed
- (Spark Structured Streaming) `FileStreamSink` is requested to write out micro-batch data
## Logging

Enable `ALL` logging level for `org.apache.spark.internal.io.FileCommitProtocol` logger to see what happens inside.

Add the following line to `conf/log4j.properties`:

```text
log4j.logger.org.apache.spark.internal.io.FileCommitProtocol=ALL
```

Refer to Logging.