From 4dc94570321aedae900790a5e258114c2f9e3987 Mon Sep 17 00:00:00 2001 From: CHENXCHEN Date: Tue, 5 Apr 2022 12:44:09 +0800 Subject: [PATCH 1/2] Fix rename and delete files with different filesystem at HadoopMapReduceCommitProtocol --- .../io/HadoopMapReduceCommitProtocol.scala | 65 +++++++++++++++++-- 1 file changed, 60 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index 3a24da98ecc24..a525ba0f8c4b1 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -24,7 +24,9 @@ import scala.collection.mutable import scala.util.Try import org.apache.hadoop.conf.Configurable -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.hadoop.hdfs.DistributedFileSystem +import org.apache.hadoop.hdfs.protocol.EncryptionZone import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl @@ -194,19 +196,33 @@ class HadoopMapReduceCommitProtocol( if (hasValidPath) { val (allAbsPathFiles, allPartitionPaths) = taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip - val fs = stagingDir.getFileSystem(jobContext.getConfiguration) + val hadoopConfiguration = jobContext.getConfiguration + val fs = stagingDir.getFileSystem(hadoopConfiguration) val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet if (dynamicPartitionOverwrite) { logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths") - absParentPaths.foreach(fs.delete(_, true)) + absParentPaths.foreach(path => path.getFileSystem(hadoopConfiguration) + .delete(path, true)) } logDebug(s"Create absolute parent directories: $absParentPaths") - absParentPaths.foreach(fs.mkdirs) + absParentPaths.foreach(path => path.getFileSystem(hadoopConfiguration).mkdirs(path)) for ((src, dst) <- filesToMove) { - if (!fs.rename(new Path(src), new Path(dst))) { + val srcPath = new Path(src) + val dstPath = new Path(dst) + val srcFs = srcPath.getFileSystem(hadoopConfiguration) + val dstFs = dstPath.getFileSystem(hadoopConfiguration) + // Copying files across different file systems + if (needCopy(srcPath, dstPath, srcFs, dstFs)) { + if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath, + true, false, /* We've cleared the target path up ahead */ + hadoopConfiguration)) { + throw new IOException(s"Failed to copy $src to $dst with different file system" + + s" when committing files staged for absolute locations") + } + } else if (!fs.rename(srcPath, dstPath)) { throw new IOException(s"Failed to rename $src to $dst when committing files staged for " + s"absolute locations") } @@ -240,6 +256,45 @@ class HadoopMapReduceCommitProtocol( } } + /** + * Determines whether a file needs to be copied across file systems. + * If the file systems are different, we need to copy across file systems. + * If the file systems are the same and HDFS is used, + * we need to determine if the encryption is the same. + * + * @param srcPath The source file to copy + * @param destPath The dest path that needs to be copied to + * @param srcFs The file system of the source file + * @param destFs The file system of the dest file + * @return + */ + def needCopy(srcPath: Path, destPath: Path, + srcFs: FileSystem, destFs: FileSystem): Boolean = { + !srcFs.getUri.equals(destFs.getUri) || { + (srcFs, destFs) match { + case (srcDfs: DistributedFileSystem + , destDfs: DistributedFileSystem) => + // If it's HDFS, let's determine if `EncryptionZone` is equal + val (srcEZ: EncryptionZone, destEZ: EncryptionZone) = (srcDfs.getEZForPath(srcPath), + destDfs.getEZForPath(destPath)) + val isSrcPathEncrypted = srcEZ != null + val isDestPathEncrypted = destEZ != null + // If the source and destination paths are encrypted, + // we need to copy across `FileSystem` when their `EncryptionZone` is not equal. + // If one of the source and destination paths is encrypted, + // we need to copy across `FileSystem`. + // If the source and destination paths are not encrypted, + // we don't need to copy across `FileSystem`. + if (isSrcPathEncrypted && isDestPathEncrypted) !srcEZ.equals(destEZ) + else if (!isSrcPathEncrypted || !isDestPathEncrypted) true + else false + case _ => + // If it is not HDFS, we don't need to copy across `FileSystem` + false + } + } + } + /** * Abort the job; log and ignore any IO exception thrown. * This is invariably invoked in an exception handler; raising From b5efbab60a007da652f46c52ca1a0a38fb8ab65b Mon Sep 17 00:00:00 2001 From: CHENXCHEN Date: Thu, 7 Apr 2022 15:27:57 +0800 Subject: [PATCH 2/2] spell correction --- .../io/HadoopMapReduceCommitProtocol.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala index a525ba0f8c4b1..8cc6e6749adf3 100644 --- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala +++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala @@ -196,29 +196,29 @@ class HadoopMapReduceCommitProtocol( if (hasValidPath) { val (allAbsPathFiles, allPartitionPaths) = taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip - val hadoopConfiguration = jobContext.getConfiguration - val fs = stagingDir.getFileSystem(hadoopConfiguration) + val hadoopConf = jobContext.getConfiguration + val fs = stagingDir.getFileSystem(hadoopConf) val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _) logDebug(s"Committing files staged for absolute locations $filesToMove") val absParentPaths = filesToMove.values.map(new Path(_).getParent).toSet if (dynamicPartitionOverwrite) { logDebug(s"Clean up absolute partition directories for overwriting: $absParentPaths") - absParentPaths.foreach(path => path.getFileSystem(hadoopConfiguration) + absParentPaths.foreach(path => path.getFileSystem(hadoopConf) .delete(path, true)) } logDebug(s"Create absolute parent directories: $absParentPaths") - absParentPaths.foreach(path => path.getFileSystem(hadoopConfiguration).mkdirs(path)) + absParentPaths.foreach(path => path.getFileSystem(hadoopConf).mkdirs(path)) for ((src, dst) <- filesToMove) { val srcPath = new Path(src) val dstPath = new Path(dst) - val srcFs = srcPath.getFileSystem(hadoopConfiguration) - val dstFs = dstPath.getFileSystem(hadoopConfiguration) + val srcFs = srcPath.getFileSystem(hadoopConf) + val dstFs = dstPath.getFileSystem(hadoopConf) // Copying files across different file systems if (needCopy(srcPath, dstPath, srcFs, dstFs)) { if (!FileUtil.copy(srcFs, srcFs.listStatus(srcPath).map(_.getPath), dstFs, dstPath, true, false, /* We've cleared the target path up ahead */ - hadoopConfiguration)) { + hadoopConf)) { throw new IOException(s"Failed to copy $src to $dst with different file system" + s" when committing files staged for absolute locations") }