From bd69beb2275931fad5764579e75a2511e5d096fc Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Tue, 22 Mar 2016 09:44:14 -0700 Subject: [PATCH 1/8] Use ORC data source for SQL queries on ORC tables --- .../apache/spark/sql/hive/HiveContext.scala | 12 ++ .../spark/sql/hive/HiveMetastoreCatalog.scala | 203 ++++++++++++++---- .../spark/sql/hive/HiveSessionState.scala | 1 + .../spark/sql/hive/orc/OrcQuerySuite.scala | 29 +++ 4 files changed, 205 insertions(+), 40 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index ca3ce43591f5f..62f5de3b81db2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -154,6 +154,13 @@ class HiveContext private[hive]( protected[sql] def convertMetastoreParquetWithSchemaMerging: Boolean = getConf(CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING) + /** + * When true, enables an experimental feature where metastore tables that use the Orc SerDe + * are automatically converted to use the Spark SQL ORC table scan, instead of the Hive + * SerDe. + */ + protected[sql] def convertMetastoreOrc: Boolean = getConf(CONVERT_METASTORE_ORC) + /** * When true, a table created by a Hive CTAS statement (no USING clause) will be * converted to a data source table, using the data source set by spark.sql.sources.default. @@ -442,6 +449,11 @@ private[hive] object HiveContext extends Logging { "different Parquet data files. This configuration is only effective " + "when \"spark.sql.hive.convertMetastoreParquet\" is true.") + val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc", + defaultValue = Some(true), + doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " + + "the built in support.") + val CONVERT_CTAS = booleanConf("spark.sql.hive.convertCTAS", defaultValue = Some(false), doc = "When true, a table created by a Hive CTAS statement (no USING clause) will be " + diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c7066d73631af..9a2cd50554999 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -42,10 +42,11 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.execution.{datasources, FileRelation} import org.apache.spark.sql.execution.datasources._ -import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource, ParquetRelation} +import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.HiveNativeCommand -import org.apache.spark.sql.sources.{HadoopFsRelation, HDFSFileCatalog} +import org.apache.spark.sql.hive.orc.{DefaultSource => OrcDefaultSource} +import org.apache.spark.sql.sources.{FileFormat, HadoopFsRelation, HDFSFileCatalog} import org.apache.spark.sql.types._ private[hive] case class HiveSerDe( @@ -440,6 +441,56 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } + private def getCached( + tableIdentifier: QualifiedTableName, + metastoreRelation: MetastoreRelation, + schemaInMetastore: StructType, + 
expectedFileFormat: Class[_ <: FileFormat], + partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { + + cachedDataSourceTables.getIfPresent(tableIdentifier) match { + case null => None // Cache miss + case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) => + val pathsInMetastore = metastoreRelation.table.storage.locationUri.toSeq + val cachedRelationFileFormatClass = relation.fileFormat.getClass + + expectedFileFormat match { + case `cachedRelationFileFormatClass` => + // If we have the same paths, same schema, and same partition spec, + // we will use the cached relation. + val useCached = + relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && + logical.schema.sameType(schemaInMetastore) && + relation.partitionSpec == partitionSpecInMetastore.getOrElse { + PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory]) + } + + if (useCached) { + Some(logical) + } else { + // If the cached relation is not updated, we invalidate it right away. + cachedDataSourceTables.invalidate(tableIdentifier) + None + } + case _ => + logWarning( + s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} " + + s"should be stored as $expectedFileFormat. However, we are getting " + + s"a ${relation.fileFormat} from the metastore cache. This cached " + + s"entry will be invalidated.") + cachedDataSourceTables.invalidate(tableIdentifier) + None + } + case other => + logWarning( + s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + + s"as $expectedFileFormat. However, we are getting a $other from the metastore cache. " + + s"This cached entry will be invalidated.") + cachedDataSourceTables.invalidate(tableIdentifier) + None + } + } + private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = { val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging @@ -454,40 +505,6 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) - def getCached( - tableIdentifier: QualifiedTableName, - pathsInMetastore: Seq[String], - schemaInMetastore: StructType, - partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { - cachedDataSourceTables.getIfPresent(tableIdentifier) match { - case null => None // Cache miss - case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => - // If we have the same paths, same schema, and same partition spec, - // we will use the cached Parquet Relation. - val useCached = - parquetRelation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && - logical.schema.sameType(metastoreSchema) && - parquetRelation.partitionSpec == partitionSpecInMetastore.getOrElse { - PartitionSpec(StructType(Nil), Array.empty[datasources.PartitionDirectory]) - } - - if (useCached) { - Some(logical) - } else { - // If the cached relation is not updated, we invalidate it right away. - cachedDataSourceTables.invalidate(tableIdentifier) - None - } - case other => - logWarning( - s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} should be stored " + - s"as Parquet. However, we are getting a $other from the metastore cache. 
" + - s"This cached entry will be invalidated.") - cachedDataSourceTables.invalidate(tableIdentifier) - None - } - } - val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) val partitionColumnDataTypes = partitionSchema.map(_.dataType) @@ -504,14 +521,15 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val cached = getCached( tableIdentifier, - metastoreRelation.table.storage.locationUri.toSeq, + metastoreRelation, metastoreSchema, + classOf[ParquetDefaultSource], Some(partitionSpec)) val parquetRelation = cached.getOrElse { val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) - val format = new DefaultSource() + val format = new ParquetDefaultSource() val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles()) val mergedSchema = inferredSchema.map { inferred => @@ -524,7 +542,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte partitionSchema = partitionSchema, dataSchema = mergedSchema, bucketSpec = None, // We don't support hive bucketed tables, only ones we write out. - fileFormat = new DefaultSource(), + fileFormat = new ParquetDefaultSource(), options = parquetOptions) val created = LogicalRelation(relation) @@ -536,7 +554,11 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } else { val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - val cached = getCached(tableIdentifier, paths, metastoreSchema, None) + val cached = getCached(tableIdentifier, + metastoreRelation, + metastoreSchema, + classOf[ParquetDefaultSource], + None) val parquetRelation = cached.getOrElse { val created = LogicalRelation( @@ -592,6 +614,107 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } + private def convertToOrcRelation(metastoreRelation: MetastoreRelation): LogicalRelation = { + val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) + + val tableIdentifier = + QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) + + val orcOptions = Map[String, String]() + + val result = if (metastoreRelation.hiveQlTable.isPartitioned) { + val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) + val partitionColumnDataTypes = partitionSchema.map(_.dataType) + // We're converting the entire table into OrcRelation, so predicates to Hive metastore + // are empty. 
+ val partitions = metastoreRelation.getHiveQlPartitions().map { p => + val location = p.getLocation + val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { + case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) + }) + PartitionDirectory(values, location) + } + val partitionSpec = PartitionSpec(partitionSchema, partitions) + + val cached = getCached( + tableIdentifier, + metastoreRelation, + metastoreSchema, + classOf[OrcDefaultSource], + Some(partitionSpec)) + + val orcRelation = cached.getOrElse { + val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil + val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) + val format = new OrcDefaultSource() + val inferredSchema = format.inferSchema(hive, orcOptions, fileCatalog.allFiles()).get + + val relation = HadoopFsRelation( + sqlContext = hive, + location = fileCatalog, + partitionSchema = partitionSchema, + dataSchema = inferredSchema, + bucketSpec = None, // We don't support hive bucketed tables, only ones we write out. + fileFormat = new OrcDefaultSource(), + options = orcOptions) + + val created = LogicalRelation(relation) + cachedDataSourceTables.put(tableIdentifier, created) + created + } + + orcRelation + } else { + val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) + + val cached = getCached(tableIdentifier, + metastoreRelation, + metastoreSchema, + classOf[OrcDefaultSource], + None) + val orcRelation = cached.getOrElse { + val created = + LogicalRelation( + DataSource( + sqlContext = hive, + paths = paths, + userSpecifiedSchema = Some(metastoreRelation.schema), + options = orcOptions, + className = "orc").resolveRelation()) + + cachedDataSourceTables.put(tableIdentifier, created) + created + } + + orcRelation + } + result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) + } + + /** + * When scanning Metastore ORC tables, convert them to ORC data source relations + * for better performance. + */ + object OrcConversions extends Rule[LogicalPlan] { + private def isConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { + relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && + hive.convertMetastoreOrc + } + + override def apply(plan: LogicalPlan): LogicalPlan = { + if (!plan.resolved || plan.analyzed) { + return plan + } + + plan transformUp { + // Read path + case relation: MetastoreRelation if isConvertMetastoreOrc(relation) => + val orcRelation = convertToOrcRelation(relation) + SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation) + } + } + } + /** * Creates any tables required for query execution. * For example, because of a CREATE TABLE X AS statement. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index caa7f296ed16a..a4add94848428 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -56,6 +56,7 @@ private[hive] class HiveSessionState(ctx: HiveContext) extends SessionState(ctx) new Analyzer(catalog, functionRegistry, conf) { override val extendedResolutionRules = catalog.ParquetConversions :: + catalog.OrcConversions :: catalog.CreateTables :: catalog.PreInsertionCasts :: python.ExtractPythonUDFs :: diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 92f424bac7ff3..e72328a849304 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -400,4 +400,33 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { } } } + + test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") { + withTempPath { dir => + withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + val path = dir.getCanonicalPath + + withTable("dummy_orc") { + withTempTable("single") { + sqlContext.sql( + s"""CREATE TABLE dummy_orc(key INT, value STRING) + |STORED AS ORC + |LOCATION '$path' + """.stripMargin) + + val singleRowDF = Seq((0, "foo")).toDF("key", "value").coalesce(1) + singleRowDF.registerTempTable("single") + + sqlContext.sql( + s"""INSERT INTO TABLE dummy_orc + |SELECT key, value FROM single + """.stripMargin).collect() + + val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0") + checkAnswer(df, singleRowDF) + } + } + } + } + } } From e76b474a01a61d29a758c9dcb100e365d4f2ca00 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 23 Mar 2016 14:22:21 -0700 Subject: [PATCH 2/8] Changes as per review comments (round #1) - Merged the conversion methods for parquet and orc in one single method `convertToLogicalRelation` as suggested by @@marmbrus - @liancheng's comment about checking for bucket spec - 4 tests were failing because of the change. My change alters the plan for the queries. eg. Query from `date_serde.q` : ``` select * from date_serde_orc ``` Plan (BEFORE) ``` Project [c1#282,c2#283] +- MetastoreRelation default, date_serde_orc, None ``` Plan (AFTER) ``` Project [c1#287,c2#288] +- SubqueryAlias date_serde_orc +- Relation[c1#287,c2#288] HadoopFiles ``` Setting `CONVERT_METASTORE_ORC` to `false` by default to mitigate test failures. Other option was to make `SQLBuilder` work with `Relation` but that is out of scope of the current PR. In my opinion, it would be better to have this config turned on by default so that anyone trying out Spark out of the box gets better perf. w/o needing to tweak such configs. 
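With the default flipped to `false` here, anyone who wants the new code path can still opt in per session (an illustrative sketch, not part of this patch; it only uses the config key defined above):

```
// opt in to the ORC data source read path for metastore ORC tables
sqlContext.setConf("spark.sql.hive.convertMetastoreOrc", "true")
```
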
Open items: - @liancheng's review comment : Test case added does not verify if the new codepath is hit --- .../apache/spark/sql/hive/HiveContext.scala | 2 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 205 +++++++----------- .../spark/sql/hive/orc/OrcQuerySuite.scala | 6 +- 3 files changed, 89 insertions(+), 124 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 62f5de3b81db2..5e5eae07f777d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -450,7 +450,7 @@ private[hive] object HiveContext extends Logging { "when \"spark.sql.hive.convertMetastoreParquet\" is true.") val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc", - defaultValue = Some(true), + defaultValue = Some(false), doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " + "the built in support.") diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9a2cd50554999..32718639006a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.parser.DataTypeParser import org.apache.spark.sql.catalyst.plans.logical import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.rules._ -import org.apache.spark.sql.execution.{datasources, FileRelation} +import org.apache.spark.sql.execution.FileRelation import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetDefaultSource, ParquetRelation} import org.apache.spark.sql.hive.client._ @@ -446,6 +446,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte metastoreRelation: MetastoreRelation, schemaInMetastore: StructType, expectedFileFormat: Class[_ <: FileFormat], + expectedBucketSpec: Option[BucketSpec], partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = { cachedDataSourceTables.getIfPresent(tableIdentifier) match { @@ -461,6 +462,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val useCached = relation.location.paths.map(_.toString).toSet == pathsInMetastore.toSet && logical.schema.sameType(schemaInMetastore) && + relation.bucketSpec == expectedBucketSpec && relation.partitionSpec == partitionSpecInMetastore.getOrElse { PartitionSpec(StructType(Nil), Array.empty[PartitionDirectory]) } @@ -491,24 +493,20 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte } } - private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = { + private def convertToLogicalRelation(metastoreRelation: MetastoreRelation, + options: Map[String, String], + defaultSource: FileFormat, + fileFormatClass: Class[_ <: FileFormat], + fileType: String): LogicalRelation = { val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) - val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging - - val parquetOptions = Map( - ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString, - ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier( - metastoreRelation.tableName, - 
Some(metastoreRelation.databaseName) - ).unquotedString - ) val tableIdentifier = QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) + val bucketSpec = None // We don't support hive bucketed tables, only ones we write out. val result = if (metastoreRelation.hiveQlTable.isPartitioned) { val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into ParquetRelation, so predicates to Hive metastore + // We're converting the entire table into HadoopFsRelation, so predicates to Hive metastore // are empty. val partitions = metastoreRelation.getHiveQlPartitions().map { p => val location = p.getLocation @@ -523,57 +521,63 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte tableIdentifier, metastoreRelation, metastoreSchema, - classOf[ParquetDefaultSource], + fileFormatClass, + bucketSpec, Some(partitionSpec)) - val parquetRelation = cached.getOrElse { + val hadoopFsRelation = cached.getOrElse { val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) - val format = new ParquetDefaultSource() - val inferredSchema = format.inferSchema(hive, parquetOptions, fileCatalog.allFiles()) - val mergedSchema = inferredSchema.map { inferred => - ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred) - }.getOrElse(metastoreSchema) + val inferredSchema = if (fileType.equals("parquet")) { + val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles()) + inferredSchema.map { inferred => + ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred) + }.getOrElse(metastoreSchema) + } else { + defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get + } val relation = HadoopFsRelation( sqlContext = hive, location = fileCatalog, partitionSchema = partitionSchema, - dataSchema = mergedSchema, - bucketSpec = None, // We don't support hive bucketed tables, only ones we write out. - fileFormat = new ParquetDefaultSource(), - options = parquetOptions) + dataSchema = inferredSchema, + bucketSpec = bucketSpec, + fileFormat = defaultSource, + options = options) val created = LogicalRelation(relation) cachedDataSourceTables.put(tableIdentifier, created) created } - parquetRelation + hadoopFsRelation } else { val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) val cached = getCached(tableIdentifier, metastoreRelation, metastoreSchema, - classOf[ParquetDefaultSource], + fileFormatClass, + bucketSpec, None) - val parquetRelation = cached.getOrElse { + val logicalRelation = cached.getOrElse { val created = LogicalRelation( DataSource( sqlContext = hive, paths = paths, userSpecifiedSchema = Some(metastoreRelation.schema), - options = parquetOptions, - className = "parquet").resolveRelation()) + bucketSpec = bucketSpec, + options = options, + className = fileType).resolveRelation()) cachedDataSourceTables.put(tableIdentifier, created) created } - parquetRelation + logicalRelation } result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) } @@ -583,6 +587,27 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte * data source relations for better performance. 
*/ object ParquetConversions extends Rule[LogicalPlan] { + private def shouldConvertMetastoreParquet(relation: MetastoreRelation): Boolean = { + relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") && + hive.convertMetastoreParquet + } + + private def convertToParquetRelation(relation: MetastoreRelation): LogicalRelation = { + val defaultSource = new ParquetDefaultSource() + val fileFormatClass = classOf[ParquetDefaultSource] + + val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging + val options = Map( + ParquetRelation.MERGE_SCHEMA -> mergeSchema.toString, + ParquetRelation.METASTORE_TABLE_NAME -> TableIdentifier( + relation.tableName, + Some(relation.databaseName) + ).unquotedString + ) + + convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "parquet") + } + override def apply(plan: LogicalPlan): LogicalPlan = { if (!plan.resolved || plan.analyzed) { return plan @@ -592,123 +617,61 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte // Write path case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Parquet data source (yet). - if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && - r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - val parquetRelation = convertToParquetRelation(r) - InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) + if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) // Write path case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) // Inserting into partitioned table is not supported in Parquet data source (yet). - if !r.hiveQlTable.isPartitioned && hive.convertMetastoreParquet && - r.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => - val parquetRelation = convertToParquetRelation(r) - InsertIntoTable(parquetRelation, partition, child, overwrite, ifNotExists) + if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreParquet(r) => + InsertIntoTable(convertToParquetRelation(r), partition, child, overwrite, ifNotExists) // Read path - case relation: MetastoreRelation if hive.convertMetastoreParquet && - relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") => + case relation: MetastoreRelation if shouldConvertMetastoreParquet(relation) => val parquetRelation = convertToParquetRelation(relation) SubqueryAlias(relation.alias.getOrElse(relation.tableName), parquetRelation) } } } - private def convertToOrcRelation(metastoreRelation: MetastoreRelation): LogicalRelation = { - val metastoreSchema = StructType.fromAttributes(metastoreRelation.output) - - val tableIdentifier = - QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName) - - val orcOptions = Map[String, String]() - - val result = if (metastoreRelation.hiveQlTable.isPartitioned) { - val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys) - val partitionColumnDataTypes = partitionSchema.map(_.dataType) - // We're converting the entire table into OrcRelation, so predicates to Hive metastore - // are empty. 
- val partitions = metastoreRelation.getHiveQlPartitions().map { p => - val location = p.getLocation - val values = InternalRow.fromSeq(p.getValues.asScala.zip(partitionColumnDataTypes).map { - case (rawValue, dataType) => Cast(Literal(rawValue), dataType).eval(null) - }) - PartitionDirectory(values, location) - } - val partitionSpec = PartitionSpec(partitionSchema, partitions) - - val cached = getCached( - tableIdentifier, - metastoreRelation, - metastoreSchema, - classOf[OrcDefaultSource], - Some(partitionSpec)) - - val orcRelation = cached.getOrElse { - val paths = new Path(metastoreRelation.table.storage.locationUri.get) :: Nil - val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec) - val format = new OrcDefaultSource() - val inferredSchema = format.inferSchema(hive, orcOptions, fileCatalog.allFiles()).get - - val relation = HadoopFsRelation( - sqlContext = hive, - location = fileCatalog, - partitionSchema = partitionSchema, - dataSchema = inferredSchema, - bucketSpec = None, // We don't support hive bucketed tables, only ones we write out. - fileFormat = new OrcDefaultSource(), - options = orcOptions) - - val created = LogicalRelation(relation) - cachedDataSourceTables.put(tableIdentifier, created) - created - } - - orcRelation - } else { - val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString) - - val cached = getCached(tableIdentifier, - metastoreRelation, - metastoreSchema, - classOf[OrcDefaultSource], - None) - val orcRelation = cached.getOrElse { - val created = - LogicalRelation( - DataSource( - sqlContext = hive, - paths = paths, - userSpecifiedSchema = Some(metastoreRelation.schema), - options = orcOptions, - className = "orc").resolveRelation()) - - cachedDataSourceTables.put(tableIdentifier, created) - created - } - - orcRelation - } - result.copy(expectedOutputAttributes = Some(metastoreRelation.output)) - } - /** - * When scanning Metastore ORC tables, convert them to ORC data source relations - * for better performance. - */ + * When scanning Metastore ORC tables, convert them to ORC data source relations + * for better performance. + */ object OrcConversions extends Rule[LogicalPlan] { - private def isConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { + private def shouldConvertMetastoreOrc(relation: MetastoreRelation): Boolean = { relation.tableDesc.getSerdeClassName.toLowerCase.contains("orc") && hive.convertMetastoreOrc } + private def convertToOrcRelation(relation: MetastoreRelation): LogicalRelation = { + val defaultSource = new OrcDefaultSource() + val fileFormatClass = classOf[OrcDefaultSource] + val options = Map[String, String]() + + convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") + } + override def apply(plan: LogicalPlan): LogicalPlan = { if (!plan.resolved || plan.analyzed) { return plan } plan transformUp { + // Write path + case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Orc data source (yet). + if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) + + // Write path + case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite, ifNotExists) + // Inserting into partitioned table is not supported in Orc data source (yet). 
+ if !r.hiveQlTable.isPartitioned && shouldConvertMetastoreOrc(r) => + InsertIntoTable(convertToOrcRelation(r), partition, child, overwrite, ifNotExists) + // Read path - case relation: MetastoreRelation if isConvertMetastoreOrc(relation) => + case relation: MetastoreRelation if shouldConvertMetastoreOrc(relation) => val orcRelation = convertToOrcRelation(relation) SubqueryAlias(relation.alias.getOrElse(relation.tableName), orcRelation) } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index e72328a849304..0138eb571f4d5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.CompressionKind +import org.apache.spark.sql.hive.HiveContext import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ @@ -403,7 +404,8 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { test("SPARK-14070 Use ORC data source for SQL queries on ORC tables") { withTempPath { dir => - withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") { + withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true", + HiveContext.CONVERT_METASTORE_ORC.key -> "true") { val path = dir.getCanonicalPath withTable("dummy_orc") { @@ -420,7 +422,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { sqlContext.sql( s"""INSERT INTO TABLE dummy_orc |SELECT key, value FROM single - """.stripMargin).collect() + """.stripMargin) val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0") checkAnswer(df, singleRowDF) From ca993556bd3b9b10d772e482f48fdfa95c7eceba Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 23 Mar 2016 14:32:49 -0700 Subject: [PATCH 3/8] Fixed checkstyle --- .../scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 0138eb571f4d5..625b1ac1bf1a2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -22,11 +22,11 @@ import java.nio.charset.StandardCharsets import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc.CompressionKind -import org.apache.spark.sql.hive.HiveContext import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ import org.apache.spark.sql.internal.SQLConf From 28414cd6d3466a2e98e1a0fa7d4fdab65488042c Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Wed, 23 Mar 2016 15:59:09 -0700 Subject: [PATCH 4/8] Setting `CONVERT_METASTORE_ORC` to `true` by default. This means that anyone downloading Spark and trying it out will get the best performance w/o needing to know about this feature and tweak this config. For the tests which had failed, made `HiveCompatibilitySuite` to set that config to `false` so that they still produce the plan as per the old way. 
Ran `build/sbt scalastyle catalyst/compile sql/compile hive/compile` Ran `build/sbt -Phive hive/test-only *HiveCompatibilitySuite` --- .../spark/sql/hive/execution/HiveCompatibilitySuite.scala | 8 +++++++- .../scala/org/apache/spark/sql/hive/HiveContext.scala | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 650797f7683e1..730e3188926c8 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -20,9 +20,10 @@ package org.apache.spark.sql.hive.execution import java.io.File import java.util.{Locale, TimeZone} -import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.scalatest.BeforeAndAfter +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.internal.SQLConf @@ -38,6 +39,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { private val originalLocale = Locale.getDefault private val originalColumnBatchSize = TestHive.conf.columnBatchSize private val originalInMemoryPartitionPruning = TestHive.conf.inMemoryPartitionPruning + private val originalConvertMetastoreOrc = TestHive.convertMetastoreOrc def testCases: Seq[(String, File)] = { hiveQueryDir.listFiles.map(f => f.getName.stripSuffix(".q") -> f) @@ -56,6 +58,9 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, true) // Use Hive hash expression instead of the native one TestHive.sessionState.functionRegistry.unregisterFunction("hash") + // Ensures that the plans generation use metastore relation and not OrcRelation + // Was done because SqlBuilder does not work with plans having logical relation + TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, false) RuleExecutor.resetTime() } @@ -66,6 +71,7 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { Locale.setDefault(originalLocale) TestHive.setConf(SQLConf.COLUMN_BATCH_SIZE, originalColumnBatchSize) TestHive.setConf(SQLConf.IN_MEMORY_PARTITION_PRUNING, originalInMemoryPartitionPruning) + TestHive.setConf(HiveContext.CONVERT_METASTORE_ORC, originalConvertMetastoreOrc) TestHive.sessionState.functionRegistry.restore() // For debugging dump some statistics about how much time was spent in various optimizer rules. 
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 5e5eae07f777d..62f5de3b81db2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -450,7 +450,7 @@ private[hive] object HiveContext extends Logging { "when \"spark.sql.hive.convertMetastoreParquet\" is true.") val CONVERT_METASTORE_ORC = booleanConf("spark.sql.hive.convertMetastoreOrc", - defaultValue = Some(false), + defaultValue = Some(true), doc = "When set to false, Spark SQL will use the Hive SerDe for ORC tables instead of " + "the built in support.") From 0b1dd4891a76ad971bd67aff2a232709ee1ea602 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 24 Mar 2016 14:23:53 -0700 Subject: [PATCH 5/8] dummy commit --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 32718639006a8..983e029b5f9c9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -649,7 +649,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val fileFormatClass = classOf[OrcDefaultSource] val options = Map[String, String]() - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") + convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") } override def apply(plan: LogicalPlan): LogicalPlan = { From 2b7a0a4533688352982887d6fe9658ca7eaa9a2e Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 24 Mar 2016 14:24:20 -0700 Subject: [PATCH 6/8] revert dummy commit --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 983e029b5f9c9..32718639006a8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -649,7 +649,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val fileFormatClass = classOf[OrcDefaultSource] val options = Map[String, String]() - convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") + convertToLogicalRelation(relation, options, defaultSource, fileFormatClass, "orc") } override def apply(plan: LogicalPlan): LogicalPlan = { From f73b7488281bc89ae9a9510390df8497044d507f Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Thu, 24 Mar 2016 14:46:48 -0700 Subject: [PATCH 7/8] Modified the test case as per @liancheng's suggestion --- .../org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 625b1ac1bf1a2..5ef8194f28881 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -26,6 
+26,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.test.TestHive.implicits._ @@ -426,6 +427,13 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest { val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key=0") checkAnswer(df, singleRowDF) + + val queryExecution = df.queryExecution + queryExecution.analyzed.collectFirst { + case _: LogicalRelation => () + }.getOrElse { + fail(s"Expecting the query plan to have LogicalRelation, but got:\n$queryExecution") + } } } } From 3c25e7ec503f3bc705d2635a13d235bf6e0ec5a1 Mon Sep 17 00:00:00 2001 From: Tejas Patil Date: Sat, 26 Mar 2016 15:52:24 -0700 Subject: [PATCH 8/8] rebased --- .../scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala index aa44cba4b5641..1785e6d0c733c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala @@ -64,6 +64,7 @@ class HiveSessionCatalog( private val metastoreCatalog = new HiveMetastoreCatalog(client, context) val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions + val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables val PreInsertionCasts: Rule[LogicalPlan] = metastoreCatalog.PreInsertionCasts
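
As a rough end-to-end check of the read path after applying this series (a sketch along the lines of the assertion added in PATCH 7/8, assuming the `dummy_orc` table from that test exists and `spark.sql.hive.convertMetastoreOrc` is `true`):

```
import org.apache.spark.sql.execution.datasources.LogicalRelation

val df = sqlContext.sql("SELECT * FROM dummy_orc WHERE key = 0")
// After OrcConversions runs, the MetastoreRelation should have been replaced by a
// LogicalRelation wrapping a HadoopFsRelation backed by the ORC data source.
val converted = df.queryExecution.analyzed.collectFirst { case r: LogicalRelation => r }
assert(converted.isDefined, s"Expected a LogicalRelation in:\n${df.queryExecution}")
```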