From 528b835eddc9f80b1937e27858391cdfd656f0c2 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 23 May 2016 17:30:37 -0700 Subject: [PATCH 1/8] Fixes SPARK-15269 --- .../apache/spark/sql/AnalysisException.scala | 5 +-- .../sql/catalyst/catalog/SessionCatalog.scala | 22 ++++++++++-- .../command/createDataSourceTables.scala | 4 +-- .../spark/sql/hive/HiveExternalCatalog.scala | 4 ++- .../sql/hive/client/HiveClientImpl.scala | 16 +++++++-- .../sql/hive/MetastoreDataSourcesSuite.scala | 13 +++++++ .../spark/sql/hive/ShowCreateTableSuite.scala | 36 +++++++++---------- 7 files changed, 72 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala index d2003fd6892e1..6911843999392 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala @@ -32,8 +32,9 @@ class AnalysisException protected[sql] ( val message: String, val line: Option[Int] = None, val startPosition: Option[Int] = None, - val plan: Option[LogicalPlan] = None) - extends Exception with Serializable { + val plan: Option[LogicalPlan] = None, + val cause: Option[Throwable] = None) + extends Exception(message, cause.orNull) with Serializable { def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = { val newException = new AnalysisException(message, line, startPosition) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index cf9286e6b97a6..325f7fa3fcb9e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.Logging import org.apache.spark.sql.AnalysisException @@ -216,7 +216,25 @@ class SessionCatalog( val table = formatTableName(tableDefinition.identifier.table) val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db))) requireDbExists(db) - externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) + + if (newTableDefinition.tableType == CatalogTableType.EXTERNAL) { + // !! HACK ALERT !! 
+      //
+      // See https://issues.apache.org/jira/browse/SPARK-15269 for more details about why we have to
+      // set `locationUri` and then remove the directory after creating the external table:
+      val tablePath = defaultTablePath(newTableDefinition.identifier)
+      try {
+        externalCatalog.createTable(
+          db,
+          newTableDefinition.withNewStorage(locationUri = Some(tablePath)),
+          ignoreIfExists)
+      } finally {
+        val path = new Path(tablePath)
+        FileSystem.get(path.toUri, hadoopConf).delete(path, true)
+      }
+    } else {
+      externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4b9aab612e7c3..9956c5b09236d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,8 +118,8 @@ case class CreateDataSourceTableCommand(
 /**
  * A command used to create a data source table using the result of a query.
  *
- * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference.
- * This is not intended for temporary tables.
+ * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for the
+ * difference. This is not intended for temporary tables.
  *
  * The syntax of using this command in SQL is:
  * {{{
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5ffd8ef149a1e..54f0dbd10cd52 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.util
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException
 
@@ -68,7 +69,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
       body
     } catch {
       case NonFatal(e) if isClientException(e) =>
-        throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+        throw new AnalysisException(
+          e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
     }
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 71d5c9960a70c..6e09aa8e9c454 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.util.{CircularBuffer, Utils}
 
 /**
@@ -323,7 +324,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def listDatabases(pattern: String): Seq[String] = withHiveState {
-    client.getDatabasesByPattern(pattern).asScala.toSeq
+    client.getDatabasesByPattern(pattern).asScala
   }
 
   override def getTableOption(
@@ -351,6 +352,8 @@ private[hive] class HiveClientImpl(
         unsupportedFeatures += "bucketing"
       }
 
+      val properties = h.getParameters.asScala.toMap
+
       CatalogTable(
         identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
         tableType = h.getTableType match {
@@ -368,14 +371,21 @@ private[hive] class HiveClientImpl(
         createTime = h.getTTable.getCreateTime.toLong * 1000,
         lastAccessTime = h.getLastAccessTime.toLong * 1000,
         storage = CatalogStorageFormat(
-          locationUri = shim.getDataLocation(h),
+          locationUri = shim.getDataLocation(h).filterNot { _ =>
+            // SPARK-15269: Persisted data source tables always store the location URI as a SerDe
+            // property named "path" instead of standard Hive `dataLocation`, because Hive only
+            // allows directory paths as location URIs while Spark SQL data source tables also
+            // allow file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+            // data source tables.
+            DDLUtils.isDatasourceTable(properties)
+          },
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),
           serde = Option(h.getSerializationLib),
           compressed = h.getTTable.getSd.isCompressed,
           serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
         ),
-        properties = h.getParameters.asScala.toMap,
+        properties = properties,
         viewOriginalText = Option(h.getViewOriginalText),
         viewText = Option(h.getViewExpandedText),
         unsupportedFeatures = unsupportedFeatures)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2c50cc88cc4cc..095983d6dda72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1104,4 +1104,17 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       }
     }
   }
+
+  test("SPARK-15269 external data source table creation") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(1).write.json(path)
+
+      withTable("t") {
+        sql(s"CREATE TABLE t USING json OPTIONS (PATH '/tmp/test.json')")
+        sql("DROP TABLE t")
+        sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index f789d88d5dd4a..3f3dc122093b5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -28,11 +28,11 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
   import testImplicits._
 
   test("data source table with user specified schema") {
-    withTable("ddl_test1") {
+    withTable("ddl_test") {
       val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
 
       sql(
-        s"""CREATE TABLE ddl_test1 (
+        s"""CREATE TABLE ddl_test (
           |  a STRING,
           |  b STRING,
          |  `extra col` ARRAY<INT>,
@@ -45,55 +45,55 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test1")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table CTAS") {
-    withTable("ddl_test2") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test2
+        s"""CREATE TABLE ddl_test
           |USING json
           |AS SELECT 1 AS a, "foo" AS b
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test2")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("partitioned data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
           |USING json
           |PARTITIONED BY (b)
           |AS SELECT 1 AS a, "foo" AS b
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("bucketed data source table") {
-    withTable("ddl_test3") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test3
+        s"""CREATE TABLE ddl_test
           |USING json
           |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
           |AS SELECT 1 AS a, "foo" AS b
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test3")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("partitioned bucketed data source table") {
-    withTable("ddl_test4") {
+    withTable("ddl_test") {
       sql(
-        s"""CREATE TABLE ddl_test4
+        s"""CREATE TABLE ddl_test
           |USING json
           |PARTITIONED BY (c)
           |CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
@@ -101,12 +101,12 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
         """.stripMargin
       )
 
-      checkCreateTable("ddl_test4")
+      checkCreateTable("ddl_test")
     }
   }
 
   test("data source table using Dataset API") {
-    withTable("ddl_test5") {
+    withTable("ddl_test") {
       spark
         .range(3)
         .select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
         .write
         .mode("overwrite")
         .partitionBy("a", "b")
         .bucketBy(2, "c", "d")
-        .saveAsTable("ddl_test5")
+        .saveAsTable("ddl_test")
 
-      checkCreateTable("ddl_test5")
+      checkCreateTable("ddl_test")
     }
   }

From fa7b5b6f676553757da96ccd40944ec349a489af Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 24 May 2016 14:57:34 -0700
Subject: [PATCH 2/8] Fixes test failures

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 29 ++++++++++++++-----
 .../org/apache/spark/sql/SparkSession.scala   |  6 ++--
 .../command/createDataSourceTables.scala      |  5 +++-
 .../sql/hive/client/HiveClientImpl.scala      |  2 +-
 4 files changed, 29 insertions(+), 13 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 325f7fa3fcb9e..22cc813115cb2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -212,24 +212,37 @@ class SessionCatalog(
    * If no such database is specified, create it in the current database.
    */
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
-    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableDefinition.identifier.table)
+    val tableId = tableDefinition.identifier
+    val db = formatDatabaseName(tableId.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableId.table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
 
-    if (newTableDefinition.tableType == CatalogTableType.EXTERNAL) {
+    if (
+      // If this is an external data source table
+      tableDefinition.properties.contains("spark.sql.sources.provider") &&
+      newTableDefinition.tableType == CatalogTableType.EXTERNAL
+    ) {
       // !! HACK ALERT !!
       //
-      // See https://issues.apache.org/jira/browse/SPARK-15269 for more details about why we have to
-      // set `locationUri` and then remove the directory after creating the external table:
-      val tablePath = defaultTablePath(newTableDefinition.identifier)
+      // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
+      // directory that doesn't exist yet but can definitely be successfully created, and then
+      // delete it right after creating the external data source table. This location will be
+      // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
+      // really use it. Also, since we only do this workaround for external tables, deleting the
+      // directory after the fact doesn't do any harm.
+      //
+      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
+
+      val tempPath = new Path(defaultTablePath(tableId), "-__PLACEHOLDER__").toString
+
       try {
         externalCatalog.createTable(
           db,
-          newTableDefinition.withNewStorage(locationUri = Some(tablePath)),
+          newTableDefinition.withNewStorage(locationUri = Some(tempPath)),
           ignoreIfExists)
       } finally {
-        val path = new Path(tablePath)
+        val path = new Path(tempPath)
         FileSystem.get(path.toUri, hadoopConf).delete(path, true)
       }
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 20e22baa351a9..f48b8cde422ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -556,9 +556,9 @@ class SparkSession private(
   }
 
 
-  /* ------------------------ *
-   | Catalog-related methods |
-   * ----------------- ------ */
+  /* ------------------------- *
+   |  Catalog-related methods  |
+   * ------------------------- */
 
   /**
    * Interface through which the user may create, drop, alter or query underlying
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 9956c5b09236d..23463dee748cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -400,7 +400,10 @@ object CreateDataSourceTableUtils extends Logging {
         schema = relation.schema.map { f =>
           CatalogColumn(f.name, f.dataType.catalogString)
         },
-        properties = tableProperties.toMap,
+        // Removes the provider property since we are going to save this table as a Hive compatible
+        // one, and other places use this property to check whether a table is a data source table
+        // (e.g. `DDLUtils.isDatasourceTable`).
+        properties = (tableProperties - "spark.sql.sources.provider").toMap,
         viewText = None)
   }
 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6e09aa8e9c454..6502acfd5fb42 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -377,7 +377,7 @@ private[hive] class HiveClientImpl(
             // allows directory paths as location URIs while Spark SQL data source tables also
             // allow file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
             // data source tables.
-          DDLUtils.isDatasourceTable(properties)
+          DDLUtils.isDatasourceTable(properties) && h.getTableType == HiveTableType.EXTERNAL_TABLE
           },
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),

From 0aedf7b9fd24288dc6fda84157789eca20348528 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Tue, 24 May 2016 18:14:54 -0700
Subject: [PATCH 3/8] Fixes more test failures

---
 .../org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala          | 1 +
 .../spark/sql/execution/command/createDataSourceTables.scala            | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 22cc813115cb2..2cea2cdab73b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -221,6 +221,7 @@ class SessionCatalog(
     if (
       // If this is an external data source table
       tableDefinition.properties.contains("spark.sql.sources.provider") &&
+      tableDefinition.storage.locationUri.isEmpty &&
       newTableDefinition.tableType == CatalogTableType.EXTERNAL
     ) {
       // !! HACK ALERT !!
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 23463dee748cb..1f7264868e7ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -403,7 +403,7 @@ object CreateDataSourceTableUtils extends Logging {
         // Removes the provider property since we are going to save this table as a Hive compatible
         // one, and other places use this property to check whether a table is a data source table
         // (e.g. `DDLUtils.isDatasourceTable`).
-        properties = (tableProperties - "spark.sql.sources.provider").toMap,
+        properties = tableProperties.toMap,
         viewText = None)
   }

From 505f3f0a58db6298745bb474cd7a0d22b12adb37 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 25 May 2016 14:14:37 -0700
Subject: [PATCH 4/8] Handles Hive compatible tables properly

---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala        | 8 ++++----
 .../sql/execution/command/createDataSourceTables.scala     | 3 ---
 .../scala/org/apache/spark/sql/internal/HiveSerDe.scala    | 1 -
 .../org/apache/spark/sql/hive/client/HiveClientImpl.scala  | 8 +++++++-
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 2cea2cdab73b1..a9b9d20868cac 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -235,16 +235,16 @@ class SessionCatalog(
       //
       // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
-      val tempPath = new Path(defaultTablePath(tableId), "-__PLACEHOLDER__").toString
+      val tempPath =
+        new Path(defaultTablePath(tableId).stripSuffix(Path.SEPARATOR) + "-__PLACEHOLDER__")
 
       try {
         externalCatalog.createTable(
           db,
-          newTableDefinition.withNewStorage(locationUri = Some(tempPath)),
+          newTableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
           ignoreIfExists)
       } finally {
-        val path = new Path(tempPath)
-        FileSystem.get(path.toUri, hadoopConf).delete(path, true)
+        FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
       }
     } else {
       externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 1f7264868e7ee..9956c5b09236d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -400,9 +400,6 @@ object CreateDataSourceTableUtils extends Logging {
         schema = relation.schema.map { f =>
           CatalogColumn(f.name, f.dataType.catalogString)
         },
-        // Removes the provider property since we are going to save this table as a Hive compatible
-        // one, and other places use this property to check whether a table is a data source table
-        // (e.g. `DDLUtils.isDatasourceTable`).
         properties = tableProperties.toMap,
         viewText = None)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 38317d46dd82d..d554937d8b400 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -77,4 +77,3 @@ object HiveSerDe {
     serdeMap.get(key)
   }
 }
-
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 6502acfd5fb42..47fa41823cd09 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -377,7 +377,13 @@ private[hive] class HiveClientImpl(
             // allows directory paths as location URIs while Spark SQL data source tables also
             // allow file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
             // data source tables.
-          DDLUtils.isDatasourceTable(properties) && h.getTableType == HiveTableType.EXTERNAL_TABLE
+          DDLUtils.isDatasourceTable(properties) &&
+            h.getTableType == HiveTableType.EXTERNAL_TABLE &&
+            // Spark SQL may also save external data source tables in Hive compatible format when
+            // possible, so that these tables can be directly accessed by Hive. For these tables,
+            // `dataLocation` is still necessary. Here we also check for the input format class
+            // because only these Hive compatible tables set this field.
+            h.getInputFormatClass == null
           },
           inputFormat = Option(h.getInputFormatClass).map(_.getName),
           outputFormat = Option(h.getOutputFormatClass).map(_.getName),

From 04af79d641c091559b1c6b0d604569b09b5992ac Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 25 May 2016 16:46:43 -0700
Subject: [PATCH 5/8] Fixes newly added test case

---
 .../org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 095983d6dda72..3d8123d3c06d8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1111,7 +1111,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       spark.range(1).write.json(path)
 
       withTable("t") {
-        sql(s"CREATE TABLE t USING json OPTIONS (PATH '/tmp/test.json')")
+        sql(s"CREATE TABLE t USING json OPTIONS (PATH '$path')")
         sql("DROP TABLE t")
         sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
       }

From 62412890e99036f1448932d7cc56f4be59fafefc Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 25 May 2016 16:50:25 -0700
Subject: [PATCH 6/8] More comments

---
 .../spark/sql/catalyst/catalog/SessionCatalog.scala | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index a9b9d20868cac..62da7b47a324b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -219,10 +219,13 @@ class SessionCatalog(
     if (
-      // If this is an external data source table
+      // If this is an external data source table...
       tableDefinition.properties.contains("spark.sql.sources.provider") &&
-      tableDefinition.storage.locationUri.isEmpty &&
-      newTableDefinition.tableType == CatalogTableType.EXTERNAL
+      newTableDefinition.tableType == CatalogTableType.EXTERNAL &&
+      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+      // format always set `locationUri` to the actual data location and should NOT be hacked
+      // as described below.)
+      tableDefinition.storage.locationUri.isEmpty
     ) {
       // !! HACK ALERT !!
       //

From 336fb55406ad19eb7cc7276cd771ebd92ed8dec1 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Mon, 30 May 2016 21:42:06 -0700
Subject: [PATCH 7/8] Moves changes in SessionCatalog to HiveExternalCatalog

---
 .../sql/catalyst/catalog/SessionCatalog.scala | 41 ++----------------
 .../spark/sql/hive/HiveExternalCatalog.scala  | 43 +++++++++++++++++--
 .../spark/sql/hive/HiveSharedState.scala      |  4 +-
 .../sql/hive/HiveExternalCatalogSuite.scala   |  3 +-
 4 files changed, 47 insertions(+), 44 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 62da7b47a324b..371c198aa3493 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -212,46 +212,11 @@ class SessionCatalog(
    * If no such database is specified, create it in the current database.
    */
   def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = {
-    val tableId = tableDefinition.identifier
-    val db = formatDatabaseName(tableId.database.getOrElse(getCurrentDatabase))
-    val table = formatTableName(tableId.table)
+    val db = formatDatabaseName(tableDefinition.identifier.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableDefinition.identifier.table)
     val newTableDefinition = tableDefinition.copy(identifier = TableIdentifier(table, Some(db)))
     requireDbExists(db)
-
-    if (
-      // If this is an external data source table...
-      tableDefinition.properties.contains("spark.sql.sources.provider") &&
-      newTableDefinition.tableType == CatalogTableType.EXTERNAL &&
-      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
-      // format always set `locationUri` to the actual data location and should NOT be hacked
-      // as described below.)
-      tableDefinition.storage.locationUri.isEmpty
-    ) {
-      // !! HACK ALERT !!
-      //
-      // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
-      // directory that doesn't exist yet but can definitely be successfully created, and then
-      // delete it right after creating the external data source table. This location will be
-      // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
-      // really use it. Also, since we only do this workaround for external tables, deleting the
-      // directory after the fact doesn't do any harm.
-      //
-      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
-
-      val tempPath =
-        new Path(defaultTablePath(tableId).stripSuffix(Path.SEPARATOR) + "-__PLACEHOLDER__")
-
-      try {
-        externalCatalog.createTable(
-          db,
-          newTableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
-          ignoreIfExists)
-      } finally {
-        FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
-      }
-    } else {
-      externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
-    }
+    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
   }
 
   /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 54f0dbd10cd52..525ab1f44e543 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,7 @@ import java.util
 
 import scala.util.control.NonFatal
 
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.ql.metadata.HiveException
 import org.apache.thrift.TException
 
@@ -36,7 +37,9 @@ import org.apache.spark.sql.hive.client.HiveClient
 * A persistent implementation of the system catalog using Hive.
 * All public methods must be synchronized for thread-safety.
 */
-private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
+  extends ExternalCatalog with Logging {
+
   import CatalogTypes.TablePartitionSpec
 
   // Exceptions thrown by the hive client that we would like to wrap
@@ -75,7 +78,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
   }
 
   private def requireDbMatches(db: String, table: CatalogTable): Unit = {
-    if (table.identifier.database != Some(db)) {
+    if (!table.identifier.database.contains(db)) {
       throw new AnalysisException(
         s"Provided database '$db' does not match the one specified in the " +
         s"table definition (${table.identifier.database.getOrElse("n/a")})")
@@ -149,7 +152,41 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu
     ignoreIfExists: Boolean): Unit = withClient {
     requireDbExists(db)
     requireDbMatches(db, tableDefinition)
-    client.createTable(tableDefinition, ignoreIfExists)
+
+    if (
+      // If this is an external data source table...
+      tableDefinition.properties.contains("spark.sql.sources.provider") &&
+      tableDefinition.tableType == CatalogTableType.EXTERNAL &&
+      // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+      // format always set `locationUri` to the actual data location and should NOT be hacked
+      // as described below.)
+      tableDefinition.storage.locationUri.isEmpty
+    ) {
+      // !! HACK ALERT !!
+      //
+      // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
+      // directory that doesn't exist yet but can definitely be successfully created, and then
+      // delete it right after creating the external data source table. This location will be
+      // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
+      // really use it. Also, since we only do this workaround for external tables, deleting the
+      // directory after the fact doesn't do any harm.
+      //
+      // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
+ val tempPath = { + val dbLocation = getDatabase(tableDefinition.database).locationUri + new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__") + } + + try { + client.createTable( + tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)), + ignoreIfExists) + } finally { + FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) + } + } else { + client.createTable(tableDefinition, ignoreIfExists) + } } override def dropTable( diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala index f0d96403e8551..a0106ee882e76 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala @@ -51,6 +51,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext) /** * A catalog that interacts with the Hive metastore. */ - override lazy val externalCatalog = new HiveExternalCatalog(metadataHive) - + override lazy val externalCatalog = + new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala index bf9935ae41b30..175889b08b49f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala @@ -37,7 +37,8 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite { protected override val utils: CatalogTestUtils = new CatalogTestUtils { override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat" - override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client) + override def newEmptyCatalog(): ExternalCatalog = + new HiveExternalCatalog(client, new Configuration()) } protected override def resetState(): Unit = client.reset() From 7d0122f9e59e83683b1539c9c8e2be3b2c6f5658 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 31 May 2016 15:07:39 -0700 Subject: [PATCH 8/8] Reverts minor refactoring that is not supported by Scala 2.10 --- .../scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 525ab1f44e543..b8bc9ab900ad1 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -78,7 +78,7 @@ private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configu } private def requireDbMatches(db: String, table: CatalogTable): Unit = { - if (!table.identifier.database.contains(db)) { + if (table.identifier.database != Some(db)) { throw new AnalysisException( s"Provided database '$db' does not match the one specified in the " + s"table definition (${table.identifier.database.getOrElse("n/a")})")
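
A note for reviewers on the final shape of the workaround: after patch 8, the placeholder trick
lives entirely in HiveExternalCatalog.createTable. Distilled, the control flow is: pick a
placeholder directory that does not exist yet, let the Hive metastore create it as a side effect
of registering the table, then always delete it, because the real data location of a Spark SQL
data source table is tracked by the "path" SerDe property rather than by the Hive table location.
The code below is a minimal standalone sketch of only that flow, not Spark code:
createTableInMetastore is a hypothetical stand-in for the HiveClient call, and only hadoop-common
is assumed on the classpath.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object PlaceholderLocationSketch {
      // Hypothetical stand-in for the HiveClient createTable call: the real
      // metastore creates the directory at `location` as a side effect.
      def createTableInMetastore(table: String, location: String): Unit = ()

      // Mirrors the try/finally ordering used in HiveExternalCatalog.createTable.
      def createExternalTableWithPlaceholder(
          table: String,
          dbLocation: String,
          hadoopConf: Configuration): Unit = {
        // A path that does not exist yet but can always be created by Hive.
        val tempPath = new Path(dbLocation, table + "-__PLACEHOLDER__")
        try {
          // The placeholder is persisted as the Hive table location URI, but
          // Spark SQL never reads it back for data source tables.
          createTableInMetastore(table, tempPath.toString)
        } finally {
          // Always remove the directory the metastore just created. This branch
          // is only taken for external tables with no user-supplied location,
          // so no user data can live under the placeholder.
          FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
        }
      }
    }
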