From 2ab4928a952166add8ec1867a1f9866ddf57bbe7 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 18 Jan 2017 00:15:23 +0800 Subject: [PATCH 1/8] [SPARK-19257][SQL]CatalogStorageFormat.locationUri should be java.net.URI --- .../catalyst/catalog/InMemoryCatalog.scala | 9 +++++---- .../sql/catalyst/catalog/interface.scala | 9 +++++---- .../catalog/ExternalCatalogSuite.scala | 18 +++++++++-------- .../catalog/SessionCatalogSuite.scala | 6 ++++-- .../spark/sql/execution/SparkSqlParser.scala | 5 +++-- .../command/createDataSourceTables.scala | 11 ++++++---- .../spark/sql/execution/command/ddl.scala | 11 ++++++---- .../spark/sql/execution/command/tables.scala | 6 +++--- .../datasources/CatalogFileIndex.scala | 2 +- .../execution/datasources/DataSource.scala | 6 ++++-- .../datasources/DataSourceStrategy.scala | 2 +- .../execution/command/DDLCommandSuite.scala | 6 ++++-- .../sql/execution/command/DDLSuite.scala | 3 ++- .../spark/sql/internal/CatalogSuite.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 20 +++++++++---------- .../spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- .../spark/sql/hive/MetastoreRelation.scala | 8 ++++---- .../sql/hive/client/HiveClientImpl.scala | 11 +++++----- .../spark/sql/hive/client/HiveShim.scala | 4 ++-- ...nalCatalogBackwardCompatibilitySuite.scala | 14 ++++++------- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 4 ++-- .../sql/hive/MetastoreDataSourcesSuite.scala | 3 ++- .../spark/sql/hive/client/VersionsSuite.scala | 3 ++- 23 files changed, 93 insertions(+), 72 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 15aed5f9b1bdf..2a3f5907465d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.catalog import java.io.IOException +import java.net.URI import scala.collection.mutable @@ -211,7 +212,7 @@ class InMemoryCatalog( throw new SparkException(s"Unable to create table $table as failed " + s"to create its directory $defaultTableLocation", e) } - tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString)) + tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri)) } else { tableDefinition } @@ -283,7 +284,7 @@ class InMemoryCatalog( throw new SparkException(s"Unable to rename table $oldName to $newName as failed " + s"to rename its directory $oldDir", e) } - oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString)) + oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri)) } catalog(db).tables.put(newName, oldDesc) @@ -392,7 +393,7 @@ class InMemoryCatalog( existingParts.put( p.spec, - p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))) + p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))) } } @@ -465,7 +466,7 @@ class InMemoryCatalog( } oldPartition.copy( spec = newSpec, - storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString))) + storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri))) } else { oldPartition.copy(spec = newSpec) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 
2adccdd7bf61d..e22bf88890a4a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI import java.util.Date import scala.collection.mutable @@ -50,7 +51,7 @@ case class CatalogStorageFormat( // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and // path.toUri respectively before use as a filesystem path due to URI char escaping. - locationUri: Option[String], + locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], @@ -104,7 +105,7 @@ case class CatalogTablePartition( } /** Return the partition location, assuming it is specified. */ - def location: String = storage.locationUri.getOrElse { + def location: String = storage.locationUri.map(_.toString).getOrElse { val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ") throw new AnalysisException(s"Partition [$specString] did not specify locationUri") } @@ -194,7 +195,7 @@ case class CatalogTable( } /** Return the table location, assuming it is specified. */ - def location: String = storage.locationUri.getOrElse { + def location: String = storage.locationUri.map(_.toString).getOrElse { throw new AnalysisException(s"table $identifier did not specify locationUri") } @@ -244,7 +245,7 @@ case class CatalogTable( /** Syntactic sugar to update a field in `storage`. */ def withNewStorage( - locationUri: Option[String] = storage.locationUri, + locationUri: Option[URI] = storage.locationUri, inputFormat: Option[String] = storage.inputFormat, outputFormat: Option[String] = storage.outputFormat, compressed: Boolean = false, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 91f464b47aae2..a7162ae091720 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach @@ -366,10 +368,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partition1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), - storageFormat.copy(locationUri = Some(newLocationPart1))) + storageFormat.copy(locationUri = Some(new URI(newLocationPart1)))) val partition2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), - storageFormat.copy(locationUri = Some(newLocationPart2))) + storageFormat.copy(locationUri = Some(new URI(newLocationPart2)))) catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false) catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false) @@ -561,8 +563,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec) val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec) catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = 
Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))))) val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec) val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec) assert(newPart1.storage.locationUri == Some(newLocation)) @@ -743,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac identifier = TableIdentifier("external_table", Some("db1")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( - Some(Utils.createTempDir().getAbsolutePath), + Some(new URI(Utils.createTempDir().getAbsolutePath)), None, None, None, false, Map.empty), schema = new StructType().add("a", "int").add("b", "string"), provider = Some("hive") @@ -791,7 +793,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partWithExistingDir = CatalogTablePartition( Map("partCol1" -> "7", "partCol2" -> "8"), CatalogStorageFormat( - Some(tempPath.toURI.toString), + Some(tempPath.toURI), None, None, None, false, Map.empty)) catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false) @@ -800,7 +802,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partWithNonExistingDir = CatalogTablePartition( Map("partCol1" -> "9", "partCol2" -> "10"), CatalogStorageFormat( - Some(tempPath.toURI.toString), + Some(tempPath.toURI), None, None, None, false, Map.empty)) catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false) assert(tempPath.exists()) @@ -893,7 +895,7 @@ abstract class CatalogTestUtils { CatalogTable( identifier = TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL, - storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)), + storage = storageFormat.copy(locationUri = Some(new URI(Utils.createTempDir().getAbsolutePath))), schema = new StructType() .add("col1", "int") .add("col2", "string") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index ae93dffbab8d2..6113749a9d748 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -843,8 +845,8 @@ class SessionCatalogSuite extends PlanTest { val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))))) val newPart1 = 
catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) assert(newPart1.storage.locationUri == Some(newLocation)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 41768d451261a..01e80a0b960ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution import scala.collection.JavaConverters._ +import java.net.URI import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode @@ -373,7 +374,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) - val location = Option(ctx.locationSpec).map(visitLocationSpec) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new URI(_)) val storage = DataSource.buildStorageFormatFromOptions(options) if (location.isDefined && storage.locationUri.isDefined) { @@ -1068,7 +1069,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(CatalogStorageFormat.empty) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) .getOrElse(CatalogStorageFormat.empty) - val location = Option(ctx.locationSpec).map(visitLocationSpec) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new URI(_)) // If we are creating an EXTERNAL table, then the LOCATION field is required if (external && location.isEmpty) { operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index beeba05554dde..5f337394a0c63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import java.net.URI + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -54,7 +56,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo // Create the relation to validate the arguments before writing the metadata to the metastore, // and infer the table schema and partition if users didn't specify schema in CREATE TABLE. 
- val pathOption = table.storage.locationUri.map("path" -> _) + val pathOption = table.storage.locationUri.map("path" -> _.toString) // Fill in some default table options from the session conf val tableWithDefaultOptions = table.copy( identifier = table.identifier.copy( @@ -142,15 +144,16 @@ case class CreateDataSourceTableAsSelectCommand( } saveDataIntoTable( - sparkSession, table, table.storage.locationUri, query, mode, tableExists = true) + sparkSession, table, table.storage.locationUri.map(_.toString), + query, mode, tableExists = true) } else { val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { - Some(sessionState.catalog.defaultTablePath(table.identifier)) + Some(new URI(sessionState.catalog.defaultTablePath(table.identifier))) } else { table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation, query, mode, tableExists = false) + sparkSession, table, tableLocation.map(_.toString), query, mode, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 82cbb4aa47445..fecec82d0d4f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import java.net.URI + import scala.collection.{GenMap, GenSeq} import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.forkjoin.ForkJoinPool @@ -426,7 +428,8 @@ case class AlterTableAddPartitionCommand( table.identifier.quotedString, sparkSession.sessionState.conf.resolver) // inherit table storage format (possibly except for location) - CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location)) + CatalogTablePartition(normalizedSpec, table.storage + .copy(locationUri = location.map(new URI(_)))) } catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) Seq.empty[Row] @@ -710,7 +713,7 @@ case class AlterTableRecoverPartitionsCommand( // inherit table storage format (possibly except for location) CatalogTablePartition( spec, - table.storage.copy(locationUri = Some(location.toUri.toString)), + table.storage.copy(locationUri = Some(location.toUri)), params) } spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) @@ -748,11 +751,11 @@ case class AlterTableSetLocationCommand( sparkSession, table, "ALTER TABLE ... 
SET LOCATION") // Partition spec is specified, so we set the location only for this partition val part = catalog.getPartition(table.identifier, spec) - val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location))) + val newPart = part.copy(storage = part.storage.copy(locationUri = Some(new URI(location)))) catalog.alterPartitions(table.identifier, Seq(newPart)) case None => // No partition spec is specified, so we set the location for the table itself - catalog.alterTable(table.withNewStorage(locationUri = Some(location))) + catalog.alterTable(table.withNewStorage(locationUri = Some(new URI(location)))) } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 246894813c3b9..b6817bbd03c47 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -491,7 +491,7 @@ case class DescribeTableCommand( append(buffer, "Owner:", table.owner, "") append(buffer, "Create Time:", new Date(table.createTime).toString, "") append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "") - append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "") + append(buffer, "Location:", table.storage.locationUri.getOrElse("").toString, "") append(buffer, "Table Type:", table.tableType.name, "") table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, "")) @@ -581,7 +581,7 @@ case class DescribeTableCommand( append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "") append(buffer, "Database:", table.database, "") append(buffer, "Table:", tableIdentifier.table, "") - append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "") + append(buffer, "Location:", partition.storage.locationUri.getOrElse("").toString, "") append(buffer, "Partition Parameters:", "", "") partition.parameters.foreach { case (key, value) => append(buffer, s" $key", value, "") @@ -947,7 +947,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman // when the table creation DDL contains the PATH option. 
None } else { - Some(s"path '${escapeSingleQuotedString(location)}'") + Some(s"path '${escapeSingleQuotedString(location.toString)}'") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 1235a4b12f1d0..a8825fb2effc9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -46,7 +46,7 @@ class CatalogFileIndex( assert(table.identifier.database.isDefined, "The table identifier must be qualified in CatalogFileIndex") - private val baseLocation: Option[String] = table.storage.locationUri + private val baseLocation: Option[String] = table.storage.locationUri.map(_.toString) override def partitionSchema: StructType = table.partitionSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 29afe5751bc16..a1874fb8a00d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import java.net.URI import java.util.{ServiceConfigurationError, ServiceLoader} import scala.collection.JavaConverters._ @@ -25,8 +26,8 @@ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import org.apache.hadoop.fs.Path - import org.apache.spark.deploy.SparkHadoopUtil + import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable} @@ -642,6 +643,7 @@ object DataSource { def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = { val path = new CaseInsensitiveMap(options).get("path") val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path") - CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath) + CatalogStorageFormat.empty.copy(locationUri = path.map(new URI(_)), + properties = optionsWithoutPath) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 3d3db06eee0c6..129c1268b4ac0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -223,7 +223,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] sparkSession: SparkSession, simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = { val table = simpleCatalogRelation.catalogTable - val pathOption = table.storage.locationUri.map("path" -> _) + val pathOption = table.storage.locationUri.map("path" -> _.toString) val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 76bb9e5929a71..3860bae6a0237 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.command -import scala.reflect.{classTag, ClassTag} +import java.net.URI + +import scala.reflect.{ClassTag, classTag} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ @@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest { val expectedTableDesc = CatalogTable( identifier = TableIdentifier("my_tab"), tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")), + storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))), schema = new StructType().add("a", IntegerType).add("b", StringType), provider = Some("parquet")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index ac3878e849051..d7f8954fd393a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.command import java.io.File +import java.net.URI import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach @@ -78,7 +79,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = { val storage = CatalogStorageFormat( - locationUri = Some(catalog.defaultTablePath(name)), + locationUri = Some(new URI(catalog.defaultTablePath(name))), inputFormat = None, outputFormat = None, serde = None, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 801912f44174f..05b8591dfecfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -480,7 +480,7 @@ class CatalogSuite options = Map.empty[String, String]) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.tableType == CatalogTableType.MANAGED) - val tablePath = new File(new URI(table.storage.locationUri.get)) + val tablePath = new File(table.storage.locationUri.get) assert(tablePath.exists() && tablePath.listFiles().isEmpty) Seq((1)).toDF("i").write.insertInto("t") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 208c8c9d5d0cf..802bdaf372cb9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition.storage.locationUri.isEmpty val tableLocation = if (needDefaultTableLocation) { - Some(defaultTablePath(tableDefinition.identifier)) + Some(new URI(defaultTablePath(tableDefinition.identifier))) } else { tableDefinition.storage.locationUri } @@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, in older version of Spark we already store table location in storage properties // with key "path". Here we keep this behaviour for backward compatibility. 
val storagePropsWithLocation = table.storage.properties ++ - table.storage.locationUri.map("path" -> _) + table.storage.locationUri.map("path" -> _.toString) // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and // bucket specification to empty. Note that partition columns are retained, so that we can @@ -285,7 +285,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // compatible format, which means the data source is file-based and must have a `path`. require(table.storage.locationUri.isDefined, "External file-based data source table must have a `path` entry in storage properties.") - Some(new Path(table.location).toUri.toString) + Some(new Path(table.location).toUri) } else { None } @@ -438,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat try { client.createTable( - tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)), + tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) @@ -482,8 +482,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterTable(oldName, newTable) } - private def getLocationFromStorageProps(table: CatalogTable): Option[String] = { - new CaseInsensitiveMap(table.storage.properties).get("path") + private def getLocationFromStorageProps(table: CatalogTable): Option[URI] = { + new CaseInsensitiveMap(table.storage.properties).get("path").map(new URI(_)) } private def updateLocationInStorageProps( @@ -565,7 +565,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val newLocation = tableDefinition.storage.locationUri val storageWithPathOption = tableDefinition.storage.copy( - properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) + properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _.toString)) val oldLocation = getLocationFromStorageProps(oldTableDef) if (oldLocation == newLocation) { @@ -854,10 +854,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, Hive metastore is not case preserving and will generate wrong partition location // with lower cased partition column names. Here we set the default partition location // manually to avoid this problem. 
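A side note on the hunk below, which drops the old new Path(new URI(uri)) round-trip in favour of a direct new Path(uri): this is the payoff of typing locationUri as java.net.URI. A minimal sketch of the conversion pattern, using only Hadoop's Path and the JDK URI (illustrative only, not part of the patch):

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Storing: a filesystem path becomes a URI via Path.toUri. The URI's
    // string form percent-escapes characters that are illegal in URIs
    // (a space becomes %20), while uri.getPath stays decoded.
    val dir: Path = new Path("/warehouse/my db/tbl")
    val stored: URI = dir.toUri

    // Using: a stored URI goes straight back to a Path, with no string
    // parsing in between, so nothing gets escaped twice.
    val onDisk: Path = new Path(stored)

    // The old String-typed field forced the lossy detour that the removed
    // TODO comment warned about: new Path(new URI(locationUriString)).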
- val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse { + val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse { ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath) } - p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString))) + p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))) } val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec))) client.createPartitions(db, table, lowerCasedParts, ignoreIfExists) @@ -903,7 +903,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case e: IOException => throw new SparkException( s"Unable to rename partition path from $wrongPath to $rightPath", e) } - partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString))) + partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri))) } alterPartitions(db, table, newParts) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index e4b1f6ae3e49e..4f59580eda65a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -72,7 +72,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log logDebug(s"Creating new cached data source for $in") val table = sparkSession.sharedState.externalCatalog.getTable(in.database, in.name) - val pathOption = table.storage.locationUri.map("path" -> _) + val pathOption = table.storage.locationUri.map("path" -> _.toString) val dataSource = DataSource( sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 7254f73f41f37..f316ce421f782 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -98,7 +98,7 @@ private[hive] case class MetastoreRelation( sd.setCols(schema.asJava) tTable.setPartitionKeys(partCols.asJava) - catalogTable.storage.locationUri.foreach(sd.setLocation) + catalogTable.storage.locationUri.foreach(u => sd.setLocation(u.toString)) catalogTable.storage.inputFormat.foreach(sd.setInputFormat) catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) @@ -182,7 +182,7 @@ private[hive] case class MetastoreRelation( } sd.setCols(schema.asJava) - p.storage.locationUri.foreach(sd.setLocation) + p.storage.locationUri.foreach(u => sd.setLocation(u.toString)) p.storage.inputFormat.foreach(sd.setInputFormat) p.storage.outputFormat.foreach(sd.setOutputFormat) @@ -250,10 +250,10 @@ private[hive] case class MetastoreRelation( .flatMap(_.storage.locationUri) .toArray if (partLocations.nonEmpty) { - partLocations + partLocations.map(_.toString) } else { Array( - catalogTable.storage.locationUri.getOrElse( + catalogTable.storage.locationUri.map(_.toString).getOrElse( sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 5c0e2f6ec4941..2a97069ab484b 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} +import java.net.URI import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -27,7 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} +import org.apache.hadoop.hive.metastore.api.{FieldSchema, Database => HiveDatabase} import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} @@ -407,7 +408,7 @@ private[hive] class HiveClientImpl( createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h), + locationUri = shim.getDataLocation(h).map(new URI(_)), // To avoid ClassNotFound exception, we try our best to not get the format class, but get // the class name directly. However, for non-native tables, there is no interface to get // the format class name, so we may still throw ClassNotFound in this case. @@ -842,7 +843,7 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(conf.getUser) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc.toString) } table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( @@ -867,7 +868,7 @@ private[hive] class HiveClientImpl( } val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo - p.storage.locationUri.foreach(storageDesc.setLocation) + p.storage.locationUri.foreach(u => storageDesc.setLocation(u.toString)) p.storage.inputFormat.foreach(storageDesc.setInputFormat) p.storage.outputFormat.foreach(storageDesc.setOutputFormat) p.storage.serde.foreach(serdeInfo.setSerializationLib) @@ -885,7 +886,7 @@ private[hive] class HiveClientImpl( CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation), + locationUri = Option(apiPartition.getSd.getLocation).map(new URI(_)), inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index b052f1e7e43f5..9174bc40153ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -271,7 +271,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { val table = hive.getTable(database, tableName) parts.foreach { s => val location = 
s.storage.locationUri.map( - uri => new Path(table.getPath, new Path(new URI(uri)))).orNull + uri => new Path(table.getPath, new Path(uri))).orNull val params = if (s.parameters.nonEmpty) s.parameters.asJava else null val spec = s.spec.asJava if (hive.getPartition(table, spec, false) != null && ignoreIfExists) { @@ -469,7 +469,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) parts.zipWithIndex.foreach { case (s, i) => addPartitionDesc.addPartition( - s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull) + s.spec.asJava, s.storage.locationUri.map(u => new Path(u).toString).orNull) if (s.parameters.nonEmpty) { addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index 00fdfbcebbe85..c7679f512dd52 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -81,7 +81,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl2", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), + locationUri = Some(new URI(tempDirUri)), inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), schema = simpleSchema) @@ -167,7 +167,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl7", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"), + locationUri = Some(new URI(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map( @@ -179,7 +179,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl8", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), + locationUri = Some(new URI(tempDirUri)), properties = Map("path" -> tempDirUri)), schema = simpleSchema, properties = Map( @@ -191,7 +191,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl9", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"), + locationUri = Some(new URI(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map("spark.sql.sources.provider" -> "json")) @@ -215,7 +215,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest if (tbl.tableType == CatalogTableType.EXTERNAL) { // trim the URI prefix - val tableLocation = new URI(readBack.storage.locationUri.get).getPath + val tableLocation = readBack.storage.locationUri.get.getPath val expectedLocation = tempDir.toURI.getPath.stripSuffix("/") assert(tableLocation == expectedLocation) } @@ -231,7 +231,7 @@ 
class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest val readBack = getTableMetadata(tbl.identifier.table) // trim the URI prefix - val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath + val actualTableLocation = readBack.storage.locationUri.get.getPath val expected = dir.toURI.getPath.stripSuffix("/") assert(actualTableLocation == expected) } @@ -247,7 +247,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest assert(readBack.schema.sameType(expectedSchema)) // trim the URI prefix - val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath + val actualTableLocation = readBack.storage.locationUri.get.getPath val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) { tempDir.toURI.getPath.stripSuffix("/") } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 8f0d5d886c9d5..6bb5c19466da4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -868,14 +868,14 @@ object SPARK_18360 { val rawTable = hiveClient.getTable("default", "test_tbl") // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table // location for tables in default database. - assert(rawTable.storage.locationUri.get.contains(newWarehousePath)) + assert(rawTable.storage.locationUri.map(_.toString).get.contains(newWarehousePath)) hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false) spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false) val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl") // Spark SQL will use the location of default database to generate default table // location for tables in default database. 
- assert(readBack.storage.locationUri.get.contains(defaultDbLocation)) + assert(readBack.storage.locationUri.map(_.toString).get.contains(defaultDbLocation)) } finally { hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false) hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 081f6f6d8263f..765220380de34 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.File +import java.net.URI import scala.collection.mutable.ArrayBuffer @@ -1015,7 +1016,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier("not_skip_hive_metadata"), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempPath.getCanonicalPath), + locationUri = Some(new URI(tempPath.getCanonicalPath)), properties = Map("skipHiveMetadata" -> "false") ), schema = schema, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 5cb8519d2a9af..04c548c5215bc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.{ByteArrayOutputStream, File, PrintStream} +import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -346,7 +347,7 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet val spec = Map("key1" -> "1", "key2" -> "2") val newLocation = Utils.createTempDir().getPath() val storage = storageFormat.copy( - locationUri = Some(newLocation), + locationUri = Some(new URI(newLocation)), // needed for 0.12 alter partitions serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) val partition = CatalogTablePartition(spec, storage) From 758c8b558c3758a59ee2893c4460a39e59037205 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 18 Jan 2017 00:25:00 +0800 Subject: [PATCH 2/8] remove a comment and fix code style --- .../org/apache/spark/sql/catalyst/catalog/interface.scala | 3 --- .../scala/org/apache/spark/sql/execution/SparkSqlParser.scala | 3 ++- .../apache/spark/sql/execution/datasources/DataSource.scala | 2 +- .../org/apache/spark/sql/hive/client/HiveClientImpl.scala | 2 +- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index e22bf88890a4a..8949a1e8b0ab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -48,9 +48,6 @@ case class CatalogFunction( * Storage format, used to describe how a partition or a table is stored. */ case class CatalogStorageFormat( - // TODO(ekl) consider storing this field as java.net.URI for type safety. 
Note that this must - // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and - // path.toUri respectively before use as a filesystem path due to URI char escaping. locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 01e80a0b960ec..ed32c1aa8f94f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -17,9 +17,10 @@ package org.apache.spark.sql.execution -import scala.collection.JavaConverters._ import java.net.URI +import scala.collection.JavaConverters._ + import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index a1874fb8a00d0..ca34f79383842 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -26,8 +26,8 @@ import scala.util.{Failure, Success, Try} import scala.util.control.NonFatal import org.apache.hadoop.fs.Path -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 2a97069ab484b..a247593f465dd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.{TableType => HiveTableType} -import org.apache.hadoop.hive.metastore.api.{FieldSchema, Database => HiveDatabase} +import org.apache.hadoop.hive.metastore.api.{Database => HiveDatabase, FieldSchema} import org.apache.hadoop.hive.metastore.api.{SerDeInfo, StorageDescriptor} import org.apache.hadoop.hive.ql.Driver import org.apache.hadoop.hive.ql.metadata.{Hive, Partition => HivePartition, Table => HiveTable} From 5b5debff77ea4c1c34075df53fafbc38c82b5d26 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 18 Jan 2017 00:32:53 +0800 Subject: [PATCH 3/8] fix code style --- .../spark/sql/catalyst/catalog/ExternalCatalogSuite.scala | 3 ++- .../apache/spark/sql/execution/command/DDLCommandSuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index a7162ae091720..778fe4815cc76 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -895,7 +895,8 @@ abstract class CatalogTestUtils { CatalogTable( identifier = 
TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL, - storage = storageFormat.copy(locationUri = Some(new URI(Utils.createTempDir().getAbsolutePath))), + storage = storageFormat.copy(locationUri = Some(new URI(Utils.createTempDir() + .getAbsolutePath))), schema = new StructType() .add("col1", "int") .add("col2", "string") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 3860bae6a0237..5f780cf48c311 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command import java.net.URI -import scala.reflect.{ClassTag, classTag} +import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ From 635a3985ce99f8e1d47efb18415116e725e16a88 Mon Sep 17 00:00:00 2001 From: windpiger Date: Wed, 18 Jan 2017 01:05:03 +0800 Subject: [PATCH 4/8] merge with master --- .../scala/org/apache/spark/sql/execution/command/DDLSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 3f01b0dd251ee..2efee172d7aff 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1802,7 +1802,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // SET LOCATION won't move data from previous table path to new table path. assert(spark.table("tbl").count() == 0) // the previous table path should be still there. 
- assert(new File(new URI(defaultTablePath)).exists()) + assert(new File(defaultTablePath).exists()) sql("INSERT INTO tbl SELECT 2") checkAnswer(spark.table("tbl"), Row(2)) From 256f65fdd5f8ba304729fa88190209f007446302 Mon Sep 17 00:00:00 2001 From: windpiger Date: Fri, 20 Jan 2017 21:36:18 +0800 Subject: [PATCH 5/8] path.toUri and uri.getPath --- .../spark/sql/catalyst/catalog/interface.scala | 4 ++-- .../sql/catalyst/catalog/ExternalCatalogSuite.scala | 5 ++--- .../apache/spark/sql/execution/SparkSqlParser.scala | 7 +++---- .../execution/command/createDataSourceTables.scala | 6 +++--- .../org/apache/spark/sql/execution/command/ddl.scala | 6 ++++-- .../apache/spark/sql/execution/command/tables.scala | 6 +++--- .../sql/execution/datasources/CatalogFileIndex.scala | 2 +- .../spark/sql/execution/datasources/DataSource.scala | 2 +- .../execution/datasources/DataSourceStrategy.scala | 2 +- .../spark/sql/execution/command/DDLSuite.scala | 6 +++--- .../apache/spark/sql/hive/HiveExternalCatalog.scala | 12 +++++++----- .../apache/spark/sql/hive/MetastoreRelation.scala | 8 ++++---- .../spark/sql/hive/client/HiveClientImpl.scala | 8 ++++---- ...veExternalCatalogBackwardCompatibilitySuite.scala | 4 ++-- .../apache/spark/sql/hive/HiveSparkSubmitSuite.scala | 4 ++-- .../hive/PartitionProviderCompatibilitySuite.scala | 1 + .../apache/spark/sql/hive/client/VersionsSuite.scala | 2 +- 17 files changed, 44 insertions(+), 41 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 25d0a3a9f3264..c45c4188675cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -102,7 +102,7 @@ case class CatalogTablePartition( } /** Return the partition location, assuming it is specified. */ - def location: String = storage.locationUri.map(_.toString).getOrElse { + def location: String = storage.locationUri.map(_.getPath).getOrElse { val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ") throw new AnalysisException(s"Partition [$specString] did not specify locationUri") } @@ -192,7 +192,7 @@ case class CatalogTable( } /** Return the table location, assuming it is specified. 
*/ - def location: String = storage.locationUri.map(_.toString).getOrElse { + def location: String = storage.locationUri.map(_.getPath).getOrElse { throw new AnalysisException(s"table $identifier did not specify locationUri") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 36dfd759143be..43254ab570d59 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -745,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac identifier = TableIdentifier("external_table", Some("db1")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( - Some(new URI(Utils.createTempDir().getAbsolutePath)), + Some(Utils.createTempDir().toURI), None, None, None, false, Map.empty), schema = new StructType().add("a", "int").add("b", "string"), provider = Some("hive") @@ -897,8 +897,7 @@ abstract class CatalogTestUtils { CatalogTable( identifier = TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL, - storage = storageFormat.copy(locationUri = Some(new URI(Utils.createTempDir() - .getAbsolutePath))), + storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)), schema = new StructType() .add("col1", "int") .add("col2", "string") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index ed32c1aa8f94f..c2c0f5910cc5b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -20,10 +20,9 @@ package org.apache.spark.sql.execution import java.net.URI import scala.collection.JavaConverters._ - import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode - +import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ @@ -375,7 +374,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) - val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new URI(_)) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri) val storage = DataSource.buildStorageFormatFromOptions(options) if (location.isDefined && storage.locationUri.isDefined) { @@ -1070,7 +1069,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(CatalogStorageFormat.empty) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) .getOrElse(CatalogStorageFormat.empty) - val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new URI(_)) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri) // If we are creating an EXTERNAL table, then the LOCATION field is required if (external && location.isEmpty) { operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 5f337394a0c63..bcb0aa9c4d99c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -56,7 +56,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo // Create the relation to validate the arguments before writing the metadata to the metastore, // and infer the table schema and partition if users didn't specify schema in CREATE TABLE. - val pathOption = table.storage.locationUri.map("path" -> _.toString) + val pathOption = table.storage.locationUri.map("path" -> _.getPath) // Fill in some default table options from the session conf val tableWithDefaultOptions = table.copy( identifier = table.identifier.copy( @@ -144,7 +144,7 @@ case class CreateDataSourceTableAsSelectCommand( } saveDataIntoTable( - sparkSession, table, table.storage.locationUri.map(_.toString), + sparkSession, table, table.storage.locationUri.map(_.getPath), query, mode, tableExists = true) } else { val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { @@ -153,7 +153,7 @@ case class CreateDataSourceTableAsSelectCommand( table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation.map(_.toString), query, mode, tableExists = false) + sparkSession, table, tableLocation.map(_.getPath), query, mode, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index fecec82d0d4f3..89cb12c3985b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -751,11 +751,13 @@ case class AlterTableSetLocationCommand( sparkSession, table, "ALTER TABLE ... 
SET LOCATION") // Partition spec is specified, so we set the location only for this partition val part = catalog.getPartition(table.identifier, spec) - val newPart = part.copy(storage = part.storage.copy(locationUri = Some(new URI(location)))) + val newPart = part.copy(storage = part.storage.copy( + locationUri = Some(new Path(location).toUri)) + ) catalog.alterPartitions(table.identifier, Seq(newPart)) case None => // No partition spec is specified, so we set the location for the table itself - catalog.alterTable(table.withNewStorage(locationUri = Some(new URI(location)))) + catalog.alterTable(table.withNewStorage(locationUri = Some(new Path(location).toUri))) } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 70a28ff0ff6fa..7db6180d2cbe9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -491,7 +491,7 @@ case class DescribeTableCommand( append(buffer, "Owner:", table.owner, "") append(buffer, "Create Time:", new Date(table.createTime).toString, "") append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "") - append(buffer, "Location:", table.storage.locationUri.getOrElse("").toString, "") + append(buffer, "Location:", table.storage.locationUri.map(_.getPath).getOrElse(""), "") append(buffer, "Table Type:", table.tableType.name, "") table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, "")) @@ -581,7 +581,7 @@ case class DescribeTableCommand( append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "") append(buffer, "Database:", table.database, "") append(buffer, "Table:", tableIdentifier.table, "") - append(buffer, "Location:", partition.storage.locationUri.getOrElse("").toString, "") + append(buffer, "Location:", partition.storage.locationUri.map(_.getPath).getOrElse(""), "") append(buffer, "Partition Parameters:", "", "") partition.parameters.foreach { case (key, value) => append(buffer, s" $key", value, "") @@ -947,7 +947,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman // when the table creation DDL contains the PATH option. 
None } else { - Some(s"path '${escapeSingleQuotedString(location.toString)}'") + Some(s"path '${escapeSingleQuotedString(location.getPath)}'") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index a8825fb2effc9..fae8f4e54cdcb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -46,7 +46,7 @@ class CatalogFileIndex( assert(table.identifier.database.isDefined, "The table identifier must be qualified in CatalogFileIndex") - private val baseLocation: Option[String] = table.storage.locationUri.map(_.toString) + private val baseLocation: Option[String] = table.storage.locationUri.map(_.getPath) override def partitionSchema: StructType = table.partitionSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 5efe6dc930af3..86139472158cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -613,7 +613,7 @@ object DataSource { def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = { val path = new CaseInsensitiveMap(options).get("path") val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path") - CatalogStorageFormat.empty.copy(locationUri = path.map(new URI(_)), + CatalogStorageFormat.empty.copy(locationUri = path.map(new Path(_).toUri), properties = optionsWithoutPath) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 478b190d579ea..57ab0f038284d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -226,7 +226,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] cache.get(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { - val pathOption = table.storage.locationUri.map("path" -> _.toString) + val pathOption = table.storage.locationUri.map("path" -> _.getPath) val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index d42849c9ce552..b6980d49ca6f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -79,7 +79,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = { val storage = CatalogStorageFormat( - locationUri = Some(new URI(catalog.defaultTablePath(name))), + locationUri = Some(new Path(catalog.defaultTablePath(name)).toUri), inputFormat = None, outputFormat = None, serde = None, @@ -1795,7 +1795,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("CREATE TABLE tbl(i INT) 
USING parquet") sql("INSERT INTO tbl SELECT 1") checkAnswer(spark.table("tbl"), Row(1)) - val defaultTablePath = spark.sessionState.catalog + val defaultTableUri = spark.sessionState.catalog .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'") @@ -1803,7 +1803,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // SET LOCATION won't move data from previous table path to new table path. assert(spark.table("tbl").count() == 0) // the previous table path should be still there. - assert(new File(defaultTablePath).exists()) + assert(new File(defaultTableUri).exists()) sql("INSERT INTO tbl SELECT 2") checkAnswer(spark.table("tbl"), Row(2)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 802bdaf372cb9..02f45c3062b8f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition.storage.locationUri.isEmpty val tableLocation = if (needDefaultTableLocation) { - Some(new URI(defaultTablePath(tableDefinition.identifier))) + Some(new Path(defaultTablePath(tableDefinition.identifier)).toUri) } else { tableDefinition.storage.locationUri } @@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, in older version of Spark we already store table location in storage properties // with key "path". Here we keep this behaviour for backward compatibility. val storagePropsWithLocation = table.storage.properties ++ - table.storage.locationUri.map("path" -> _.toString) + table.storage.locationUri.map("path" -> _.getPath) // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and // bucket specification to empty. 
Note that partition columns are retained, so that we can @@ -438,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat try { client.createTable( - tableDefinition.withNewStorage(locationUri = Some(tempPath.toUri)), + tableDefinition.withNewStorage(locationUri = Some(new URI(tempPath.toString))), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) @@ -565,7 +565,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val newLocation = tableDefinition.storage.locationUri val storageWithPathOption = tableDefinition.storage.copy( - properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _.toString)) + properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _.getPath)) val oldLocation = getLocationFromStorageProps(oldTableDef) if (oldLocation == newLocation) { @@ -903,7 +903,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case e: IOException => throw new SparkException( s"Unable to rename partition path from $wrongPath to $rightPath", e) } - partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toUri))) + partition.copy(storage = partition.storage.copy( + locationUri = Some(new URI(rightPath.toString))) + ) } alterPartitions(db, table, newParts) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index d45807c69c39f..afc889504168e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -97,7 +97,7 @@ private[hive] case class MetastoreRelation( sd.setCols(schema.asJava) tTable.setPartitionKeys(partCols.asJava) - catalogTable.storage.locationUri.foreach(u => sd.setLocation(u.toString)) + catalogTable.storage.locationUri.foreach(u => sd.setLocation(u.getPath)) catalogTable.storage.inputFormat.foreach(sd.setInputFormat) catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) @@ -181,7 +181,7 @@ private[hive] case class MetastoreRelation( } sd.setCols(schema.asJava) - p.storage.locationUri.foreach(u => sd.setLocation(u.toString)) + p.storage.locationUri.foreach(u => sd.setLocation(u.getPath)) p.storage.inputFormat.foreach(sd.setInputFormat) p.storage.outputFormat.foreach(sd.setOutputFormat) @@ -249,10 +249,10 @@ private[hive] case class MetastoreRelation( .flatMap(_.storage.locationUri) .toArray if (partLocations.nonEmpty) { - partLocations.map(_.toString) + partLocations.map(_.getPath) } else { Array( - catalogTable.storage.locationUri.map(_.toString).getOrElse( + catalogTable.storage.locationUri.map(_.getPath).getOrElse( sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 7b1790097356f..a4195260f66ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -408,7 +408,7 @@ private[hive] class HiveClientImpl( createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h).map(new URI(_)), + locationUri = 
shim.getDataLocation(h).map(new Path(_).toUri), // To avoid ClassNotFound exception, we try our best to not get the format class, but get // the class name directly. However, for non-native tables, there is no interface to get // the format class name, so we may still throw ClassNotFound in this case. @@ -847,7 +847,7 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(conf.getUser) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc.toString) } + table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc.getPath) } table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( @@ -872,7 +872,7 @@ private[hive] class HiveClientImpl( } val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo - p.storage.locationUri.foreach(u => storageDesc.setLocation(u.toString)) + p.storage.locationUri.foreach(u => storageDesc.setLocation(u.getPath)) p.storage.inputFormat.foreach(storageDesc.setInputFormat) p.storage.outputFormat.foreach(storageDesc.setOutputFormat) p.storage.serde.foreach(serdeInfo.setSerializationLib) @@ -890,7 +890,7 @@ private[hive] class HiveClientImpl( CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation).map(new URI(_)), + locationUri = Option(apiPartition.getSd.getLocation).map(new Path(_).toUri), inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index c7679f512dd52..21cec4dd29dab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -167,7 +167,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl7", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(new URI(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__")), + locationUri = Some(new URI(defaultTableURI("tbl7").getPath + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map( @@ -191,7 +191,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl9", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(new URI(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__")), + locationUri = Some(new URI(defaultTableURI("tbl9").getPath + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map("spark.sql.sources.provider" -> "json")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 6bb5c19466da4..88dbb1ca0e9e7 
100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -868,14 +868,14 @@ object SPARK_18360 { val rawTable = hiveClient.getTable("default", "test_tbl") // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table // location for tables in default database. - assert(rawTable.storage.locationUri.map(_.toString).get.contains(newWarehousePath)) + assert(rawTable.storage.locationUri.map(_.getPath).get.contains(newWarehousePath)) hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false) spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false) val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl") // Spark SQL will use the location of default database to generate default table // location for tables in default database. - assert(readBack.storage.locationUri.map(_.toString).get.contains(defaultDbLocation)) + assert(readBack.storage.locationUri.map(_.getPath).get.contains(defaultDbLocation)) } finally { hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false) hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index dca207a72d889..79ed511ffd0c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -265,6 +265,7 @@ class PartitionProviderCompatibilitySuite spark.sql(s""" |alter table test partition (A=5, B='%') |rename to partition (A=100, B='%')""".stripMargin) + val x = spark.sql("show partitions test").collect() assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0) assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 295c49c02ff4b..b5966cf13a587 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -353,7 +353,7 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet val partition = CatalogTablePartition(spec, storage) client.alterPartitions("default", "src_part", Seq(partition)) assert(client.getPartition("default", "src_part", spec) - .storage.locationUri == Some(newLocation)) + .storage.locationUri.get.getPath == newLocation) } test(s"$version: dropPartitions") { From b27596f70e2de13014e7c80f7e96a477c0aedd83 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 21 Jan 2017 16:14:37 +0800 Subject: [PATCH 6/8] fix some tc failed --- .../execution/command/createDataSourceTables.scala | 11 +++++------ .../apache/spark/sql/execution/command/ddl.scala | 4 ++-- .../spark/sql/execution/command/tables.scala | 8 ++++---- .../execution/datasources/CatalogFileIndex.scala | 2 +- .../sql/execution/datasources/DataSource.scala | 2 +- .../execution/datasources/DataSourceStrategy.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 14 ++++++++------ .../apache/spark/sql/hive/MetastoreRelation.scala | 10 +++++----- 
.../spark/sql/hive/client/HiveClientImpl.scala | 10 ++++++---- .../spark/sql/hive/HiveMetastoreCatalogSuite.scala | 2 +- .../spark/sql/hive/HiveSparkSubmitSuite.scala | 4 ++-- .../spark/sql/hive/client/VersionsSuite.scala | 6 +++--- 12 files changed, 39 insertions(+), 36 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index bcb0aa9c4d99c..687ff7d1fd747 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.command -import java.net.URI +import org.apache.hadoop.fs.Path import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ @@ -56,7 +56,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo // Create the relation to validate the arguments before writing the metadata to the metastore, // and infer the table schema and partition if users didn't specify schema in CREATE TABLE. - val pathOption = table.storage.locationUri.map("path" -> _.getPath) + val pathOption = table.storage.locationUriString.map("path" -> _) // Fill in some default table options from the session conf val tableWithDefaultOptions = table.copy( identifier = table.identifier.copy( @@ -144,16 +144,15 @@ case class CreateDataSourceTableAsSelectCommand( } saveDataIntoTable( - sparkSession, table, table.storage.locationUri.map(_.getPath), - query, mode, tableExists = true) + sparkSession, table, table.storage.locationUriString, query, mode, tableExists = true) } else { val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { - Some(new URI(sessionState.catalog.defaultTablePath(table.identifier))) + Some(new Path(sessionState.catalog.defaultTablePath(table.identifier)).toUri) } else { table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation.map(_.getPath), query, mode, tableExists = false) + sparkSession, table, tableLocation.map(_.toString), query, mode, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 89cb12c3985b2..9723075128096 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -752,12 +752,12 @@ case class AlterTableSetLocationCommand( // Partition spec is specified, so we set the location only for this partition val part = catalog.getPartition(table.identifier, spec) val newPart = part.copy(storage = part.storage.copy( - locationUri = Some(new Path(location).toUri)) + locationUri = Some(new URI(location))) ) catalog.alterPartitions(table.identifier, Seq(newPart)) case None => // No partition spec is specified, so we set the location for the table itself - catalog.alterTable(table.withNewStorage(locationUri = Some(new Path(location).toUri))) + catalog.alterTable(table.withNewStorage(locationUri = Some(new URI(location)))) } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 7db6180d2cbe9..722026cf4aa6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -491,7 +491,7 @@ case class DescribeTableCommand( append(buffer, "Owner:", table.owner, "") append(buffer, "Create Time:", new Date(table.createTime).toString, "") append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "") - append(buffer, "Location:", table.storage.locationUri.map(_.getPath).getOrElse(""), "") + append(buffer, "Location:", table.storage.locationUriString.getOrElse(""), "") append(buffer, "Table Type:", table.tableType.name, "") table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, "")) @@ -581,7 +581,7 @@ case class DescribeTableCommand( append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "") append(buffer, "Database:", table.database, "") append(buffer, "Table:", tableIdentifier.table, "") - append(buffer, "Location:", partition.storage.locationUri.map(_.getPath).getOrElse(""), "") + append(buffer, "Location:", partition.storage.locationUriString.getOrElse(""), "") append(buffer, "Partition Parameters:", "", "") partition.parameters.foreach { case (key, value) => append(buffer, s" $key", value, "") @@ -941,13 +941,13 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val dataSourceOptions = metadata.storage.properties.map { case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'" - } ++ metadata.storage.locationUri.flatMap { location => + } ++ metadata.storage.locationUriString.flatMap { location => if (metadata.tableType == MANAGED) { // If it's a managed table, omit PATH option. Spark SQL always creates external table // when the table creation DDL contains the PATH option. 
None } else { - Some(s"path '${escapeSingleQuotedString(location.getPath)}'") + Some(s"path '${escapeSingleQuotedString(location)}'") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index fae8f4e54cdcb..4e0381df3964e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -46,7 +46,7 @@ class CatalogFileIndex( assert(table.identifier.database.isDefined, "The table identifier must be qualified in CatalogFileIndex") - private val baseLocation: Option[String] = table.storage.locationUri.map(_.getPath) + private val baseLocation: Option[String] = table.storage.locationUriString override def partitionSchema: StructType = table.partitionSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index 86139472158cf..5efe6dc930af3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -613,7 +613,7 @@ object DataSource { def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = { val path = new CaseInsensitiveMap(options).get("path") val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path") - CatalogStorageFormat.empty.copy(locationUri = path.map(new Path(_).toUri), + CatalogStorageFormat.empty.copy(locationUri = path.map(new URI(_)), properties = optionsWithoutPath) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 57ab0f038284d..c3a49ededf6d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -226,7 +226,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] cache.get(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { - val pathOption = table.storage.locationUri.map("path" -> _.getPath) + val pathOption = table.storage.locationUriString.map("path" -> _) val dataSource = DataSource( sparkSession, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 02f45c3062b8f..3071ebfd7b052 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, in older version of Spark we already store table location in storage properties // with key "path". Here we keep this behaviour for backward compatibility. val storagePropsWithLocation = table.storage.properties ++ - table.storage.locationUri.map("path" -> _.getPath) + table.storage.locationUriString.map("path" -> _) // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and // bucket specification to empty. 
Note that partition columns are retained, so that we can @@ -482,8 +482,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat client.alterTable(oldName, newTable) } - private def getLocationFromStorageProps(table: CatalogTable): Option[URI] = { - new CaseInsensitiveMap(table.storage.properties).get("path").map(new URI(_)) + private def getLocationFromStorageProps(table: CatalogTable): Option[String] = { + new CaseInsensitiveMap(table.storage.properties).get("path") } private def updateLocationInStorageProps( @@ -563,9 +563,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // want to alter the table location to a file path, we will fail. This should be fixed // in the future. - val newLocation = tableDefinition.storage.locationUri + val newLocation = tableDefinition.storage.locationUriString val storageWithPathOption = tableDefinition.storage.copy( - properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _.getPath)) + properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) val oldLocation = getLocationFromStorageProps(oldTableDef) if (oldLocation == newLocation) { @@ -704,7 +704,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val storageWithLocation = { val tableLocation = getLocationFromStorageProps(table) // We pass None as `newPath` here, to remove the path option in storage properties. - updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation) + updateLocationInStorageProps(table, newPath = None).copy( + locationUri = tableLocation.map(new Path(_).toUri) + ) } val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index afc889504168e..eb10c331bed2f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -97,7 +97,7 @@ private[hive] case class MetastoreRelation( sd.setCols(schema.asJava) tTable.setPartitionKeys(partCols.asJava) - catalogTable.storage.locationUri.foreach(u => sd.setLocation(u.getPath)) + catalogTable.storage.locationUriString.foreach(sd.setLocation) catalogTable.storage.inputFormat.foreach(sd.setInputFormat) catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) @@ -181,7 +181,7 @@ private[hive] case class MetastoreRelation( } sd.setCols(schema.asJava) - p.storage.locationUri.foreach(u => sd.setLocation(u.getPath)) + p.storage.locationUriString.foreach(sd.setLocation) p.storage.inputFormat.foreach(sd.setInputFormat) p.storage.outputFormat.foreach(sd.setOutputFormat) @@ -246,13 +246,13 @@ private[hive] case class MetastoreRelation( override def inputFiles: Array[String] = { val partLocations = allPartitions - .flatMap(_.storage.locationUri) + .flatMap(_.storage.locationUriString) .toArray if (partLocations.nonEmpty) { - partLocations.map(_.getPath) + partLocations } else { Array( - catalogTable.storage.locationUri.map(_.getPath).getOrElse( + catalogTable.storage.locationUriString.getOrElse( sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index a4195260f66ed..37ec8bd96e3b9 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -408,7 +408,7 @@ private[hive] class HiveClientImpl( createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h).map(new Path(_).toUri), + locationUri = shim.getDataLocation(h).map(new URI(_)), // To avoid ClassNotFound exception, we try our best to not get the format class, but get // the class name directly. However, for non-native tables, there is no interface to get // the format class name, so we may still throw ClassNotFound in this case. @@ -847,7 +847,9 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(conf.getUser) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc.getPath) } + table.storage.locationUriString.foreach { loc => + shim.setDataLocation(hiveTable, loc) + } table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( @@ -872,7 +874,7 @@ private[hive] class HiveClientImpl( } val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo - p.storage.locationUri.foreach(u => storageDesc.setLocation(u.getPath)) + p.storage.locationUriString.foreach(storageDesc.setLocation) p.storage.inputFormat.foreach(storageDesc.setInputFormat) p.storage.outputFormat.foreach(storageDesc.setOutputFormat) p.storage.serde.foreach(serdeInfo.setSerializationLib) @@ -890,7 +892,7 @@ private[hive] class HiveClientImpl( CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation).map(new Path(_).toUri), + locationUri = Option(apiPartition.getSd.getLocation).map(new URI(_)), inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 16cf4d7ec67f6..dfca63d3f9bba 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -140,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(hiveTable.storage.serde === Some(serde)) assert(hiveTable.tableType === CatalogTableType.EXTERNAL) - assert(hiveTable.storage.locationUri === Some(path.toString)) + assert(hiveTable.storage.locationUriString === Some(path.toString)) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 88dbb1ca0e9e7..9d173c6f5364f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -868,14 +868,14 @@ object SPARK_18360 { val rawTable = hiveClient.getTable("default", "test_tbl") // Hive 
will use the value of `hive.metastore.warehouse.dir` to generate default table // location for tables in default database. - assert(rawTable.storage.locationUri.map(_.getPath).get.contains(newWarehousePath)) + assert(rawTable.storage.locationUriString.get.contains(newWarehousePath)) hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false) spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false) val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl") // Spark SQL will use the location of default database to generate default table // location for tables in default database. - assert(readBack.storage.locationUri.map(_.getPath).get.contains(defaultDbLocation)) + assert(readBack.storage.locationUriString.get.contains(defaultDbLocation)) } finally { hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false) hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index b5966cf13a587..394758a1c26d0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -345,15 +345,15 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet test(s"$version: alterPartitions") { val spec = Map("key1" -> "1", "key2" -> "2") - val newLocation = Utils.createTempDir().getPath() + val newLocation = Some(Utils.createTempDir().toURI) val storage = storageFormat.copy( - locationUri = Some(new URI(newLocation)), + locationUri = newLocation, // needed for 0.12 alter partitions serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) val partition = CatalogTablePartition(spec, storage) client.alterPartitions("default", "src_part", Seq(partition)) assert(client.getPartition("default", "src_part", spec) - .storage.locationUri.get.getPath == newLocation) + .storage.locationUri === newLocation) } test(s"$version: dropPartitions") { From e0a5fe060dc3a6a8dd7997ac48dfd368c909d033 Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 21 Jan 2017 16:15:09 +0800 Subject: [PATCH 7/8] fix some tc failed --- .../apache/spark/sql/catalyst/catalog/interface.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index c45c4188675cc..5542dc60815cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -20,8 +20,9 @@ package org.apache.spark.sql.catalyst.catalog import java.net.URI import java.util.Date -import scala.collection.mutable +import org.apache.hadoop.fs.Path +import scala.collection.mutable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} @@ -54,6 +55,9 @@ case class CatalogStorageFormat( serde: Option[String], compressed: Boolean, properties: Map[String, String]) { + private[this] lazy val uriString = locationUri.map(_.toString) + + def locationUriString: Option[String] = uriString override def 
toString: String = { val serdePropsToString = CatalogUtils.maskCredentials(properties) match { @@ -102,7 +106,7 @@ case class CatalogTablePartition( } /** Return the partition location, assuming it is specified. */ - def location: String = storage.locationUri.map(_.getPath).getOrElse { + def location: String = storage.locationUriString.getOrElse { val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ") throw new AnalysisException(s"Partition [$specString] did not specify locationUri") } @@ -192,7 +196,7 @@ case class CatalogTable( } /** Return the table location, assuming it is specified. */ - def location: String = storage.locationUri.map(_.getPath).getOrElse { + def location: String = storage.locationUriString.getOrElse { throw new AnalysisException(s"table $identifier did not specify locationUri") } From 59736fa41625ae3fe9341e9289aa1361a46baeda Mon Sep 17 00:00:00 2001 From: windpiger Date: Sat, 21 Jan 2017 19:21:51 +0800 Subject: [PATCH 8/8] fix test failed --- .../apache/spark/sql/hive/HiveDDLCommandSuite.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index b67e5f6fe57a1..48773642bea67 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -70,7 +70,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.identifier.database == Some("mydb")) assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/user/external/page_view")) + assert(desc.storage.locationUriString == Some("/user/external/page_view")) assert(desc.schema.isEmpty) // will be populated later when the table is actually created assert(desc.comment == Some("This is the staging page view table")) // TODO will be SQLText @@ -101,7 +101,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.identifier.database == Some("mydb")) assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/user/external/page_view")) + assert(desc.storage.locationUriString == Some("/user/external/page_view")) assert(desc.schema.isEmpty) // will be populated later when the table is actually created // TODO will be SQLText assert(desc.comment == Some("This is the staging page view table")) @@ -333,7 +333,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'" val (desc, _) = extractTableDesc(query) assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/path/to/nowhere")) + assert(desc.storage.locationUriString == Some("/path/to/nowhere")) } test("create table - if not exists") { @@ -463,7 +463,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.bucketSpec.isEmpty) assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) - assert(desc.storage.locationUri == Some("/path/to/mercury")) + assert(desc.storage.locationUriString == Some("/path/to/mercury")) assert(desc.storage.inputFormat == Some("winput")) assert(desc.storage.outputFormat == Some("wowput")) assert(desc.storage.serde 
== Some("org.apache.poof.serde.Baff")) @@ -614,7 +614,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle .add("id", "int") .add("name", "string", nullable = true, comment = "blabla")) assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) - assert(table.storage.locationUri == Some("/tmp/file")) + assert(table.storage.locationUriString == Some("/tmp/file")) assert(table.storage.properties == Map("my_prop" -> "1")) assert(table.comment == Some("BLABLA"))