@@ -988,9 +988,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           .orElse(Some("org.apache.hadoop.mapred.TextInputFormat")),
         outputFormat = defaultHiveSerde.flatMap(_.outputFormat)
           .orElse(Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
-        // Note: Keep this unspecified because we use the presence of the serde to decide
-        // whether to convert a table created by CTAS to a datasource table.
-        serde = None,
+        serde = defaultHiveSerde.flatMap(_.serde),
[Review comment — Member]
I think this was kept as unspecified because the table is intended to go through the Hive write path. If we specify the serde here, the table will be converted to a datasource table. Is that ok? cc @cloud-fan

[Review comment — Contributor]
cc @yhuai to confirm

[Review comment — Member, @gatorsmile, Sep 22, 2016]
The comment is no longer valid. That check was removed by PR #13386 (see the code changes made in HiveMetastoreCatalog.scala).

[Review comment — Member]
The current check is based on ctx.createFileFormat and ctx.rowFormat, so I think this PR looks ok. :)
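(For reference, a minimal sketch of the decision being described, assuming the parser now derives it from the presence of explicit ROW FORMAT / STORED AS clauses; the function name and parameters are illustrative, not the verbatim Spark source.)

    // Sketch only: models how CTAS conversion can be decided from the parser
    // context (ctx.rowFormat / ctx.createFileFormat) rather than from whether
    // the serde field happens to be empty.
    def shouldConvertCtasToDatasource(
        convertCTAS: Boolean,         // value of spark.sql.hive.convertCTAS
        hasRowFormat: Boolean,        // a ROW FORMAT clause was present
        hasCreateFileFormat: Boolean  // a STORED AS / file format clause was present
      ): Boolean = {
      convertCTAS && !hasRowFormat && !hasCreateFileFormat
    }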

[Review comment — Contributor Author]
@viirya @cloud-fan Actually, I am not sure the above comment is still in sync with the code. When the comment was written, we used CreateTableAsSelectLogicalPlan to represent the CTAS case, and we checked for the serde's presence to decide whether or not to convert it to a data source table, like the following:

    if (sessionState.convertCTAS && table.storage.serde.isEmpty) {
      // Do the conversion when spark.sql.hive.convertCTAS is true and the query
      // does not specify any storage format (file format and storage handler).
      if (table.identifier.database.isDefined) {
        throw new AnalysisException(
          "Cannot specify database name in a CTAS statement " +
            "when spark.sql.hive.convertCTAS is set to true.")
      }

      val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
      CreateTableUsingAsSelect(
        TableIdentifier(desc.identifier.table),
        conf.defaultDataSourceName,
        temporary = false,
        Array.empty[String],
        bucketSpec = None,
        mode,
        options = Map.empty[String, String],
        child
      )
    } else {
      val desc = if (table.storage.serde.isEmpty) {
        // add default serde
        table.withNewStorage(
          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
      } else {
        table
      }

I think this code has since changed and moved to SparkSqlParser?

[Review comment — Member]
Yeah, looks ok now.

[Review comment — Member]
In the DataSinks strategy, we set a default serde for CreateTable when tableDesc.storage.serde.isEmpty. I think we should remove that as well and add .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")) here.
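(In other words, the suggestion would make the LazySimpleSerDe fallback explicit in the parser-level default, roughly as follows; this is a sketch of the reviewer's suggestion, not a change in this PR.)

    serde = defaultHiveSerde.flatMap(_.serde)
      .orElse(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")),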

[Review comment — Contributor Author]
@viirya Please see my comment below.

         compressed = false,
         properties = Map())
     }
@@ -30,10 +30,12 @@ import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
-import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.{TestHive, TestHiveSingleton}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types.StructType

-class HiveDDLCommandSuite extends PlanTest {
+class HiveDDLCommandSuite extends PlanTest with SQLTestUtils with TestHiveSingleton {
   val parser = TestHive.sessionState.sqlParser

   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
@@ -556,4 +558,24 @@ class HiveDDLCommandSuite extends PlanTest {
     assert(partition2.get.apply("c") == "1" && partition2.get.apply("d") == "2")
   }

+  test("Test the default fileformat for Hive-serde tables") {
[Review comment — Member]
Let's add a test for withSQLConf("hive.default.fileformat" -> "")?
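(A sketch of what that suggested test could look like, assuming an empty hive.default.fileformat matches no known Hive format and therefore falls through to the text defaults visible in the SparkSqlAstBuilder hunk above; these expected values are an assumption, not verified behavior.)

    withSQLConf("hive.default.fileformat" -> "") {
      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
      assert(exists)
      // Assumed fallbacks: plain-text input/output formats and, after this
      // PR, an empty serde, since "" resolves to no default Hive serde.
      assert(desc.storage.inputFormat == Some("org.apache.hadoop.mapred.TextInputFormat"))
      assert(desc.storage.outputFormat ==
        Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
      assert(desc.storage.serde.isEmpty)
    }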

+    withSQLConf("hive.default.fileformat" -> "orc") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      assert(desc.storage.inputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+      assert(desc.storage.outputFormat == Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+      assert(desc.storage.serde == Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+    }
+
+    withSQLConf("hive.default.fileformat" -> "parquet") {
+      val (desc, exists) = extractTableDesc("CREATE TABLE IF NOT EXISTS fileformat_test (id int)")
+      assert(exists)
+      val input = desc.storage.inputFormat
+      val output = desc.storage.outputFormat
+      val serde = desc.storage.serde
+      assert(input == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+      assert(output == Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+      assert(serde == Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+    }
+  }
 }
@@ -416,7 +416,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

   def checkRelation(
       tableName: String,
-      isDataSourceParquet: Boolean,
+      isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
     val relation = EliminateSubqueryAliases(
@@ -425,7 +425,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
       case LogicalRelation(r: HadoopFsRelation, _, _) =>
-        if (!isDataSourceParquet) {
+        if (!isDataSourceTable) {
           fail(
             s"${classOf[MetastoreRelation].getCanonicalName} is expected, but found " +
               s"${HadoopFsRelation.getClass.getCanonicalName}.")
@@ -438,7 +438,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
         assert(catalogTable.provider.get === format)

       case r: MetastoreRelation =>
-        if (isDataSourceParquet) {
+        if (isDataSourceTable) {
           fail(
             s"${HadoopFsRelation.getClass.getCanonicalName} is expected, but found " +
               s"${classOf[MetastoreRelation].getCanonicalName}.")
@@ -448,8 +448,15 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
assert(r.catalogTable.storage.locationUri.get === location)
case None => // OK.
}
// Also make sure that the format is the desired format.
// Also make sure that the format and serde are as desired.
assert(catalogTable.storage.inputFormat.get.toLowerCase.contains(format))
assert(catalogTable.storage.outputFormat.get.toLowerCase.contains(format))
val serde = catalogTable.storage.serde.get
format match {
case "sequence" | "text" => assert(serde.contains("LazySimpleSerDe"))
case "rcfile" => assert(serde.contains("LazyBinaryColumnarSerDe"))
case _ => assert(serde.toLowerCase.contains(format))
}
}

// When a user-specified location is defined, the table type needs to be EXTERNAL.
@@ -511,6 +518,30 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
     }
   }

+  test("CTAS with default fileformat") {
+    val table = "ctas1"
+    val ctas = s"CREATE TABLE IF NOT EXISTS $table SELECT key k, value FROM src"
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
+      withSQLConf("hive.default.fileformat" -> "textfile") {
+        withTable(table) {
+          sql(ctas)
+          // We should use parquet here as that is the default datasource fileformat. The default
+          // datasource file format is controlled by `spark.sql.sources.default` configuration.
+          // This testcase verifies that setting `hive.default.fileformat` has no impact on
+          // the target table's fileformat in case of CTAS.
+          assert(sessionState.conf.defaultDataSourceName === "parquet")
+          checkRelation(table, isDataSourceTable = true, "parquet")
+        }
+      }
+      withSQLConf("spark.sql.sources.default" -> "orc") {
+        withTable(table) {
+          sql(ctas)
+          checkRelation(table, isDataSourceTable = true, "orc")
+        }
+      }
+    }
+  }
+
   test("CTAS without serde with location") {
     withSQLConf(SQLConf.CONVERT_CTAS.key -> "true") {
       withTempDir { dir =>