@@ -18,6 +18,7 @@
 package org.apache.spark.sql.catalyst.catalog

 import java.io.IOException
+import java.net.URI

 import scala.collection.mutable
@@ -211,7 +212,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to create table $table as failed " +
             s"to create its directory $defaultTableLocation", e)
       }
-      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri.toString))
+      tableDefinition.withNewStorage(locationUri = Some(defaultTableLocation.toUri))
     } else {
       tableDefinition
     }
@@ -283,7 +284,7 @@ class InMemoryCatalog(
           throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
             s"to rename its directory $oldDir", e)
       }
-      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri.toString))
+      oldDesc.table = oldDesc.table.withNewStorage(locationUri = Some(newDir.toUri))
     }

     catalog(db).tables.put(newName, oldDesc)
@@ -392,7 +393,7 @@ class InMemoryCatalog(

       existingParts.put(
         p.spec,
-        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toString))))
+        p.copy(storage = p.storage.copy(locationUri = Some(partitionPath.toUri))))
     }
   }
@@ -465,7 +466,7 @@ class InMemoryCatalog(
       }
       oldPartition.copy(
         spec = newSpec,
-        storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toString)))
+        storage = oldPartition.storage.copy(locationUri = Some(newPartPath.toUri)))
     } else {
       oldPartition.copy(spec = newSpec)
     }
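The `.toUri.toString` → `.toUri` changes in InMemoryCatalog are the core of this patch: the `java.net.URI` that Hadoop's `Path` produces survives a round trip, while its string form does not. A minimal sketch of the failure mode, using only Hadoop's `Path` and an invented location containing a space:

    import org.apache.hadoop.fs.Path

    val dir = new Path("/warehouse/db1.db/tbl a")  // directory name contains a space
    val uri = dir.toUri                            // path component is escaped: .../tbl%20a
    new Path(uri)                                  // /warehouse/db1.db/tbl a    -- lossless
    new Path(uri.toString)                         // /warehouse/db1.db/tbl%20a  -- "%20" is now a
                                                   // literal part of the directory name, because
                                                   // Path does not percent-decode plain strings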
@@ -17,10 +17,12 @@

 package org.apache.spark.sql.catalyst.catalog

+import java.net.URI
 import java.util.Date

-import scala.collection.mutable
+import org.apache.hadoop.fs.Path

+import scala.collection.mutable
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
@@ -47,15 +49,15 @@ case class CatalogFunction(
  * Storage format, used to describe how a partition or a table is stored.
  */
 case class CatalogStorageFormat(
-    // TODO(ekl) consider storing this field as java.net.URI for type safety. Note that this must
-    // be converted to/from a hadoop Path object using new Path(new URI(locationUri)) and
-    // path.toUri respectively before use as a filesystem path due to URI char escaping.
-    locationUri: Option[String],
+    locationUri: Option[URI],
     inputFormat: Option[String],
     outputFormat: Option[String],
     serde: Option[String],
     compressed: Boolean,
     properties: Map[String, String]) {
+  private[this] lazy val uriString = locationUri.map(_.toString)
+
+  def locationUriString: Option[String] = uriString

   override def toString: String = {
     val serdePropsToString = CatalogUtils.maskCredentials(properties) match {
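With `locationUri` now typed as `Option[URI]`, string-based callers go through the new `locationUriString` helper instead of reading the field directly. A rough usage sketch (the location value is a placeholder):

    import java.net.URI

    val fmt = CatalogStorageFormat(
      locationUri = Some(new URI("file:/warehouse/db1.db/events")),
      inputFormat = None, outputFormat = None, serde = None,
      compressed = false, properties = Map.empty)

    fmt.locationUri        // Option[URI]    -- type-safe value for catalog code
    fmt.locationUriString  // Option[String] -- compatibility view for string-based callers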
@@ -104,7 +106,7 @@ case class CatalogTablePartition(
   }

   /** Return the partition location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: String = storage.locationUriString.getOrElse {
     val specString = spec.map { case (k, v) => s"$k=$v" }.mkString(", ")
     throw new AnalysisException(s"Partition [$specString] did not specify locationUri")
   }
@@ -194,7 +196,7 @@ case class CatalogTable(
   }

   /** Return the table location, assuming it is specified. */
-  def location: String = storage.locationUri.getOrElse {
+  def location: String = storage.locationUriString.getOrElse {
     throw new AnalysisException(s"table $identifier did not specify locationUri")
   }

@@ -225,7 +227,7 @@ case class CatalogTable(

   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
-      locationUri: Option[String] = storage.locationUri,
+      locationUri: Option[URI] = storage.locationUri,
       inputFormat: Option[String] = storage.inputFormat,
       outputFormat: Option[String] = storage.outputFormat,
       compressed: Boolean = false,
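`withNewStorage` keeps its copy-with-defaults shape; only the type of its `locationUri` parameter changes, so call sites update a single field and inherit the rest. A sketch of the pattern used throughout this patch (assumes a `table: CatalogTable` and a Hadoop `Path` in scope):

    import org.apache.hadoop.fs.Path

    def relocate(table: CatalogTable, newDir: Path): CatalogTable =
      table.withNewStorage(locationUri = Some(newDir.toUri))  // other storage fields keep defaults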
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.catalog

+import java.net.URI
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
@@ -366,10 +368,10 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach

     val partition1 =
       CatalogTablePartition(Map("partCol1" -> "1", "partCol2" -> "2"),
-        storageFormat.copy(locationUri = Some(newLocationPart1)))
+        storageFormat.copy(locationUri = Some(new URI(newLocationPart1))))
     val partition2 =
       CatalogTablePartition(Map("partCol1" -> "3", "partCol2" -> "4"),
-        storageFormat.copy(locationUri = Some(newLocationPart2)))
+        storageFormat.copy(locationUri = Some(new URI(newLocationPart2))))
     catalog.createPartitions("db1", "tbl", Seq(partition1), ignoreIfExists = false)
     catalog.createPartitions("db1", "tbl", Seq(partition2), ignoreIfExists = false)
@@ -561,8 +563,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val oldPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
     val oldPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
     catalog.alterPartitions("db2", "tbl2", Seq(
-      oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
-      oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))),
+      oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation))))))
     val newPart1 = catalog.getPartition("db2", "tbl2", part1.spec)
     val newPart2 = catalog.getPartition("db2", "tbl2", part2.spec)
     assert(newPart1.storage.locationUri == Some(newLocation))
@@ -743,7 +745,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
       identifier = TableIdentifier("external_table", Some("db1")),
       tableType = CatalogTableType.EXTERNAL,
       storage = CatalogStorageFormat(
-        Some(Utils.createTempDir().getAbsolutePath),
+        Some(Utils.createTempDir().toURI),
         None, None, None, false, Map.empty),
       schema = new StructType().add("a", "int").add("b", "string"),
       provider = Some("hive")
@@ -791,7 +793,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val partWithExistingDir = CatalogTablePartition(
       Map("partCol1" -> "7", "partCol2" -> "8"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithExistingDir), ignoreIfExists = false)

@@ -800,7 +802,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEach
     val partWithNonExistingDir = CatalogTablePartition(
       Map("partCol1" -> "9", "partCol2" -> "10"),
       CatalogStorageFormat(
-        Some(tempPath.toURI.toString),
+        Some(tempPath.toURI),
         None, None, None, false, Map.empty))
     catalog.createPartitions("db1", "tbl", Seq(partWithNonExistingDir), ignoreIfExists = false)
     assert(tempPath.exists())
@@ -895,7 +897,7 @@ abstract class CatalogTestUtils {
     CatalogTable(
       identifier = TableIdentifier(name, database),
       tableType = CatalogTableType.EXTERNAL,
-      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().getAbsolutePath)),
+      storage = storageFormat.copy(locationUri = Some(Utils.createTempDir().toURI)),
       schema = new StructType()
         .add("col1", "int")
         .add("col2", "string")
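One detail worth keeping in mind when reading these test updates: replacing `Utils.createTempDir().getAbsolutePath` with `.toURI` changes the value, not just the type. `java.io.File.toURI` yields a scheme-qualified URI, with a trailing slash for an existing directory:

    import java.io.File

    val dir = new File("/tmp/spark-1234")  // assume this directory exists; the path is made up
    dir.getAbsolutePath                    // "/tmp/spark-1234"       -- bare path, no scheme
    dir.toURI                              // file:/tmp/spark-1234/   -- scheme plus trailing slash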
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.catalyst.catalog

+import java.net.URI
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis._
@@ -872,8 +874,8 @@ class SessionCatalogSuite extends PlanTest {
     val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
     val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
     catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(
-      oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))),
-      oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation)))))
+      oldPart1.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation)))),
+      oldPart2.copy(storage = storageFormat.copy(locationUri = Some(new URI(newLocation))))))
     val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec)
     val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec)
     assert(newPart1.storage.locationUri == Some(newLocation))
@@ -17,11 +17,12 @@

 package org.apache.spark.sql.execution

-import scala.collection.JavaConverters._
+import java.net.URI

+import scala.collection.JavaConverters._
 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode

+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
@@ -373,7 +374,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)

-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri)
     val storage = DataSource.buildStorageFormatFromOptions(options)

     if (location.isDefined && storage.locationUri.isDefined) {
@@ -1068,7 +1069,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       .getOrElse(CatalogStorageFormat.empty)
     val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
       .getOrElse(CatalogStorageFormat.empty)
-    val location = Option(ctx.locationSpec).map(visitLocationSpec)
+    val location = Option(ctx.locationSpec).map(visitLocationSpec).map(new Path(_).toUri)
     // If we are creating an EXTERNAL table, then the LOCATION field is required
     if (external && location.isEmpty) {
       operationNotAllowed("CREATE EXTERNAL TABLE must be accompanied by LOCATION", ctx)
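Both parser call sites route the raw LOCATION string through `new Path(_).toUri` rather than handing it to `java.net.URI` directly; Hadoop's `Path` accepts unescaped input and escapes it while building the URI. A sketch with an invented location:

    import org.apache.hadoop.fs.Path

    // What the parser now stores for: LOCATION '/data/raw events'
    new Path("/data/raw events").toUri   // /data/raw%20events -- the space is escaped during parsing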
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.command

+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -54,7 +56,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)

     // Create the relation to validate the arguments before writing the metadata to the metastore,
     // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
-    val pathOption = table.storage.locationUri.map("path" -> _)
+    val pathOption = table.storage.locationUriString.map("path" -> _)
     // Fill in some default table options from the session conf
     val tableWithDefaultOptions = table.copy(
       identifier = table.identifier.copy(
@@ -142,15 +144,15 @@ case class CreateDataSourceTableAsSelectCommand(
       }

       saveDataIntoTable(
-        sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
+        sparkSession, table, table.storage.locationUriString, query, mode, tableExists = true)
     } else {
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-        Some(sessionState.catalog.defaultTablePath(table.identifier))
+        Some(new Path(sessionState.catalog.defaultTablePath(table.identifier)).toUri)
       } else {
         table.storage.locationUri
       }
       val result = saveDataIntoTable(
-        sparkSession, table, tableLocation, query, mode, tableExists = false)
+        sparkSession, table, tableLocation.map(_.toString), query, mode, tableExists = false)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
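The CTAS path shows where the string/URI boundary now sits: the catalog entry carries a `URI`, while `saveDataIntoTable` (which feeds the data source's `path` option) still takes a `String`, hence the `tableLocation.map(_.toString)` at the call site. In sketch form, with placeholder paths:

    import java.net.URI
    import org.apache.hadoop.fs.Path

    // Catalog-facing value, built from the default managed-table path.
    val tableLocation: Option[URI] = Some(new Path("/user/warehouse/db1.db/t1").toUri)
    // DataSource-facing value: still a plain string at this boundary.
    val pathForWrite: Option[String] = tableLocation.map(_.toString)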
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.command

+import java.net.URI
+
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
@@ -426,7 +428,8 @@ case class AlterTableAddPartitionCommand(
         table.identifier.quotedString,
         sparkSession.sessionState.conf.resolver)
       // inherit table storage format (possibly except for location)
-      CatalogTablePartition(normalizedSpec, table.storage.copy(locationUri = location))
+      CatalogTablePartition(normalizedSpec, table.storage
+        .copy(locationUri = location.map(new URI(_))))
     }
     catalog.createPartitions(table.identifier, parts, ignoreIfExists = ifNotExists)
     Seq.empty[Row]
@@ -710,7 +713,7 @@ case class AlterTableRecoverPartitionsCommand(
       // inherit table storage format (possibly except for location)
       CatalogTablePartition(
         spec,
-        table.storage.copy(locationUri = Some(location.toUri.toString)),
+        table.storage.copy(locationUri = Some(location.toUri)),
         params)
     }
     spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
@@ -748,11 +751,13 @@ case class AlterTableSetLocationCommand(
           sparkSession, table, "ALTER TABLE ... SET LOCATION")
         // Partition spec is specified, so we set the location only for this partition
         val part = catalog.getPartition(table.identifier, spec)
-        val newPart = part.copy(storage = part.storage.copy(locationUri = Some(location)))
+        val newPart = part.copy(storage = part.storage.copy(
+          locationUri = Some(new URI(location)))
+        )
        catalog.alterPartitions(table.identifier, Seq(newPart))
       case None =>
         // No partition spec is specified, so we set the location for the table itself
-        catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
+        catalog.alterTable(table.withNewStorage(locationUri = Some(new URI(location))))
     }
     Seq.empty[Row]
   }
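Note that, unlike the parser, the ALTER TABLE commands above wrap the user-supplied location with `new URI(_)`, whose single-string constructor insists on an already-legal URI and rejects unescaped characters that `new Path(_).toUri` would quietly encode. A small illustration (input string invented):

    import java.net.URI
    import org.apache.hadoop.fs.Path

    val raw = "/data/raw events"   // unescaped space
    new Path(raw).toUri            // ok: /data/raw%20events
    try new URI(raw) catch {
      case e: java.net.URISyntaxException => println(s"rejected: ${e.getMessage}")
    }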
@@ -491,7 +491,7 @@
     append(buffer, "Owner:", table.owner, "")
     append(buffer, "Create Time:", new Date(table.createTime).toString, "")
     append(buffer, "Last Access Time:", new Date(table.lastAccessTime).toString, "")
-    append(buffer, "Location:", table.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Location:", table.storage.locationUriString.getOrElse(""), "")
     append(buffer, "Table Type:", table.tableType.name, "")
     table.stats.foreach(s => append(buffer, "Statistics:", s.simpleString, ""))
@@ -581,7 +581,7 @@ case class DescribeTableCommand(
     append(buffer, "Partition Value:", s"[${partition.spec.values.mkString(", ")}]", "")
     append(buffer, "Database:", table.database, "")
     append(buffer, "Table:", tableIdentifier.table, "")
-    append(buffer, "Location:", partition.storage.locationUri.getOrElse(""), "")
+    append(buffer, "Location:", partition.storage.locationUriString.getOrElse(""), "")
     append(buffer, "Partition Parameters:", "", "")
     partition.parameters.foreach { case (key, value) =>
       append(buffer, s" $key", value, "")
@@ -941,7 +941,7 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableCommand

     val dataSourceOptions = metadata.storage.properties.map {
       case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
-    } ++ metadata.storage.locationUri.flatMap { location =>
+    } ++ metadata.storage.locationUriString.flatMap { location =>
       if (metadata.tableType == MANAGED) {
         // If it's a managed table, omit PATH option. Spark SQL always creates external table
         // when the table creation DDL contains the PATH option.
@@ -46,7 +46,7 @@ class CatalogFileIndex(
   assert(table.identifier.database.isDefined,
     "The table identifier must be qualified in CatalogFileIndex")

-  private val baseLocation: Option[String] = table.storage.locationUri
+  private val baseLocation: Option[String] = table.storage.locationUriString

   override def partitionSchema: StructType = table.partitionSchema
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.execution.datasources

+import java.net.URI
 import java.util.{ServiceConfigurationError, ServiceLoader}

 import scala.collection.JavaConverters._
@@ -612,6 +613,7 @@ object DataSource {
   def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
     val path = new CaseInsensitiveMap(options).get("path")
     val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
-    CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+    CatalogStorageFormat.empty.copy(locationUri = path.map(new URI(_)),
+      properties = optionsWithoutPath)
   }
 }
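`buildStorageFormatFromOptions` keeps its case-insensitive treatment of the `path` option; the only change is that the value is now parsed with `new URI(_)`. A rough usage sketch (option values invented):

    val storage = DataSource.buildStorageFormatFromOptions(
      Map("PATH" -> "/data/events", "compression" -> "snappy"))

    storage.locationUri   // Some(/data/events) -- a java.net.URI parsed from the option value
    storage.properties    // Map("compression" -> "snappy") -- the "path" key is filtered out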
@@ -226,7 +226,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]

     cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
       override def call(): LogicalPlan = {
-        val pathOption = table.storage.locationUri.map("path" -> _)
+        val pathOption = table.storage.locationUriString.map("path" -> _)
         val dataSource =
           DataSource(
             sparkSession,
@@ -17,6 +17,8 @@

 package org.apache.spark.sql.execution.command

+import java.net.URI
+
 import scala.reflect.{classTag, ClassTag}

 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -409,7 +411,7 @@ class DDLCommandSuite extends PlanTest {
     val expectedTableDesc = CatalogTable(
       identifier = TableIdentifier("my_tab"),
       tableType = CatalogTableType.EXTERNAL,
-      storage = CatalogStorageFormat.empty.copy(locationUri = Some("/tmp/file")),
+      storage = CatalogStorageFormat.empty.copy(locationUri = Some(new URI("/tmp/file"))),
       schema = new StructType().add("a", IntegerType).add("b", StringType),
       provider = Some("parquet"))