From 454c71b05261fda60008146b3fc4bd4751fd7636 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 10 Jan 2020 17:29:59 -0800
Subject: [PATCH] Preserve original permission when truncating a table.

---
 .../apache/spark/sql/internal/SQLConf.scala   | 11 +++
 .../spark/sql/execution/command/tables.scala  | 46 +++++++++++
 .../sql/execution/command/DDLSuite.scala      | 79 ++++++++++++++++++-
 3 files changed, 135 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index c7167f4538098..7a83df9c5d348 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1576,6 +1576,14 @@ object SQLConf {
       "turning the flag on provides a way for these sources to see these partitionBy columns.")
     .booleanConf
     .createWithDefault(false)
+
+  val TRUNCATE_TABLE_IGNORE_PERMISSION_ACL =
+    buildConf("spark.sql.truncateTable.ignorePermissionAcl")
+      .internal()
+      .doc("When set to true, the TRUNCATE TABLE command will not try to set back the " +
+        "original permission and ACLs when re-creating the table/partition paths.")
+      .booleanConf
+      .createWithDefault(false)
 }
 
 /**
@@ -1983,6 +1991,9 @@ class SQLConf extends Serializable with Logging {
 
   def setOpsPrecedenceEnforced: Boolean = getConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED)
 
+  def truncateTableIgnorePermissionAcl: Boolean =
+    getConf(SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
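The flag added above is internal and defaults to false, so TRUNCATE TABLE restores permissions and ACLs unless a user explicitly opts out. A minimal sketch of opting out from a session, assuming a running SparkSession named `spark` and an existing table `tab1`:

    // Fall back to the pre-patch behavior: recreate table paths with
    // default permissions instead of setting the captured ones back.
    spark.conf.set("spark.sql.truncateTable.ignorePermissionAcl", "true")
    spark.sql("TRUNCATE TABLE tab1")
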
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 6dc8426902193..5323bf65c0540 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileContext, FsConstants, Path}
+import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -457,13 +458,58 @@ case class TruncateTableCommand(
         partLocations
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
+    val ignorePermissionAcl = SQLConf.get.truncateTableIgnorePermissionAcl
     locations.foreach { location =>
       if (location.isDefined) {
         val path = new Path(location.get)
         try {
           val fs = path.getFileSystem(hadoopConf)
+
+          // Not all filesystem implementations support these APIs.
+          var optPermission: Option[FsPermission] = None
+          var optAcls: Option[java.util.List[AclEntry]] = None
+          if (!ignorePermissionAcl) {
+            val fileStatus = fs.getFileStatus(path)
+            try {
+              optPermission = Some(fileStatus.getPermission())
+            } catch {
+              case NonFatal(_) => // do nothing
+            }
+
+            try {
+              optAcls = Some(fs.getAclStatus(path).getEntries)
+            } catch {
+              case NonFatal(_) => // do nothing
+            }
+          }
+
           fs.delete(path, true)
+
+          // We should keep the original permission/ACLs of the path. Owner/group can
+          // only be set by a super-user, for example on HDFS. Because the current user
+          // could delete the path, we assume the owner/group is correct or not an issue.
           fs.mkdirs(path)
+          if (!ignorePermissionAcl) {
+            optPermission.foreach { permission =>
+              try {
+                fs.setPermission(path, permission)
+              } catch {
+                case NonFatal(e) =>
+                  throw new SecurityException(
+                    s"Failed to set original permission $permission back to " +
+                      s"the created path: $path. Exception: ${e.getMessage}")
+              }
+            }
+            optAcls.foreach { acls =>
+              try {
+                fs.setAcl(path, acls)
+              } catch {
+                case NonFatal(e) =>
+                  throw new SecurityException(
+                    s"Failed to set original ACL $acls back to " +
+                      s"the created path: $path. Exception: ${e.getMessage}")
+              }
+            }
+          }
         } catch {
           case NonFatal(e) =>
             throw new AnalysisException(
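The change above boils down to a capture/delete/recreate/restore sequence against the Hadoop FileSystem API. A condensed standalone sketch of that pattern, using a hypothetical helper `truncatePath` and collapsing the per-step capture failures into `Try` (the real command catches `NonFatal` per step):

    import scala.util.Try

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.fs.permission.{AclEntry, FsPermission}

    def truncatePath(path: Path, conf: Configuration): Unit = {
      val fs = path.getFileSystem(conf)
      // Capture before deleting; not every filesystem supports permissions
      // or ACLs, so a failed probe just means there is nothing to restore.
      val permission: Option[FsPermission] = Try(fs.getFileStatus(path).getPermission).toOption
      val acls: Option[java.util.List[AclEntry]] = Try(fs.getAclStatus(path).getEntries).toOption
      fs.delete(path, true)
      fs.mkdirs(path)
      // Restore; unlike the capture step, failures here are surfaced
      // (the real command rethrows them as SecurityException).
      permission.foreach(fs.setPermission(path, _))
      acls.foreach(fs.setAcl(path, _))
    }
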
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2149329491077..753fe1c654228 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -21,7 +21,8 @@ import java.io.{File, PrintWriter}
 import java.net.URI
 import java.util.Locale
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.permission.{AclEntry, AclEntryScope, AclEntryType, AclStatus, FsAction, FsPermission}
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
@@ -1935,6 +1936,60 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 
+  test("SPARK-30312: truncate table - keep acl/permission") {
+    import testImplicits._
+    val ignorePermissionAcl = Seq(true, false)
+
+    ignorePermissionAcl.foreach { ignore =>
+      withSQLConf(
+        "fs.file.impl" -> classOf[FakeLocalFsFileSystem].getName,
+        "fs.file.impl.disable.cache" -> "true",
+        SQLConf.TRUNCATE_TABLE_IGNORE_PERMISSION_ACL.key -> ignore.toString) {
+        withTable("tab1") {
+          sql("CREATE TABLE tab1 (col INT) USING parquet")
+          sql("INSERT INTO tab1 SELECT 1")
+          checkAnswer(spark.table("tab1"), Row(1))
+
+          val tablePath = new Path(spark.sessionState.catalog
+            .getTableMetadata(TableIdentifier("tab1")).storage.locationUri.get)
+
+          val hadoopConf = spark.sessionState.newHadoopConf()
+          val fs = tablePath.getFileSystem(hadoopConf)
+
+          fs.setPermission(tablePath, new FsPermission("777"))
+          val fileStatus = fs.getFileStatus(tablePath)
+          assert(fileStatus.getPermission().toString() == "rwxrwxrwx")
+
+          // Set an ACL on the table path.
+          val customAcl = new java.util.ArrayList[AclEntry]()
+          customAcl.add(new AclEntry.Builder()
+            .setType(AclEntryType.USER)
+            .setScope(AclEntryScope.ACCESS)
+            .setPermission(FsAction.READ).build())
+          fs.setAcl(tablePath, customAcl)
+          assert(fs.getAclStatus(tablePath).getEntries().get(0) == customAcl.get(0))
+
+          sql("TRUNCATE TABLE tab1")
+          assert(spark.table("tab1").collect().isEmpty)
+
+          val fileStatus2 = fs.getFileStatus(tablePath)
+          if (ignore) {
+            assert(fileStatus2.getPermission().toString() == "rwxr-xr-x")
+          } else {
+            assert(fileStatus2.getPermission().toString() == "rwxrwxrwx")
+          }
+          val aclEntries = fs.getAclStatus(tablePath).getEntries()
+          if (ignore) {
+            assert(aclEntries.size() == 0)
+          } else {
+            assert(aclEntries.size() == 1)
+            assert(aclEntries.get(0) == customAcl.get(0))
+          }
+        }
+      }
+    }
+  }
+
   test("create temporary view with mismatched schema") {
     withTable("tab1") {
       spark.range(10).write.saveAsTable("tab1")
@@ -2752,3 +2807,25 @@ abstract class DDLSuite extends QueryTest with SQLTestUtils {
     }
   }
 }
+
+object FakeLocalFsFileSystem {
+  var aclStatus = new AclStatus.Builder().build()
+}
+
+// A fake local filesystem used to test ACLs. It keeps an ACL status; when a path on this
+// filesystem is deleted, the ACL status is reset. Note that, for test purposes, it keeps
+// a single ACL status for all paths.
+class FakeLocalFsFileSystem extends RawLocalFileSystem {
+  import FakeLocalFsFileSystem._
+
+  override def delete(f: Path, recursive: Boolean): Boolean = {
+    aclStatus = new AclStatus.Builder().build()
+    super.delete(f, recursive)
+  }
+
+  override def getAclStatus(path: Path): AclStatus = aclStatus
+
+  override def setAcl(path: Path, aclSpec: java.util.List[AclEntry]): Unit = {
+    aclStatus = new AclStatus.Builder().addEntries(aclSpec).build()
+  }
+}
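The test rebinds the `file://` scheme to the fake filesystem through Hadoop configuration, which is what the `fs.file.impl` and `fs.file.impl.disable.cache` entries passed to `withSQLConf` do. A minimal sketch of that mechanism on its own, assuming the `FakeLocalFsFileSystem` defined above is on the classpath:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    // "fs.<scheme>.impl" selects the FileSystem class used for that URI scheme.
    conf.set("fs.file.impl", classOf[FakeLocalFsFileSystem].getName)
    // Skip the FileSystem cache so the rebinding takes effect immediately.
    conf.set("fs.file.impl.disable.cache", "true")

    val fs = FileSystem.get(new Path("file:///tmp").toUri, conf)
    assert(fs.isInstanceOf[FakeLocalFsFileSystem])

Because the fake keeps a single ACL status for all paths, each TRUNCATE in the test exercises exactly one table path at a time.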