From daf928e1fbd1337f8b332b807bd8fa24c4000fff Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Sun, 4 Aug 2019 14:37:14 -0700 Subject: [PATCH 1/2] Support namespaces in V2SessionCatalog. --- .../datasources/v2/V2SessionCatalog.scala | 127 ++++++++- .../v2/V2SessionCatalogSuite.scala | 251 ++++++++++++++++-- 2 files changed, 349 insertions(+), 29 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index 6dcebe29537d4..fd1ebf96a12d2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -17,18 +17,20 @@ package org.apache.spark.sql.execution.datasources.v2 +import java.net.URI import java.util import scala.collection.JavaConverters._ import scala.collection.mutable import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} +import org.apache.spark.sql.catalog.v2.NamespaceChange.RemoveProperty import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} -import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, CatalogTable, CatalogTableType, CatalogUtils, SessionCatalog} import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.internal.SessionState import org.apache.spark.sql.sources.v2.Table @@ -39,11 +41,16 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap /** * A [[TableCatalog]] that translates calls to the v1 SessionCatalog. */ -class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { +class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with SupportsNamespaces { + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + import V2SessionCatalog._ + def this() = { this(SparkSession.active.sessionState) } + override val defaultNamespace: Array[String] = Array("default") + private lazy val catalog: SessionCatalog = sessionState.catalog private var _name: String = _ @@ -177,6 +184,90 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog { } } + override def namespaceExists(namespace: Array[String]): Boolean = namespace match { + case Array(db) => + catalog.databaseExists(db) + case _ => + false + } + + override def listNamespaces(): Array[Array[String]] = { + catalog.listDatabases().map(Array(_)).toArray + } + + override def listNamespaces(namespace: Array[String]): Array[Array[String]] = { + namespace match { + case Array() => + listNamespaces() + case Array(db) if catalog.databaseExists(db) => + Array() + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def loadNamespaceMetadata(namespace: Array[String]): util.Map[String, String] = { + namespace match { + case Array(db) => + catalog.getDatabaseMetadata(db).toMetadata + + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def createNamespace( + namespace: Array[String], + metadata: util.Map[String, String]): Unit = namespace match { + case Array(db) if !catalog.databaseExists(db) => + catalog.createDatabase( + toCatalogDatabase(db, metadata, defaultLocation = Some(catalog.getDefaultDBPath(db))), + ignoreIfExists = false) + + case Array(_) => + throw new NamespaceAlreadyExistsException(namespace) + + case _ => + throw new IllegalArgumentException(s"Invalid namespace name: ${namespace.quoted}") + } + + override def alterNamespace(namespace: Array[String], changes: NamespaceChange*): Unit = { + namespace match { + case Array(db) => + // validate that this catalog's reserved properties are not removed + changes.foreach { + case remove: RemoveProperty + if remove.property == "location" || remove.property == "comment" => + throw new UnsupportedOperationException( + s"Cannot remove reserved property: ${remove.property}") + case _ => + } + + val metadata = catalog.getDatabaseMetadata(db).toMetadata + catalog.alterDatabase( + toCatalogDatabase(db, CatalogV2Util.applyNamespaceChanges(metadata, changes))) + + case _ => + throw new NoSuchNamespaceException(namespace) + } + } + + override def dropNamespace(namespace: Array[String]): Boolean = namespace match { + case Array(db) if catalog.databaseExists(db) => + if (catalog.listTables(db).nonEmpty) { + throw new IllegalStateException(s"Namespace ${namespace.quoted} is not empty") + } + catalog.dropDatabase(db, ignoreIfNotExists = false, cascade = false) + true + + case Array(_) => + // exists returned false + false + + case _ => + throw new NoSuchNamespaceException(namespace) + } + override def toString: String = s"V2SessionCatalog($name)" } @@ -202,4 +293,32 @@ private[sql] object V2SessionCatalog { (identityCols, bucketSpec) } + + private def toCatalogDatabase( + db: String, + metadata: util.Map[String, String], + defaultLocation: Option[URI] = None): CatalogDatabase = { + CatalogDatabase( + name = db, + description = metadata.getOrDefault("comment", ""), + locationUri = Option(metadata.get("location")) + .map(CatalogUtils.stringToURI) + .orElse(defaultLocation) + .getOrElse(throw new IllegalArgumentException("Missing database location")), + properties = metadata.asScala.toMap -- Seq("comment", "location")) + } + + private implicit class CatalogDatabaseHelper(catalogDatabase: CatalogDatabase) { + def toMetadata: util.Map[String, String] = { + val metadata = mutable.HashMap[String, String]() + + catalogDatabase.properties.foreach { + case (key, value) => metadata.put(key, value) + } + metadata.put("location", catalogDatabase.locationUri.toString) + metadata.put("comment", catalogDatabase.description) + + metadata.asJava + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 64460d0338054..77458847be6e0 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -22,41 +22,55 @@ import java.util.Collections import scala.collection.JavaConverters._ -import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll} +import org.scalatest.BeforeAndAfter import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException -import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, TableChange} -import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, TableAlreadyExistsException} +import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, NamespaceChange, TableChange} +import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.CatalystSqlParser import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class V2SessionCatalogSuite - extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { - import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ +class V2SessionCatalogSuite extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { - private val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] - private val schema: StructType = new StructType() + val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] + val schema: StructType = new StructType() .add("id", IntegerType) .add("data", StringType) + val testNs: Array[String] = Array("db") + val defaultNs: Array[String] = Array("default") + val testIdent: Identifier = Identifier.of(testNs, "test_table") + + def newCatalog(): V2SessionCatalog = { + val newCatalog = new V2SessionCatalog(spark.sessionState) + newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) + newCatalog + } +} + +class V2SessionCatalogTableSuite extends V2SessionCatalogSuite { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + override protected def beforeAll(): Unit = { super.beforeAll() - spark.sql("""CREATE DATABASE IF NOT EXISTS db""") - spark.sql("""CREATE DATABASE IF NOT EXISTS db2""") - spark.sql("""CREATE DATABASE IF NOT EXISTS ns""") - spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""") + val catalog = newCatalog() + catalog.createNamespace(Array("db"), emptyProps) + catalog.createNamespace(Array("db2"), emptyProps) + catalog.createNamespace(Array("ns"), emptyProps) + catalog.createNamespace(Array("ns2"), emptyProps) } override protected def afterAll(): Unit = { - spark.sql("""DROP TABLE IF EXISTS db.test_table""") - spark.sql("""DROP DATABASE IF EXISTS db""") - spark.sql("""DROP DATABASE IF EXISTS db2""") - spark.sql("""DROP DATABASE IF EXISTS ns""") - spark.sql("""DROP DATABASE IF EXISTS ns2""") + val catalog = newCatalog() + catalog.dropNamespace(Array("db")) + catalog.dropNamespace(Array("db2")) + catalog.dropNamespace(Array("ns")) + catalog.dropNamespace(Array("ns2")) super.afterAll() } @@ -65,14 +79,6 @@ class V2SessionCatalogSuite newCatalog().dropTable(testIdentNew) } - private def newCatalog(): TableCatalog = { - val newCatalog = new V2SessionCatalog(spark.sessionState) - newCatalog.initialize("test", CaseInsensitiveStringMap.empty()) - newCatalog - } - - private val testNs = Array("db") - private val testIdent = Identifier.of(testNs, "test_table") private val testIdentNew = Identifier.of(testNs, "test_table_new") test("Catalogs can load the catalog") { @@ -753,3 +759,198 @@ class V2SessionCatalogSuite assert(exc.message.contains("RENAME TABLE source and destination databases do not match")) } } + +class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { + + import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ + + def checkMetadata( + expected: scala.collection.Map[String, String], + actual: scala.collection.Map[String, String]): Unit = { + // remove location and comment that are automatically added by HMS unless they are expected + val toRemove = Seq("location", "comment").filter(expected.contains) + assert(expected -- toRemove === actual) + } + + test("listNamespaces: basic behavior") { + val catalog = newCatalog() + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + assert(catalog.listNamespaces() === Array(testNs, defaultNs)) + assert(catalog.listNamespaces(Array()) === Array(testNs, defaultNs)) + assert(catalog.listNamespaces(testNs) === Array()) + + catalog.dropNamespace(testNs) + } + + test("listNamespaces: fail if missing namespace") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + assert(catalog.listNamespaces(testNs) === Array()) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === false) + } + + test("loadNamespaceMetadata: fail missing namespace") { + val catalog = newCatalog() + + val exc = intercept[NoSuchNamespaceException] { + catalog.loadNamespaceMetadata(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } + + test("loadNamespaceMetadata: non-empty metadata") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(metadata.asScala, Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("loadNamespaceMetadata: empty metadata") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + catalog.createNamespace(testNs, emptyProps) + + val metadata = catalog.loadNamespaceMetadata(testNs) + + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(metadata.asScala, emptyProps.asScala) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: basic behavior") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: fail if namespace already exists") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + val exc = intercept[NamespaceAlreadyExistsException] { + catalog.createNamespace(testNs, Map("property" -> "value2").asJava) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("createTable: fail if namespace does not exist") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === false) + } + + test("dropNamespace: drop missing namespace") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === false) + } + + test("dropNamespace: drop empty namespace") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, emptyProps) + + assert(catalog.namespaceExists(testNs) === true) + + val ret = catalog.dropNamespace(testNs) + + assert(ret === true) + assert(catalog.namespaceExists(testNs) === false) + } + + test("dropNamespace: fail if not empty") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + catalog.createTable(testIdent, schema, Array.empty, emptyProps) + + val exc = intercept[IllegalStateException] { + catalog.dropNamespace(testNs) + } + + assert(exc.getMessage.contains(testNs.quoted)) + assert(catalog.namespaceExists(testNs) === true) + checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + + catalog.dropTable(testIdent) + catalog.dropNamespace(testNs) + } + + test("alterNamespace: basic behavior") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("property" -> "value").asJava) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property2", "value2")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value", "property2" -> "value2")) + + catalog.alterNamespace(testNs, + NamespaceChange.removeProperty("property2"), + NamespaceChange.setProperty("property3", "value3")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value", "property3" -> "value3")) + + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("property3")) + checkMetadata( + catalog.loadNamespaceMetadata(testNs).asScala, + Map("property" -> "value")) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: fail if namespace doesn't exist") { + val catalog = newCatalog() + + assert(catalog.namespaceExists(testNs) === false) + + val exc = intercept[NoSuchNamespaceException] { + catalog.alterNamespace(testNs, NamespaceChange.setProperty("property", "value")) + } + + assert(exc.getMessage.contains(testNs.quoted)) + } +} From 9659f52c1e925cc9c4b8f70018764a4868b8a82c Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 30 Aug 2019 17:15:54 -0700 Subject: [PATCH 2/2] Update tests after review. --- .../datasources/v2/V2SessionCatalog.scala | 23 ++-- .../v2/V2SessionCatalogSuite.scala | 106 +++++++++++++++++- 2 files changed, 114 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala index fd1ebf96a12d2..6f8cf47215216 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala @@ -25,8 +25,8 @@ import scala.collection.mutable import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalog.v2.{Identifier, NamespaceChange, SupportsNamespaces, TableCatalog, TableChange} -import org.apache.spark.sql.catalog.v2.NamespaceChange.RemoveProperty -import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, LogicalExpressions, Transform} +import org.apache.spark.sql.catalog.v2.NamespaceChange.{RemoveProperty, SetProperty} +import org.apache.spark.sql.catalog.v2.expressions.{BucketTransform, FieldReference, IdentityTransform, Transform} import org.apache.spark.sql.catalog.v2.utils.CatalogV2Util import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, NoSuchNamespaceException, NoSuchTableException, TableAlreadyExistsException} @@ -94,7 +94,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup val (partitionColumns, maybeBucketSpec) = V2SessionCatalog.convertTransforms(partitions) val provider = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) val tableProperties = properties.asScala - val location = Option(properties.get("location")) + val location = Option(properties.get(LOCATION_TABLE_PROP)) val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) .copy(locationUri = location.map(CatalogUtils.stringToURI)) val tableType = if (location.isDefined) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED @@ -109,7 +109,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup bucketSpec = maybeBucketSpec, properties = tableProperties.toMap, tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions, - comment = Option(properties.get("comment"))) + comment = Option(properties.get(COMMENT_TABLE_PROP))) try { catalog.createTable(tableDesc, ignoreIfExists = false) @@ -236,8 +236,7 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup case Array(db) => // validate that this catalog's reserved properties are not removed changes.foreach { - case remove: RemoveProperty - if remove.property == "location" || remove.property == "comment" => + case remove: RemoveProperty if RESERVED_PROPERTIES.contains(remove.property) => throw new UnsupportedOperationException( s"Cannot remove reserved property: ${remove.property}") case _ => @@ -272,6 +271,10 @@ class V2SessionCatalog(sessionState: SessionState) extends TableCatalog with Sup } private[sql] object V2SessionCatalog { + val COMMENT_TABLE_PROP: String = "comment" + val LOCATION_TABLE_PROP: String = "location" + val RESERVED_PROPERTIES: Set[String] = Set(COMMENT_TABLE_PROP, LOCATION_TABLE_PROP) + /** * Convert v2 Transforms to v1 partition columns and an optional bucket spec. */ @@ -300,8 +303,8 @@ private[sql] object V2SessionCatalog { defaultLocation: Option[URI] = None): CatalogDatabase = { CatalogDatabase( name = db, - description = metadata.getOrDefault("comment", ""), - locationUri = Option(metadata.get("location")) + description = metadata.getOrDefault(COMMENT_TABLE_PROP, ""), + locationUri = Option(metadata.get(LOCATION_TABLE_PROP)) .map(CatalogUtils.stringToURI) .orElse(defaultLocation) .getOrElse(throw new IllegalArgumentException("Missing database location")), @@ -315,8 +318,8 @@ private[sql] object V2SessionCatalog { catalogDatabase.properties.foreach { case (key, value) => metadata.put(key, value) } - metadata.put("location", catalogDatabase.locationUri.toString) - metadata.put("comment", catalogDatabase.description) + metadata.put(LOCATION_TABLE_PROP, catalogDatabase.locationUri.toString) + metadata.put(COMMENT_TABLE_PROP, catalogDatabase.description) metadata.asJava } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala index 77458847be6e0..275bc339b3b5b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StringType, StructField, StructType, TimestampType} import org.apache.spark.sql.util.CaseInsensitiveStringMap -class V2SessionCatalogSuite extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { +class V2SessionCatalogBaseSuite extends SparkFunSuite with SharedSparkSession with BeforeAndAfter { val emptyProps: util.Map[String, String] = Collections.emptyMap[String, String] val schema: StructType = new StructType() @@ -52,12 +52,13 @@ class V2SessionCatalogSuite extends SparkFunSuite with SharedSparkSession with B } } -class V2SessionCatalogTableSuite extends V2SessionCatalogSuite { +class V2SessionCatalogTableSuite extends V2SessionCatalogBaseSuite { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ override protected def beforeAll(): Unit = { super.beforeAll() + // TODO: when there is a public API for v2 catalogs, use that instead val catalog = newCatalog() catalog.createNamespace(Array("db"), emptyProps) catalog.createNamespace(Array("db2"), emptyProps) @@ -760,7 +761,7 @@ class V2SessionCatalogTableSuite extends V2SessionCatalogSuite { } } -class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { +class V2SessionCatalogNamespaceSuite extends V2SessionCatalogBaseSuite { import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._ @@ -768,7 +769,7 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { expected: scala.collection.Map[String, String], actual: scala.collection.Map[String, String]): Unit = { // remove location and comment that are automatically added by HMS unless they are expected - val toRemove = Seq("location", "comment").filter(expected.contains) + val toRemove = V2SessionCatalog.RESERVED_PROPERTIES.filter(expected.contains) assert(expected -- toRemove === actual) } @@ -838,11 +839,32 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { test("createNamespace: basic behavior") { val catalog = newCatalog() + val expectedPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString catalog.createNamespace(testNs, Map("property" -> "value").asJava) + assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + assert(catalog.namespaceExists(testNs) === true) - checkMetadata(catalog.loadNamespaceMetadata(testNs).asScala, Map("property" -> "value")) + val metadata = catalog.loadNamespaceMetadata(testNs).asScala + checkMetadata(metadata, Map("property" -> "value")) + assert(expectedPath === metadata("location")) + + catalog.dropNamespace(testNs) + } + + test("createNamespace: initialize location") { + val catalog = newCatalog() + val expectedPath = "file:/tmp/db.db" + + catalog.createNamespace(testNs, Map("location" -> expectedPath).asJava) + + assert(expectedPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + assert(catalog.namespaceExists(testNs) === true) + val metadata = catalog.loadNamespaceMetadata(testNs).asScala + checkMetadata(metadata, Map.empty) + assert(expectedPath === metadata("location")) catalog.dropNamespace(testNs) } @@ -863,6 +885,21 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { catalog.dropNamespace(testNs) } + test("createNamespace: fail nested namespace") { + val catalog = newCatalog() + + // ensure the parent exists + catalog.createNamespace(Array("db"), emptyProps) + + val exc = intercept[IllegalArgumentException] { + catalog.createNamespace(Array("db", "nested"), emptyProps) + } + + assert(exc.getMessage.contains("Invalid namespace name: db.nested")) + + catalog.dropNamespace(Array("db")) + } + test("createTable: fail if namespace does not exist") { val catalog = newCatalog() @@ -942,6 +979,37 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { catalog.dropNamespace(testNs) } + test("alterNamespace: update namespace location") { + val catalog = newCatalog() + val initialPath = sqlContext.sessionState.catalog.getDefaultDBPath(testNs(0)).toString + val newPath = "file:/tmp/db.db" + + catalog.createNamespace(testNs, emptyProps) + + assert(initialPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("location", newPath)) + + assert(newPath === spark.catalog.getDatabase(testNs(0)).locationUri.toString) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: update namespace comment") { + val catalog = newCatalog() + val newComment = "test db" + + catalog.createNamespace(testNs, emptyProps) + + assert(spark.catalog.getDatabase(testNs(0)).description.isEmpty) + + catalog.alterNamespace(testNs, NamespaceChange.setProperty("comment", newComment)) + + assert(newComment === spark.catalog.getDatabase(testNs(0)).description) + + catalog.dropNamespace(testNs) + } + test("alterNamespace: fail if namespace doesn't exist") { val catalog = newCatalog() @@ -953,4 +1021,32 @@ class V2SessionCatalogNamespaceSuite extends V2SessionCatalogSuite { assert(exc.getMessage.contains(testNs.quoted)) } + + test("alterNamespace: fail to remove location") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, emptyProps) + + val exc = intercept[UnsupportedOperationException] { + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("location")) + } + + assert(exc.getMessage.contains("Cannot remove reserved property: location")) + + catalog.dropNamespace(testNs) + } + + test("alterNamespace: fail to remove comment") { + val catalog = newCatalog() + + catalog.createNamespace(testNs, Map("comment" -> "test db").asJava) + + val exc = intercept[UnsupportedOperationException] { + catalog.alterNamespace(testNs, NamespaceChange.removeProperty("comment")) + } + + assert(exc.getMessage.contains("Cannot remove reserved property: comment")) + + catalog.dropNamespace(testNs) + } }