Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import scala.collection.mutable
import scala.util.Try

import org.apache.hadoop.conf.Configurable
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.hadoop.hdfs.protocol.EncryptionZone
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
Expand Down Expand Up @@ -194,19 +196,33 @@ class HadoopMapReduceCommitProtocol(
if (hasValidPath) {
val (allAbsPathFiles, allPartitionPaths) =
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
val hadoopConf = jobContext.getConfiguration
val fs = stagingDir.getFileSystem(hadoopConf)

val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")
val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet
if (dynamicPartitionOverwrite) {
logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths")
absParentPaths.foreach(fs.delete(_, true))
absParentPaths.foreach(path => path.getFileSystem(hadoopConf)
.delete(path, true))
}
logDebug(s"Create absolute parent directories: $absParentPaths")
absParentPaths.foreach(fs.mkdirs)
absParentPaths.foreach(path => path.getFileSystem(hadoopConf).mkdirs(path))
for ((src, dst) <- filesToMove) {
if (!fs.rename(new Path(src), new Path(dst))) {
val srcPath = new Path(src)
val dstPath = new Path(dst)
val srcFs = srcPath.getFileSystem(hadoopConf)
val dstFs = dstPath.getFileSystem(hadoopConf)
// Copying files across different file systems
if (needCopy(srcPath, dstPath, srcFs, dstFs)) {
if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you may want to think about parallelizing the copy, as for each file it is now going to take time proportional to data.length/(download_bandwidth+upload_bandwidth)

shame copy returns false sometimes; looks like it is only if mkdirs() on the dest or delete(src) fails.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can i highlight something i've noticed here, that copy() command stos on src read() returning -1, without doing any checks to validate file length, not great.

true, false, /* We've cleared the target path up ahead */
hadoopConf)) {
throw new IOException(s"Failed to copy $src to $dst with different file system" +
s" when committing files staged for absolute locations")
}
} else if (!fs.rename(srcPath, dstPath)) {
throw new IOException(s"Failed to rename $src to $dst when committing files staged for " +
s"absolute locations")
}
Expand Down Expand Up @@ -240,6 +256,45 @@ class HadoopMapReduceCommitProtocol(
}
}

/**
* Determines whether a file needs to be copied across file systems.
* If the file systems are different, we need to copy across file systems.
* If the file systems are the same and HDFS is used,
* we need to determine if the encryption is the same.
*
* @param srcPath The source file to copy
* @param destPath The dest path that needs to be copied to
* @param srcFs The file system of the source file
* @param destFs The file system of the dest file
* @return
*/
def needCopy(srcPath: Path, destPath: Path,
srcFs: FileSystem, destFs: FileSystem): Boolean = {
!srcFs.getUri.equals(destFs.getUri) || {
(srcFs, destFs) match {
case (srcDfs: DistributedFileSystem
, destDfs: DistributedFileSystem) =>
// If it's HDFS, let's determine if `EncryptionZone` is equal
val (srcEZ: EncryptionZone, destEZ: EncryptionZone) = (srcDfs.getEZForPath(srcPath),
destDfs.getEZForPath(destPath))
val isSrcPathEncrypted = srcEZ != null
val isDestPathEncrypted = destEZ != null
// If the source and destination paths are encrypted,
// we need to copy across `FileSystem` when their `EncryptionZone` is not equal.
// If one of the source and destination paths is encrypted,
// we need to copy across `FileSystem`.
// If the source and destination paths are not encrypted,
// we don't need to copy across `FileSystem`.
if (isSrcPathEncrypted && isDestPathEncrypted) !srcEZ.equals(destEZ)
else if (!isSrcPathEncrypted || !isDestPathEncrypted) true
else false
case _ =>
// If it is not HDFS, we don't need to copy across `FileSystem`
false
}
}
}

/**
* Abort the job; log and ignore any IO exception thrown.
* This is invariably invoked in an exception handler; raising
Expand Down