diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index 15aed5f9b1bdf..2a3f5907465d4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.catalog import java.io.IOException +import java.net.URI import scala.collection.mutable @@ -211,7 +212,7 @@ class InMemoryCatalog( throw new SparkException(s"Unable to create table $table as failed " + s"to create its directory $defaultTableLocation", e) } - tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString)) + tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri)) } else { tableDefinition } @@ -283,7 +284,7 @@ class InMemoryCatalog( throw new SparkException(s"Unable to rename table $oldName to $newName as failed " + s"to rename its directory $oldDir", e) } - oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString)) + oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri)) } catalog(db).tables.put(newName, oldDesc) @@ -392,7 +393,7 @@ class InMemoryCatalog( existingParts.put( p.spec, - p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString)))) + p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri)))) } } @@ -465,7 +466,7 @@ class InMemoryCatalog( } oldPartition.copy( spec = newSpec, - storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString))) + storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri))) } else { oldPartition.copy(spec = newSpec) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index 80d32822f58ce..5542dc60815cf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -17,10 +17,12 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI import java.util.Date -import scala.collection.mutable +import org.apache.hadoop.fs.Path +import scala.collection.mutable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal} @@ -47,15 +49,15 @@ case class CatalogFunction( * Storage format, used to describe how a partition or a table is stored. */ case class CatalogStorageFormat( - // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must - // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and - // path.toUri respectively before use as a filesystem path due to URI char escaping. 
- locationUri: Option[String], + locationUri: Option[URI], inputFormat: Option[String], outputFormat: Option[String], serde: Option[String], compressed: Boolean, properties: Map[String, String]) { + private[this] lazy val uriString = locationUri.map(_.toString) + + def locationUriString: Option[String] = uriString override def toString: String = { val serdePropsToString = CatalogUtils.maskCredentials(properties) match { @@ -104,7 +106,7 @@ case class CatalogTablePartition( } /** Return the partition location, assuming it is specified. */ - def location: String = storage.locationUri.getOrElse { + def location: String = storage.locationUriString.getOrElse { val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ") throw new AnalysisException(s"Partition [$specString] did not specify locationUri") } @@ -194,7 +196,7 @@ case class CatalogTable( } /** Return the table location, assuming it is specified. */ - def location: String = storage.locationUri.getOrElse { + def location: String = storage.locationUriString.getOrElse { throw new AnalysisException(s"table $identifier did not specify locationUri") } @@ -225,7 +227,7 @@ case class CatalogTable( /** Syntactic sugar to update a field in `storage`. */ def withNewStorage( - locationUri: Option[String] = storage.locationUri, + locationUri: Option[URI] = storage.locationUri, inputFormat: Option[String] = storage.inputFormat, outputFormat: Option[String] = storage.outputFormat, compressed: Boolean = false, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index acf3bcfdaa955..43254ab570d59 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.scalatest.BeforeAndAfterEach @@ -366,10 +368,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partition1 = CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"), - storageFormat.copy(locationUri = Some(newLocationPart1))) + storageFormat.copy(locationUri = Some(new URI(newLocationPart1)))) val partition2 = CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"), - storageFormat.copy(locationUri = Some(newLocationPart2))) + storageFormat.copy(locationUri = Some(new URI(newLocationPart2)))) catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false) catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false) @@ -561,8 +563,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec) val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec) catalog.alterPartitions("db2", "tbl2", Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))))) val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec) val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec) 
assert(newPart1.storage.locationUri == Some(newLocation)) @@ -743,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac identifier = TableIdentifier("external_table", Some("db1")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat( - Some(Utils.createTempDir().getAbsolutePath), + Some(Utils.createTempDir().toURI), None, None, None, false, Map.empty), schema = new StructType().add("a", "int").add("b", "string"), provider = Some("hive") @@ -791,7 +793,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partWithExistingDir = CatalogTablePartition( Map("partCol1" -> "7", "partCol2" -> "8"), CatalogStorageFormat( - Some(tempPath.toURI.toString), + Some(tempPath.toURI), None, None, None, false, Map.empty)) catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false) @@ -800,7 +802,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac val partWithNonExistingDir = CatalogTablePartition( Map("partCol1" -> "9", "partCol2" -> "10"), CatalogStorageFormat( - Some(tempPath.toURI.toString), + Some(tempPath.toURI), None, None, None, false, Map.empty)) catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false) assert(tempPath.exists()) @@ -895,7 +897,7 @@ abstract class CatalogTestUtils { CatalogTable( identifier = TableIdentifier(name, database), tableType = CatalogTableType.EXTERNAL, - storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)), + storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)), schema = new StructType() .add("col1", "int") .add("col2", "string") diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index f935de68af899..a15ecc23a0167 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.catalog +import java.net.URI + import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ @@ -872,8 +874,8 @@ class SessionCatalogSuite extends PlanTest { val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq( - oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), - oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))))) val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) assert(newPart1.storage.locationUri == Some(newLocation)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index 41768d451261a..c2c0f5910cc5b 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -17,11 +17,12 @@ package org.apache.spark.sql.execution -import scala.collection.JavaConverters._ +import java.net.URI +import scala.collection.JavaConverters._ import org.antlr.v4.runtime.{ParserRuleContext, Token} import org.antlr.v4.runtime.tree.TerminalNode - +import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, SaveMode} import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ @@ -373,7 +374,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(Array.empty[String]) val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec) - val location = Option(ctx.locationSpec).map(visitLocationSpec) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri) val storage = DataSource.buildStorageFormatFromOptions(options) if (location.isDefined && storage.locationUri.isDefined) { @@ -1068,7 +1069,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { .getOrElse(CatalogStorageFormat.empty) val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) .getOrElse(CatalogStorageFormat.empty) - val location = Option(ctx.locationSpec).map(visitLocationSpec) + val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri) // If we are creating an EXTERNAL table, then the LOCATION field is required if (external && location.isEmpty) { operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index beeba05554dde..687ff7d1fd747 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import org.apache.hadoop.fs.Path + import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -54,7 +56,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo // Create the relation to validate the arguments before writing the metadata to the metastore, // and infer the table schema and partition if users didn't specify schema in CREATE TABLE. 
- val pathOption = table.storage.locationUri.map("path" -> _) + val pathOption = table.storage.locationUriString.map("path" -> _) // Fill in some default table options from the session conf val tableWithDefaultOptions = table.copy( identifier = table.identifier.copy( @@ -142,15 +144,15 @@ case class CreateDataSourceTableAsSelectCommand( } saveDataIntoTable( - sparkSession, table, table.storage.locationUri, query, mode, tableExists = true) + sparkSession, table, table.storage.locationUriString, query, mode, tableExists = true) } else { val tableLocation = if (table.tableType == CatalogTableType.MANAGED) { - Some(sessionState.catalog.defaultTablePath(table.identifier)) + Some(new Path(sessionState.catalog.defaultTablePath(table.identifier)).toUri) } else { table.storage.locationUri } val result = saveDataIntoTable( - sparkSession, table, tableLocation, query, mode, tableExists = false) + sparkSession, table, tableLocation.map(_.toString), query, mode, tableExists = false) val newTable = table.copy( storage = table.storage.copy(locationUri = tableLocation), // We will use the schema of resolved.relation as the schema of the table (instead of diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index 82cbb4aa47445..9723075128096 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import java.net.URI + import scala.collection.{GenMap, GenSeq} import scala.collection.parallel.ForkJoinTaskSupport import scala.concurrent.forkjoin.ForkJoinPool @@ -426,7 +428,8 @@ case class AlterTableAddPartitionCommand( table.identifier.quotedString, sparkSession.sessionState.conf.resolver) // inherit table storage format (possibly except for location) - CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location)) + CatalogTablePartition(normalizedSpec, table.storage + .copy(locationUri = location.map(new URI(_)))) } catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists) Seq.empty[Row] @@ -710,7 +713,7 @@ case class AlterTableRecoverPartitionsCommand( // inherit table storage format (possibly except for location) CatalogTablePartition( spec, - table.storage.copy(locationUri = Some(location.toUri.toString)), + table.storage.copy(locationUri = Some(location.toUri)), params) } spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true) @@ -748,11 +751,13 @@ case class AlterTableSetLocationCommand( sparkSession, table, "ALTER TABLE ... 
SET LOCATION") // Partition spec is specified, so we set the location only for this partition val part = catalog.getPartition(table.identifier, spec) - val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location))) + val newPart = part.copy(storage = part.storage.copy( + locationUri = Some(new URI(location))) + ) catalog.alterPartitions(table.identifier, Seq(newPart)) case None => // No partition spec is specified, so we set the location for the table itself - catalog.alterTable(table.withNewStorage(locationUri = Some(location))) + catalog.alterTable(table.withNewStorage(locationUri = Some(new URI(location)))) } Seq.empty[Row] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 1b596c97a1c4e..722026cf4aa6b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -491,7 +491,7 @@ case class DescribeTableCommand( append(buffer, "Owner:", table.owner, "") append(buffer, "Create Time:", new Date(table.createTime).toString, "") append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "") - append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "") + append(buffer, "Location:", table.storage.locationUriString.getOrElse(""), "") append(buffer, "Table Type:", table.tableType.name, "") table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, "")) @@ -581,7 +581,7 @@ case class DescribeTableCommand( append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "") append(buffer, "Database:", table.database, "") append(buffer, "Table:", tableIdentifier.table, "") - append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "") + append(buffer, "Location:", partition.storage.locationUriString.getOrElse(""), "") append(buffer, "Partition Parameters:", "", "") partition.parameters.foreach { case (key, value) => append(buffer, s" $key", value, "") @@ -941,7 +941,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman val dataSourceOptions = metadata.storage.properties.map { case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'" - } ++ metadata.storage.locationUri.flatMap { location => + } ++ metadata.storage.locationUriString.flatMap { location => if (metadata.tableType == MANAGED) { // If it's a managed table, omit PATH option. Spark SQL always creates external table // when the table creation DDL contains the PATH option. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index 1235a4b12f1d0..4e0381df3964e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -46,7 +46,7 @@ class CatalogFileIndex( assert(table.identifier.database.isDefined, "The table identifier must be qualified in CatalogFileIndex") - private val baseLocation: Option[String] = table.storage.locationUri + private val baseLocation: Option[String] = table.storage.locationUriString override def partitionSchema: StructType = table.partitionSchema diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala index ecfcafe69c1f2..5efe6dc930af3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources +import java.net.URI import java.util.{ServiceConfigurationError, ServiceLoader} import scala.collection.JavaConverters._ @@ -612,6 +613,7 @@ object DataSource { def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = { val path = new CaseInsensitiveMap(options).get("path") val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path") - CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath) + CatalogStorageFormat.empty.copy(locationUri = path.map(new URI(_)), + properties = optionsWithoutPath) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 21b07ee85adc8..c3a49ededf6d1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -226,7 +226,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] cache.get(qualifiedTableName, new Callable[LogicalPlan]() { override def call(): LogicalPlan = { - val pathOption = table.storage.locationUri.map("path" -> _) + val pathOption = table.storage.locationUriString.map("path" -> _) val dataSource = DataSource( sparkSession, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala index 76bb9e5929a71..5f780cf48c311 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution.command +import java.net.URI + import scala.reflect.{classTag, ClassTag} import org.apache.spark.sql.catalyst.TableIdentifier @@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest { val expectedTableDesc = CatalogTable( identifier = TableIdentifier("my_tab"), tableType = CatalogTableType.EXTERNAL, - storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")), + storage = CatalogStorageFormat.empty.copy(locationUri = 
Some(new URI("/tmp/file"))), schema = new StructType().add("a", IntegerType).add("b", StringType), provider = Some("parquet")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index b4c9e276ece7a..b6980d49ca6f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -79,7 +79,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = { val storage = CatalogStorageFormat( - locationUri = Some(catalog.defaultTablePath(name)), + locationUri = Some(new Path(catalog.defaultTablePath(name)).toUri), inputFormat = None, outputFormat = None, serde = None, @@ -1795,7 +1795,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { sql("CREATE TABLE tbl(i INT) USING parquet") sql("INSERT INTO tbl SELECT 1") checkAnswer(spark.table("tbl"), Row(1)) - val defaultTablePath = spark.sessionState.catalog + val defaultTableUri = spark.sessionState.catalog .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'") @@ -1803,7 +1803,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { // SET LOCATION won't move data from previous table path to new table path. assert(spark.table("tbl").count() == 0) // the previous table path should be still there. - assert(new File(new URI(defaultTablePath)).exists()) + assert(new File(defaultTableUri).exists()) sql("INSERT INTO tbl SELECT 2") checkAnswer(spark.table("tbl"), Row(2)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 801912f44174f..05b8591dfecfb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -480,7 +480,7 @@ class CatalogSuite options = Map.empty[String, String]) val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) assert(table.tableType == CatalogTableType.MANAGED) - val tablePath = new File(new URI(table.storage.locationUri.get)) + val tablePath = new File(table.storage.locationUri.get) assert(tablePath.exists() && tablePath.listFiles().isEmpty) Seq((1)).toDF("i").write.insertInto("t") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 208c8c9d5d0cf..3071ebfd7b052 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -210,7 +210,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat tableDefinition.storage.locationUri.isEmpty val tableLocation = if (needDefaultTableLocation) { - Some(defaultTablePath(tableDefinition.identifier)) + Some(new Path(defaultTablePath(tableDefinition.identifier)).toUri) } else { tableDefinition.storage.locationUri } @@ -260,7 +260,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, in older version of Spark we already store table location in storage properties // with key "path". 
Here we keep this behaviour for backward compatibility. val storagePropsWithLocation = table.storage.properties ++ - table.storage.locationUri.map("path" -> _) + table.storage.locationUriString.map("path" -> _) // converts the table metadata to Spark SQL specific format, i.e. set data schema, names and // bucket specification to empty. Note that partition columns are retained, so that we can @@ -285,7 +285,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // compatible format, which means the data source is file-based and must have a `path`. require(table.storage.locationUri.isDefined, "External file-based data source table must have a `path` entry in storage properties.") - Some(new Path(table.location).toUri.toString) + Some(new Path(table.location).toUri) } else { None } @@ -438,7 +438,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat try { client.createTable( - tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)), + tableDefinition.withNewStorage(locationUri = Some(new URI(tempPath.toString))), ignoreIfExists) } finally { FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true) @@ -563,7 +563,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // want to alter the table location to a file path, we will fail. This should be fixed // in the future. - val newLocation = tableDefinition.storage.locationUri + val newLocation = tableDefinition.storage.locationUriString val storageWithPathOption = tableDefinition.storage.copy( properties = tableDefinition.storage.properties ++ newLocation.map("path" -> _)) @@ -704,7 +704,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat val storageWithLocation = { val tableLocation = getLocationFromStorageProps(table) // We pass None as `newPath` here, to remove the path option in storage properties. - updateLocationInStorageProps(table, newPath = None).copy(locationUri = tableLocation) + updateLocationInStorageProps(table, newPath = None).copy( + locationUri = tableLocation.map(new Path(_).toUri) + ) } val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER) @@ -854,10 +856,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat // However, Hive metastore is not case preserving and will generate wrong partition location // with lower cased partition column names. Here we set the default partition location // manually to avoid this problem. 
- val partitionPath = p.storage.locationUri.map(uri => new Path(new URI(uri))).getOrElse { + val partitionPath = p.storage.locationUri.map(uri => new Path(uri)).getOrElse { ExternalCatalogUtils.generatePartitionPath(p.spec, partitionColumnNames, tablePath) } - p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri.toString))) + p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))) } val lowerCasedParts = partsWithLocation.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec))) client.createPartitions(db, table, lowerCasedParts, ignoreIfExists) @@ -903,7 +905,9 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat case e: IOException => throw new SparkException( s"Unable to rename partition path from $wrongPath to $rightPath", e) } - partition.copy(storage = partition.storage.copy(locationUri = Some(rightPath.toString))) + partition.copy(storage = partition.storage.copy( + locationUri = Some(new URI(rightPath.toString))) + ) } alterPartitions(db, table, newParts) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala index 346757c2047a7..eb10c331bed2f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala @@ -97,7 +97,7 @@ private[hive] case class MetastoreRelation( sd.setCols(schema.asJava) tTable.setPartitionKeys(partCols.asJava) - catalogTable.storage.locationUri.foreach(sd.setLocation) + catalogTable.storage.locationUriString.foreach(sd.setLocation) catalogTable.storage.inputFormat.foreach(sd.setInputFormat) catalogTable.storage.outputFormat.foreach(sd.setOutputFormat) @@ -181,7 +181,7 @@ private[hive] case class MetastoreRelation( } sd.setCols(schema.asJava) - p.storage.locationUri.foreach(sd.setLocation) + p.storage.locationUriString.foreach(sd.setLocation) p.storage.inputFormat.foreach(sd.setInputFormat) p.storage.outputFormat.foreach(sd.setOutputFormat) @@ -246,13 +246,13 @@ private[hive] case class MetastoreRelation( override def inputFiles: Array[String] = { val partLocations = allPartitions - .flatMap(_.storage.locationUri) + .flatMap(_.storage.locationUriString) .toArray if (partLocations.nonEmpty) { partLocations } else { Array( - catalogTable.storage.locationUri.getOrElse( + catalogTable.storage.locationUriString.getOrElse( sys.error(s"Could not get the location of ${catalogTable.qualifiedName}."))) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index 9a6144c5e3cc8..37ec8bd96e3b9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.{File, PrintStream} +import java.net.URI import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer @@ -407,7 +408,7 @@ private[hive] class HiveClientImpl( createTime = h.getTTable.getCreateTime.toLong * 1000, lastAccessTime = h.getLastAccessTime.toLong * 1000, storage = CatalogStorageFormat( - locationUri = shim.getDataLocation(h), + locationUri = shim.getDataLocation(h).map(new URI(_)), // To avoid ClassNotFound exception, we try our best to not get the format class, but get // the class name directly. 
However, for non-native tables, there is no interface to get // the format class name, so we may still throw ClassNotFound in this case. @@ -846,7 +847,9 @@ private[hive] class HiveClientImpl( hiveTable.setOwner(conf.getUser) hiveTable.setCreateTime((table.createTime / 1000).toInt) hiveTable.setLastAccessTime((table.lastAccessTime / 1000).toInt) - table.storage.locationUri.foreach { loc => shim.setDataLocation(hiveTable, loc) } + table.storage.locationUriString.foreach { loc => + shim.setDataLocation(hiveTable, loc) + } table.storage.inputFormat.map(toInputFormat).foreach(hiveTable.setInputFormatClass) table.storage.outputFormat.map(toOutputFormat).foreach(hiveTable.setOutputFormatClass) hiveTable.setSerializationLib( @@ -871,7 +874,7 @@ private[hive] class HiveClientImpl( } val storageDesc = new StorageDescriptor val serdeInfo = new SerDeInfo - p.storage.locationUri.foreach(storageDesc.setLocation) + p.storage.locationUriString.foreach(storageDesc.setLocation) p.storage.inputFormat.foreach(storageDesc.setInputFormat) p.storage.outputFormat.foreach(storageDesc.setOutputFormat) p.storage.serde.foreach(serdeInfo.setSerializationLib) @@ -889,7 +892,7 @@ private[hive] class HiveClientImpl( CatalogTablePartition( spec = Option(hp.getSpec).map(_.asScala.toMap).getOrElse(Map.empty), storage = CatalogStorageFormat( - locationUri = Option(apiPartition.getSd.getLocation), + locationUri = Option(apiPartition.getSd.getLocation).map(new URI(_)), inputFormat = Option(apiPartition.getSd.getInputFormat), outputFormat = Option(apiPartition.getSd.getOutputFormat), serde = Option(apiPartition.getSd.getSerdeInfo.getSerializationLib), diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala index b052f1e7e43f5..9174bc40153ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala @@ -271,7 +271,7 @@ private[client] class Shim_v0_12 extends Shim with Logging { val table = hive.getTable(database, tableName) parts.foreach { s => val location = s.storage.locationUri.map( - uri => new Path(table.getPath, new Path(new URI(uri)))).orNull + uri => new Path(table.getPath, new Path(uri))).orNull val params = if (s.parameters.nonEmpty) s.parameters.asJava else null val spec = s.spec.asJava if (hive.getPartition(table, spec, false) != null && ignoreIfExists) { @@ -469,7 +469,7 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { val addPartitionDesc = new AddPartitionDesc(db, table, ignoreIfExists) parts.zipWithIndex.foreach { case (s, i) => addPartitionDesc.addPartition( - s.spec.asJava, s.storage.locationUri.map(u => new Path(new URI(u)).toString).orNull) + s.spec.asJava, s.storage.locationUri.map(u => new Path(u).toString).orNull) if (s.parameters.nonEmpty) { addPartitionDesc.getPartition(i).setPartParams(s.parameters.asJava) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala index b67e5f6fe57a1..48773642bea67 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala @@ -70,7 +70,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.identifier.database == Some("mydb")) assert(desc.identifier.table == "page_view") assert(desc.tableType == 
CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/user/external/page_view")) + assert(desc.storage.locationUriString == Some("/user/external/page_view")) assert(desc.schema.isEmpty) // will be populated later when the table is actually created assert(desc.comment == Some("This is the staging page view table")) // TODO will be SQLText @@ -101,7 +101,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.identifier.database == Some("mydb")) assert(desc.identifier.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/user/external/page_view")) + assert(desc.storage.locationUriString == Some("/user/external/page_view")) assert(desc.schema.isEmpty) // will be populated later when the table is actually created // TODO will be SQLText assert(desc.comment == Some("This is the staging page view table")) @@ -333,7 +333,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle val query = "CREATE EXTERNAL TABLE tab1 (id int, name string) LOCATION '/path/to/nowhere'" val (desc, _) = extractTableDesc(query) assert(desc.tableType == CatalogTableType.EXTERNAL) - assert(desc.storage.locationUri == Some("/path/to/nowhere")) + assert(desc.storage.locationUriString == Some("/path/to/nowhere")) } test("create table - if not exists") { @@ -463,7 +463,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle assert(desc.bucketSpec.isEmpty) assert(desc.viewText.isEmpty) assert(desc.viewOriginalText.isEmpty) - assert(desc.storage.locationUri == Some("/path/to/mercury")) + assert(desc.storage.locationUriString == Some("/path/to/mercury")) assert(desc.storage.inputFormat == Some("winput")) assert(desc.storage.outputFormat == Some("wowput")) assert(desc.storage.serde == Some("org.apache.poof.serde.Baff")) @@ -614,7 +614,7 @@ class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingle .add("id", "int") .add("name", "string", nullable = true, comment = "blabla")) assert(table.provider == Some(DDLUtils.HIVE_PROVIDER)) - assert(table.storage.locationUri == Some("/tmp/file")) + assert(table.storage.locationUriString == Some("/tmp/file")) assert(table.storage.properties == Map("my_prop" -> "1")) assert(table.comment == Some("BLABLA")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala index 00fdfbcebbe85..21cec4dd29dab 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogBackwardCompatibilitySuite.scala @@ -81,7 +81,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl2", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), + locationUri = Some(new URI(tempDirUri)), inputFormat = Some("org.apache.hadoop.mapred.TextInputFormat"), outputFormat = Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")), schema = simpleSchema) @@ -167,7 +167,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl7", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = 
Some(defaultTableURI("tbl7").toString + "-__PLACEHOLDER__"), + locationUri = Some(new URI(defaultTableURI("tbl7").getPath + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map( @@ -179,7 +179,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl8", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempDirUri), + locationUri = Some(new URI(tempDirUri)), properties = Map("path" -> tempDirUri)), schema = simpleSchema, properties = Map( @@ -191,7 +191,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest identifier = TableIdentifier("tbl9", Some("test_db")), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(defaultTableURI("tbl9").toString + "-__PLACEHOLDER__"), + locationUri = Some(new URI(defaultTableURI("tbl9").getPath + "-__PLACEHOLDER__")), properties = Map("path" -> tempDirUri)), schema = new StructType(), properties = Map("spark.sql.sources.provider" -> "json")) @@ -215,7 +215,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest if (tbl.tableType == CatalogTableType.EXTERNAL) { // trim the URI prefix - val tableLocation = new URI(readBack.storage.locationUri.get).getPath + val tableLocation = readBack.storage.locationUri.get.getPath val expectedLocation = tempDir.toURI.getPath.stripSuffix("/") assert(tableLocation == expectedLocation) } @@ -231,7 +231,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest val readBack = getTableMetadata(tbl.identifier.table) // trim the URI prefix - val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath + val actualTableLocation = readBack.storage.locationUri.get.getPath val expected = dir.toURI.getPath.stripSuffix("/") assert(actualTableLocation == expected) } @@ -247,7 +247,7 @@ class HiveExternalCatalogBackwardCompatibilitySuite extends QueryTest assert(readBack.schema.sameType(expectedSchema)) // trim the URI prefix - val actualTableLocation = new URI(readBack.storage.locationUri.get).getPath + val actualTableLocation = readBack.storage.locationUri.get.getPath val expectedLocation = if (tbl.tableType == CatalogTableType.EXTERNAL) { tempDir.toURI.getPath.stripSuffix("/") } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala index 16cf4d7ec67f6..dfca63d3f9bba 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala @@ -140,7 +140,7 @@ class DataSourceWithHiveMetastoreCatalogSuite assert(hiveTable.storage.serde === Some(serde)) assert(hiveTable.tableType === CatalogTableType.EXTERNAL) - assert(hiveTable.storage.locationUri === Some(path.toString)) + assert(hiveTable.storage.locationUriString === Some(path.toString)) val columns = hiveTable.schema assert(columns.map(_.name) === Seq("d1", "d2")) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 8f0d5d886c9d5..9d173c6f5364f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -868,14 +868,14 @@ 
object SPARK_18360 { val rawTable = hiveClient.getTable("default", "test_tbl") // Hive will use the value of `hive.metastore.warehouse.dir` to generate default table // location for tables in default database. - assert(rawTable.storage.locationUri.get.contains(newWarehousePath)) + assert(rawTable.storage.locationUriString.get.contains(newWarehousePath)) hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = false, purge = false) spark.sharedState.externalCatalog.createTable(tableMeta, ignoreIfExists = false) val readBack = spark.sharedState.externalCatalog.getTable("default", "test_tbl") // Spark SQL will use the location of default database to generate default table // location for tables in default database. - assert(readBack.storage.locationUri.get.contains(defaultDbLocation)) + assert(readBack.storage.locationUriString.get.contains(defaultDbLocation)) } finally { hiveClient.dropTable("default", "test_tbl", ignoreIfNotExists = true, purge = false) hiveClient.runSqlHive(s"SET hive.metastore.warehouse.dir=$defaultDbLocation") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index f0e2c9369bd05..698c16e5b6656 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive import java.io.File +import java.net.URI import scala.collection.mutable.ArrayBuffer @@ -1015,7 +1016,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv identifier = TableIdentifier("not_skip_hive_metadata"), tableType = CatalogTableType.EXTERNAL, storage = CatalogStorageFormat.empty.copy( - locationUri = Some(tempPath.getCanonicalPath), + locationUri = Some(new URI(tempPath.getCanonicalPath)), properties = Map("skipHiveMetadata" -> "false") ), schema = schema, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala index dca207a72d889..79ed511ffd0c1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala @@ -265,6 +265,7 @@ class PartitionProviderCompatibilitySuite spark.sql(s""" |alter table test partition (A=5, B='%') |rename to partition (A=100, B='%')""".stripMargin) + val x = spark.sql("show partitions test").collect() assert(spark.sql("select * from test where a = 5 and b = '%'").count() == 0) assert(spark.sql("select * from test where a = 100 and b = '%'").count() == 1) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 28b5bfd5819c6..394758a1c26d0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.client import java.io.{ByteArrayOutputStream, File, PrintStream} +import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat @@ -344,15 +345,15 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet 
test(s"$version: alterPartitions") { val spec = Map("key1" -> "1", "key2" -> "2") - val newLocation = Utils.createTempDir().getPath() + val newLocation = Some(Utils.createTempDir().toURI) val storage = storageFormat.copy( - locationUri = Some(newLocation), + locationUri = newLocation, // needed for 0.12 alter partitions serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) val partition = CatalogTablePartition(spec, storage) client.alterPartitions("default", "src_part", Seq(partition)) assert(client.getPartition("default", "src_part", spec) - .storage.locationUri == Some(newLocation)) + .storage.locationUri === newLocation) } test(s"$version: dropPartitions") {