# SparkHadoopWriter Utility
## Writing Key-Value RDD Out (As Hadoop OutputFormat)
```scala
write[K, V: ClassTag](
  rdd: RDD[(K, V)],
  config: HadoopWriteConfigUtil[K, V]): Unit
```
write runs a Spark job that writes out the records of every partition of the given key-value RDD using the given HadoopWriteConfigUtil and a HadoopMapReduceCommitProtocol committer.
The number of writer tasks (parallelism) is the number of partitions in the given key-value RDD.
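For illustration, saving a key-value RDD with one of the PairRDDFunctions save operators, e.g. saveAsNewAPIHadoopFile (which is backed by saveAsNewAPIHadoopDataset), ends up in write. A minimal sketch (the sample data and the output path are made up):

```scala
import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

// A key-value RDD with 4 partitions => write runs 4 writer tasks
val kvRdd = sc
  .parallelize(Seq("a" -> 1, "b" -> 2, "c" -> 3, "d" -> 4), numSlices = 4)
  .map { case (k, v) => (new Text(s"$k=$v"), NullWritable.get) }

// saveAsNewAPIHadoopFile goes through saveAsNewAPIHadoopDataset,
// which hands the write over to SparkHadoopWriter.write
kvRdd.saveAsNewAPIHadoopFile(
  "/tmp/spark-hadoop-writer-demo",   // made-up output path
  classOf[Text],
  classOf[NullWritable],
  classOf[TextOutputFormat[Text, NullWritable]])
```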
### Internals
Internally, write uses the id of the given RDD as the commitJobId.
write creates a jobTrackerId with the current date.
write requests the given HadoopWriteConfigUtil to create a Hadoop JobContext (for the jobTrackerId and commitJobId).
write requests the given HadoopWriteConfigUtil to initOutputFormat with the Hadoop JobContext.
write requests the given HadoopWriteConfigUtil to assertConf.
write requests the given HadoopWriteConfigUtil to create a HadoopMapReduceCommitProtocol committer for the commitJobId.
write requests the HadoopMapReduceCommitProtocol to setupJob (with the jobContext).
write uses the SparkContext (of the given RDD) to run a Spark job for the given RDD with the executeTask partition function.
In the end, write requests the HadoopMapReduceCommitProtocol to commit the job and prints out the following INFO message to the logs:
Job [getJobID] committed.
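Put together, the happy path looks roughly as follows (a paraphrased sketch of the steps above against Spark's internal API; method names come from the steps, helper names and signatures are approximated, and this is not the verbatim source):

```scala
// Paraphrased sketch of write's happy path (internal API; approximated)
val commitJobId  = rdd.id                               // RDD id as the commit job id
val jobTrackerId = createJobTrackerID(new Date())       // based on the current date

val jobContext = config.createJobContext(jobTrackerId, commitJobId)
config.initOutputFormat(jobContext)
config.assertConf(jobContext, rdd.conf)

val committer = config.createCommitter(commitJobId)     // HadoopMapReduceCommitProtocol
committer.setupJob(jobContext)

// One writer task per partition of the key-value RDD
val ret = rdd.context.runJob(rdd, (context: TaskContext, iter: Iterator[(K, V)]) =>
  executeTask(context, config, jobTrackerId, commitJobId,
    context.partitionId, context.attemptNumber, committer, iter))

committer.commitJob(jobContext, ret)
logInfo(s"Job ${jobContext.getJobID} committed.")
```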
### Throwables
In case of any Throwable, write prints out the following ERROR message to the logs:
Aborting job [getJobID].
write requests the HadoopMapReduceCommitProtocol to abort the job and throws a SparkException:
Job aborted.
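From the caller's side, the original failure is carried as the cause of that SparkException. A minimal way to get at it (illustrative only, reusing kvRdd and the imports from the earlier snippet):

```scala
import org.apache.spark.SparkException

try {
  kvRdd.saveAsNewAPIHadoopFile(
    "/tmp/spark-hadoop-writer-demo",   // made-up output path
    classOf[Text],
    classOf[NullWritable],
    classOf[TextOutputFormat[Text, NullWritable]])
} catch {
  case e: SparkException =>
    // "Job aborted." wraps the real write failure
    println(s"Write failed: ${e.getCause}")
    throw e
}
```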
### Usage
write is used when:

* PairRDDFunctions.saveAsNewAPIHadoopDataset operator is used
* PairRDDFunctions.saveAsHadoopDataset operator is used
## Writing RDD Partition
```scala
executeTask[K, V: ClassTag](
  context: TaskContext,
  config: HadoopWriteConfigUtil[K, V],
  jobTrackerId: String,
  commitJobId: Int,
  sparkPartitionId: Int,
  sparkAttemptNumber: Int,
  committer: FileCommitProtocol,
  iterator: Iterator[(K, V)]): TaskCommitMessage
```
Fixme
Review Me
executeTask requests the given HadoopWriteConfigUtil to create a TaskAttemptContext.
executeTask requests the given FileCommitProtocol to set up a task with the TaskAttemptContext.
executeTask requests the given HadoopWriteConfigUtil to initWriter (with the TaskAttemptContext and the given sparkPartitionId).
executeTask initializes the Hadoop output metrics (initHadoopOutputMetrics).
executeTask then writes out all the records of the RDD partition (from the given Iterator[(K, V)]) by requesting the given HadoopWriteConfigUtil to write every key-value pair. In the end, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to commit the task.
executeTask updates metrics about writing data to external systems (bytesWritten and recordsWritten) every few records and at the end.
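These are the standard task output metrics, so they can also be observed from a SparkListener, e.g. (an illustrative snippet, not part of SparkHadoopWriter):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the output metrics that the writer tasks keep up to date
sc.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    Option(taskEnd.taskMetrics).map(_.outputMetrics).foreach { out =>
      println(s"task ${taskEnd.taskInfo.taskId}: " +
        s"bytesWritten=${out.bytesWritten} recordsWritten=${out.recordsWritten}")
    }
  }
})
```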
In case of any errors, executeTask requests the given HadoopWriteConfigUtil to closeWriter and the given FileCommitProtocol to abort the task. In the end, executeTask prints out the following ERROR message to the logs:
Task [taskAttemptID] aborted.
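In other words, every writer task follows roughly this protocol (a paraphrased sketch against Spark's internal API; method names come from the steps above, while signatures and error-handling details are approximated):

```scala
// Paraphrased sketch of executeTask (internal API; not the verbatim source)
val taskContext = config.createTaskAttemptContext(
  jobTrackerId, commitJobId, sparkPartitionId, sparkAttemptNumber)
committer.setupTask(taskContext)
config.initWriter(taskContext, sparkPartitionId)
val (outputMetrics, callback) = initHadoopOutputMetrics(context)

try {
  var recordsWritten = 0L
  while (iterator.hasNext) {
    val (key, value) = iterator.next()
    config.write(key, value)            // write out a single record
    recordsWritten += 1
    // bytesWritten / recordsWritten refreshed every few records and at the end
  }
  config.closeWriter(taskContext)
  committer.commitTask(taskContext)     // the TaskCommitMessage returned to the driver
} catch {
  case cause: Throwable =>
    config.closeWriter(taskContext)
    committer.abortTask(taskContext)
    logError(s"Task ${taskContext.getTaskAttemptID} aborted.", cause)
    throw cause
}
```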
executeTask is used when:
* SparkHadoopWriter utility is used to write out a key-value RDD (as a Hadoop OutputFormat)
## Logging
Enable ALL logging level for org.apache.spark.internal.io.SparkHadoopWriter logger to see what happens inside.
Add the following line to conf/log4j.properties:
log4j.logger.org.apache.spark.internal.io.SparkHadoopWriter=ALL
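Note that Spark 3.3 and later ship with Log4j 2, where the equivalent in conf/log4j2.properties would be along the lines of:
logger.SparkHadoopWriter.name = org.apache.spark.internal.io.SparkHadoopWriter
logger.SparkHadoopWriter.level = all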
Refer to Logging.