From 1429886974099fd066c08aac597a841bb9908705 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 9 May 2016 16:52:47 -0700 Subject: [PATCH 1/3] [SPARK-4131] [SQL] Support INSERT OVERWRITE [LOCAL] DIRECTORY '/path/to/dir' [ROW FORMAT row_format] [STORED AS file_format] query. --- .../spark/sql/catalyst/parser/SqlBase.g4 | 2 + .../UnsupportedOperationChecker.scala | 3 + .../sql/catalyst/parser/AstBuilder.scala | 116 ++++++++++++++- .../plans/logical/basicLogicalOperators.scala | 12 ++ .../spark/sql/execution/SparkSqlParser.scala | 97 ------------- .../spark/sql/hive/HiveStrategies.scala | 4 + .../sql/hive/execution/InsertIntoDir.scala | 137 ++++++++++++++++++ .../hive/execution/InsertIntoHiveTable.scala | 56 +------ .../sql/hive/execution/SaveAsHiveFile.scala | 77 ++++++++++ .../spark/sql/hive/hiveWriterContainers.scala | 5 +- 10 files changed, 353 insertions(+), 156 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 403191af5e5be..6f7bec50ecdf5 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -202,6 +202,7 @@ query insertInto : INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)? | INSERT INTO TABLE? tableIdentifier partitionSpec? + | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING (rowFormat)? (STORED AS format=IDENTIFIER)? ; partitionSpecLocation @@ -717,6 +718,7 @@ WITH: 'WITH'; VALUES: 'VALUES'; CREATE: 'CREATE'; TABLE: 'TABLE'; +DIRECTORY: 'DIRECTORY'; VIEW: 'VIEW'; REPLACE: 'REPLACE'; INSERT: 'INSERT'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala index 0e08bf013c8d9..a2d24236470d6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala @@ -55,6 +55,9 @@ object UnsupportedOperationChecker { case _: InsertIntoTable => throwError("InsertIntoTable is not supported with streaming DataFrames/Datasets") + case _: InsertIntoDir => + throwError("InsertIntoDir is not supported with streaming DataFrames/Datasets") + case Aggregate(_, _, child) if child.isStreaming => if (outputMode == Append) { throwError( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index a13c03a529f37..c8d3b47f9975a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -27,6 +27,7 @@ import org.antlr.v4.runtime.tree.{ParseTree, RuleNode, TerminalNode} import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import 
org.apache.spark.sql.catalyst.parser.SqlBaseParser._ import org.apache.spark.sql.catalyst.plans._ @@ -193,20 +194,125 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { } /** - * Add an INSERT INTO [TABLE]/INSERT OVERWRITE TABLE operation to the logical plan. + * A table property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether its a string literal or a table property + * identifier. + */ + override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * Convert a table property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitTablePropertyList( + ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.tableProperty.asScala.map { property => + val key = visitTablePropertyKey(property.key) + val value = Option(property.value).map(string).orNull + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties, ctx) + properties.toMap + } + + /** Empty storage format for default values and copies. */ + protected val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) + + /** + * Create a [[CatalogStorageFormat]] used for creating tables. + * + * Example format: + * {{{ + * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] + * }}} + * + * OR + * + * {{{ + * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] + * [COLLECTION ITEMS TERMINATED BY char] + * [MAP KEYS TERMINATED BY char] + * [LINES TERMINATED BY char] + * [NULL DEFINED AS char] + * }}} + */ + protected def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { + ctx match { + case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) + case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) + } + } + + /** + * Create SERDE row format name and properties pair. + */ + override def visitRowFormatSerde( + ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { + import ctx._ + EmptyStorageFormat.copy( + serde = Option(string(name)), + serdeProperties = Option(tablePropertyList).map(visitTablePropertyList).getOrElse(Map.empty)) + } + + /** + * Create a delimited row format properties object. + */ + override def visitRowFormatDelimited( + ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { + // Collect the entries if any. + def entry(key: String, value: Token): Seq[(String, String)] = { + Option(value).toSeq.map(x => key -> string(x)) + } + // TODO we need proper support for the NULL format. + val entries = + entry("field.delim", ctx.fieldsTerminatedBy) ++ + entry("serialization.format", ctx.fieldsTerminatedBy) ++ + entry("escape.delim", ctx.escapedBy) ++ + // The following typo is inherited from Hive... + entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ + entry("mapkey.delim", ctx.keysTerminatedBy) ++ + Option(ctx.linesSeparatedBy).toSeq.map { token => + val value = string(token) + assert( + value == "\n", + s"LINES TERMINATED BY only supports newline '\\n' right now: $value", + ctx) + "line.delim" -> value + } + EmptyStorageFormat.copy(serdeProperties = entries.toMap) + } + + /** + * Add an INSERT INTO [TABLE] / INSERT OVERWRITE TABLE / INSERT OVERWRITE DIRECTORY + * operation to the logical plan. 
*/ private def withInsertInto( ctx: InsertIntoContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { - val tableIdent = visitTableIdentifier(ctx.tableIdentifier) + val tableIdent = Option(ctx.tableIdentifier) + .map(ti => Option(visitTableIdentifier(ti))).getOrElse(None) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) - InsertIntoTable( - UnresolvedRelation(tableIdent, None), + tableIdent.map(ti => InsertIntoTable( + UnresolvedRelation(ti, None), partitionKeys, query, ctx.OVERWRITE != null, - ctx.EXISTS != null) + ctx.EXISTS != null)).getOrElse( + InsertIntoDir( + string(ctx.path), + ctx.LOCAL != null, + Option(ctx.format).map(_.getText).getOrElse("textfile"), + Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat), + query) + ) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index ca0096eeb2087..720980a49052f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical import scala.collection.mutable.ArrayBuffer import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.plans._ @@ -386,6 +387,17 @@ case class InsertIntoTable( } } +case class InsertIntoDir( + path: String, + isLocal: Boolean, + fileFormat: String, + rowFormat: CatalogStorageFormat, + child: LogicalPlan) + extends LogicalPlan { + override def children: Seq[LogicalPlan] = child :: Nil + override def output: Seq[Attribute] = Seq.empty +} + /** * A container for holding named common table expressions (CTEs) and a query plan. * This operator will be removed during analysis and the relations will be substituted into child. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index f85d6062e8d35..d06bb48275ba3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -381,22 +381,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ) } - /** - * Convert a table property list into a key-value map. - * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. - */ - override def visitTablePropertyList( - ctx: TablePropertyListContext): Map[String, String] = withOrigin(ctx) { - val properties = ctx.tableProperty.asScala.map { property => - val key = visitTablePropertyKey(property.key) - val value = Option(property.value).map(string).orNull - key -> value - } - // Check for duplicate property names. - checkDuplicateKeys(properties, ctx) - properties.toMap - } - /** * Parse a key-value map from a [[TablePropertyListContext]], assuming all values are specified. */ @@ -423,22 +407,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { props.keys.toSeq } - /** - * A table property key can either be String or a collection of dot separated elements. 
This - * function extracts the property key based on whether its a string literal or a table property - * identifier. - */ - override def visitTablePropertyKey(key: TablePropertyKeyContext): String = { - if (key.STRING != null) { - string(key.STRING) - } else { - key.getText - } - } - /** * Create a [[CreateDatabaseCommand]] command. - * * For example: * {{{ * CREATE DATABASE [IF NOT EXISTS] database_name [COMMENT database_comment] @@ -950,9 +920,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** Empty storage format for default values and copies. */ - private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) - /** * Create a [[CatalogStorageFormat]]. */ @@ -980,70 +947,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** - * Create a [[CatalogStorageFormat]] used for creating tables. - * - * Example format: - * {{{ - * SERDE serde_name [WITH SERDEPROPERTIES (k1=v1, k2=v2, ...)] - * }}} - * - * OR - * - * {{{ - * DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] - * [COLLECTION ITEMS TERMINATED BY char] - * [MAP KEYS TERMINATED BY char] - * [LINES TERMINATED BY char] - * [NULL DEFINED AS char] - * }}} - */ - private def visitRowFormat(ctx: RowFormatContext): CatalogStorageFormat = withOrigin(ctx) { - ctx match { - case serde: RowFormatSerdeContext => visitRowFormatSerde(serde) - case delimited: RowFormatDelimitedContext => visitRowFormatDelimited(delimited) - } - } - - /** - * Create SERDE row format name and properties pair. - */ - override def visitRowFormatSerde( - ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { - import ctx._ - EmptyStorageFormat.copy( - serde = Option(string(name)), - serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) - } - - /** - * Create a delimited row format properties object. - */ - override def visitRowFormatDelimited( - ctx: RowFormatDelimitedContext): CatalogStorageFormat = withOrigin(ctx) { - // Collect the entries if any. - def entry(key: String, value: Token): Seq[(String, String)] = { - Option(value).toSeq.map(x => key -> string(x)) - } - // TODO we need proper support for the NULL format. - val entries = - entry("field.delim", ctx.fieldsTerminatedBy) ++ - entry("serialization.format", ctx.fieldsTerminatedBy) ++ - entry("escape.delim", ctx.escapedBy) ++ - // The following typo is inherited from Hive... - entry("colelction.delim", ctx.collectionItemsTerminatedBy) ++ - entry("mapkey.delim", ctx.keysTerminatedBy) ++ - Option(ctx.linesSeparatedBy).toSeq.map { token => - val value = string(token) - assert( - value == "\n", - s"LINES TERMINATED BY only supports newline '\\n' right now: $value", - ctx) - "line.delim" -> value - } - EmptyStorageFormat.copy(serdeProperties = entries.toMap) - } - /** * Throw a [[ParseException]] if the user specified incompatible SerDes through ROW FORMAT * and STORED AS. 
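For context, the new grammar alternative together with the AstBuilder change above is what lets a statement such as the one below parse into the new InsertIntoDir logical plan. The driver program is only an illustrative sketch: the src table, the output path, and the SparkSession-based setup are assumptions made for this example, not part of the patch.

    // Hypothetical end-to-end use of the syntax added by this patch; the src
    // table and the /tmp output directory are made-up examples.
    import org.apache.spark.sql.SparkSession

    object InsertOverwriteDirectoryExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("insert-overwrite-directory-example")
          .enableHiveSupport()
          .getOrCreate()

        // Exercises the new rule:
        //   INSERT OVERWRITE LOCAL? DIRECTORY path rowFormat? (STORED AS format)?
        spark.sql(
          """INSERT OVERWRITE LOCAL DIRECTORY '/tmp/query_results'
            |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
            |STORED AS textfile
            |SELECT * FROM src WHERE key < 10""".stripMargin)

        spark.stop()
      }
    }

With LOCAL the results are copied to the driver's local filesystem; without it the path is resolved against the default Hadoop filesystem, matching the isLocal handling on the execution side of the patch.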
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 71b180e55b58c..908811e581336 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -50,6 +50,10 @@ private[hive] trait HiveStrategies { table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil + case logical.InsertIntoDir( + path, isLocal, fileFormat, rowFormat, child) => + execution.InsertIntoDir( + path, isLocal, fileFormat, rowFormat, planLater(child)) :: Nil case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala new file mode 100644 index 0000000000000..7e428e0ae2f3c --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.File +import java.util.Properties + +import scala.language.existentials + +import antlr.SemanticException +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.ql.io.{HiveIgnoreKeyTextOutputFormat, RCFileOutputFormat} +import org.apache.hadoop.hive.ql.io.orc.{OrcOutputFormat, OrcSerde} +import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat +import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe +import org.apache.hadoop.io.Text +import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat, TextInputFormat} + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive._ +import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.util.{SerializableJobConf, Utils} + +case class InsertIntoDir( + path: String, + isLocal: Boolean, + fileFormat: String, + rowFormat: CatalogStorageFormat, + child: SparkPlan) extends SaveAsHiveFile { + + @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] + def output: Seq[Attribute] = Seq.empty + + protected[sql] lazy val sideEffectResult: Seq[InternalRow] = { + val hadoopConf = sessionState.newHadoopConf() + + val properties = new Properties() + + val Array(cols, types) = child.output.foldLeft(Array("", ""))((r, a) => { + r(0) = r(0) + a.name + "," + r(1) = r(1) + a.dataType.typeName + ":" + r + }) + + properties.put("columns", cols.dropRight(1)) + properties.put("columns.types", types.dropRight(1)) + + val fileFormatMap = Map( + "orc" -> (classOf[OrcOutputFormat], classOf[OrcSerde]), + "parquet" -> (classOf[MapredParquetOutputFormat], classOf[ParquetHiveSerDe]), +// "rcfile" -> (classOf[RCFileOutputFormat], classOf[LazySimpleSerDe]), + "textfile" -> (classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]], classOf[LazySimpleSerDe]), + "sequencefile" -> (classOf[SequenceFileOutputFormat[Any, Any]], classOf[LazySimpleSerDe]) + ) + + val (ouputFormatClass, serdeClass) = fileFormatMap.getOrElse(fileFormat.toLowerCase, + throw new SemanticException(s"Unrecognized file format in STORED AS clause: $fileFormat," + + s" expected one of ${fileFormatMap.keys.mkString(",")}")) + + properties.put(serdeConstants.SERIALIZATION_LIB, serdeClass.getName) + import scala.collection.JavaConverters._ + properties.putAll(rowFormat.serdeProperties.asJava) + + // if user specified a serde in the ROW FORMAT, use that. 
+ rowFormat.serde.map(properties.put(serdeConstants.SERIALIZATION_LIB, _)) + + val tableDesc = new TableDesc( + classOf[TextInputFormat], + ouputFormatClass, + properties + ) + + val isCompressed = + sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean + + val targetPath = new Path(path) + + val fileSinkConf = new FileSinkDesc(targetPath.toString, tableDesc, isCompressed) + + val jobConf = new JobConf(hadoopConf) + jobConf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") + + val jobConfSer = new SerializableJobConf(jobConf) + + val writerContainer = new SparkHiveWriterContainer( + jobConf, + fileSinkConf, + child.output) + + if( !isLocal ) { + FileSystem.get(jobConf).delete(targetPath, true) + } + + @transient val outputClass = writerContainer.newSerializer(tableDesc).getSerializedClass + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer, + isCompressed) + + val outputPath = FileOutputFormat.getOutputPath(jobConf) + if( isLocal ) { + Utils.deleteRecursively(new File(path)) + outputPath.getFileSystem(hadoopConf).copyToLocalFile(true, outputPath, targetPath) + log.info(s"Copied results from ${outputPath} to local dir ${path}") + } else { + log.info(s"Results available at path ${outputPath}") + } + + Seq.empty[InternalRow] + } + + override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray + + protected override def doExecute(): RDD[InternalRow] = { + sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3805674d39589..5db1bcf490c5d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -35,7 +35,7 @@ import org.apache.hadoop.mapred.{FileOutputFormat, JobConf} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} import org.apache.spark.SparkException @@ -47,7 +47,7 @@ case class InsertIntoHiveTable( partition: Map[String, Option[String]], child: SparkPlan, overwrite: Boolean, - ifNotExists: Boolean) extends UnaryExecNode { + ifNotExists: Boolean) extends SaveAsHiveFile { @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState] @transient private val client = sessionState.metadataHive @@ -109,28 +109,6 @@ case class InsertIntoHiveTable( new Path(getStagingDir(path, hadoopConf), "-ext-10000") // Hive uses 10000 } - private def saveAsHiveFile( - rdd: RDD[InternalRow], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: SerializableJobConf, - writerContainer: SparkHiveWriterContainer): Unit = { - assert(valueClass != null, "Output value class not set") - conf.value.setOutputValueClass(valueClass) - - val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName - assert(outputFileFormatClassName != null, "Output format class not set") - conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - FileOutputFormat.setOutputPath( - conf.value, - 
SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) - log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - writerContainer.driverSideSetup() - sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) - writerContainer.commitJob() - } - /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -146,18 +124,6 @@ case class InsertIntoHiveTable( val hadoopConf = sessionState.newHadoopConf() val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val isCompressed = - sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean - - if (isCompressed) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. - hadoopConf.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type")) - } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) @@ -203,18 +169,6 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(hadoopConf) val jobConfSer = new SerializableJobConf(jobConf) - // When speculation is on and output committer class name contains "Direct", we should warn - // users that they may loss data if they are using a direct output committer. - val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) - val outputCommitterClass = jobConf.get("mapred.output.committer.class", "") - if (speculationEnabled && outputCommitterClass.contains("Direct")) { - val warningMessage = - s"$outputCommitterClass may be an output committer that writes data directly to " + - "the final location. Because speculation is enabled, this output committer may " + - "cause data loss (see the case in SPARK-10063). If possible, please use a output " + - "committer that does not have this behavior (e.g. FileOutputCommitter)." - logWarning(warningMessage) - } val writerContainer = if (numDynamicPartitions > 0) { val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) @@ -228,12 +182,12 @@ case class InsertIntoHiveTable( new SparkHiveWriterContainer( jobConf, fileSinkConf, - child.output, - table) + child.output) } @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass - saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer, + sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. 
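A side note on the InsertIntoDir operator added above: the foldLeft over child.output only builds the comma-separated "columns" and colon-separated "columns.types" serde properties that Hive serdes expect. A standalone sketch of that transformation, using a made-up Column stand-in instead of Catalyst's Attribute, looks like this:

    // Stand-in for Catalyst's Attribute; only the name and type name matter here.
    case class Column(name: String, typeName: String)

    object ColumnPropertiesSketch {
      def main(args: Array[String]): Unit = {
        val output = Seq(Column("key", "integer"), Column("value", "string"))

        // Same accumulate-then-dropRight(1) approach as the foldLeft in the patch.
        val Array(cols, types) = output.foldLeft(Array("", "")) { case (r, a) =>
          r(0) = r(0) + a.name + ","
          r(1) = r(1) + a.typeName + ":"
          r
        }

        println(cols.dropRight(1))   // key,value
        println(types.dropRight(1))  // integer:string
      }
    }

The same strings could also be produced with output.map(_.name).mkString(",") and output.map(_.typeName).mkString(":"), which reads more naturally than mutating the accumulator array.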
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala new file mode 100644 index 0000000000000..aaf3560375b2c --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.hadoop.mapred.FileOutputFormat + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.UnaryExecNode +import org.apache.spark.sql.hive.HiveShim.ShimFileSinkDesc +import org.apache.spark.sql.hive.SparkHiveWriterContainer +import org.apache.spark.util.SerializableJobConf + +// Base trait from which all hive insert statement physical execution extends. +private[hive] trait SaveAsHiveFile extends UnaryExecNode { + + protected def saveAsHiveFile( + rdd: RDD[InternalRow], + valueClass: Class[_], + fileSinkConf: ShimFileSinkDesc, + conf: SerializableJobConf, + writerContainer: SparkHiveWriterContainer, + isCompressed: Boolean): Unit = { + assert(valueClass != null, "Output value class not set") + + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. + conf.value.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type")) + } + + // When speculation is on and output committer class name contains "Direct", we should warn + // users that they may loss data if they are using a direct output committer. + val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false) + val outputCommitterClass = conf.value.get("mapred.output.committer.class", "") + if (speculationEnabled && outputCommitterClass.contains("Direct")) { + val warningMessage = + s"$outputCommitterClass may be an output committer that writes data directly to " + + "the final location. Because speculation is enabled, this output committer may " + + "cause data loss (see the case in SPARK-10063). If possible, please use a output " + + "committer that does not have this behavior (e.g. FileOutputCommitter)." 
+ logWarning(warningMessage) + } + conf.value.setOutputValueClass(valueClass) + + + val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName + assert(outputFileFormatClassName != null, "Output format class not set") + conf.value.set("mapred.output.format.class", outputFileFormatClassName) + + FileOutputFormat.setOutputPath( + conf.value, + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value)) + log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + writerContainer.driverSideSetup() + sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) + writerContainer.commitJob() + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 794fe264ead5d..566b9e24202bf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -52,8 +52,7 @@ import org.apache.spark.util.SerializableJobConf private[hive] class SparkHiveWriterContainer( @transient private val jobConf: JobConf, fileSinkConf: FileSinkDesc, - inputSchema: Seq[Attribute], - table: MetastoreRelation) + inputSchema: Seq[Attribute]) extends Logging with HiveInspectors with Serializable { @@ -218,7 +217,7 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( dynamicPartColNames: Array[String], inputSchema: Seq[Attribute], table: MetastoreRelation) - extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema, table) { + extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) { import SparkHiveDynamicPartitionWriterContainer._ From dddebff86f76d43b152744dfe631e7520f7d6e48 Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Thu, 12 May 2016 10:13:12 -0700 Subject: [PATCH 2/3] Addressing review comments. 
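For reference, the compression handling that the extracted SaveAsHiveFile trait centralizes (and that this commit only touches for indentation) boils down to the sketch below. The FileSink case class is a stand-in used for illustration, not Spark's ShimFileSinkDesc:

    import org.apache.hadoop.mapred.JobConf

    // Illustrative stand-in for the file sink descriptor's compression fields.
    case class FileSink(var compressed: Boolean = false,
                        var codec: String = null,
                        var compressType: String = null)

    object CompressionSketch {
      def configureCompression(conf: JobConf, sink: FileSink, isCompressed: Boolean): Unit = {
        if (isCompressed) {
          // Flip the mapred output compression switch and copy the codec/type
          // already present in the Hadoop configuration onto the sink.
          conf.set("mapred.output.compress", "true")
          sink.compressed = true
          sink.codec = conf.get("mapred.output.compression.codec")
          sink.compressType = conf.get("mapred.output.compression.type")
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new JobConf()
        conf.set("mapred.output.compression.codec", "org.apache.hadoop.io.compress.SnappyCodec")
        val sink = FileSink()
        configureCompression(conf, sink, isCompressed = true)
        println(sink)
      }
    }

As the code comments in the patch note, these mapred settings have no effect on ORC, which stores its compression information in table properties.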
--- .../spark/sql/hive/execution/InsertIntoDir.scala | 13 +++++++------ .../sql/hive/execution/SaveAsHiveFile.scala | 16 ++++++++-------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala index 7e428e0ae2f3c..66dc4e6ad592a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala @@ -30,9 +30,10 @@ import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.SerDe import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe import org.apache.hadoop.io.Text -import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, SequenceFileOutputFormat, TextInputFormat} +import org.apache.hadoop.mapred._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat @@ -58,19 +59,19 @@ case class InsertIntoDir( val properties = new Properties() - val Array(cols, types) = child.output.foldLeft(Array("", ""))((r, a) => { + val Array(cols, types) = child.output.foldLeft(Array("", "")) { case (r, a) => r(0) = r(0) + a.name + "," r(1) = r(1) + a.dataType.typeName + ":" r - }) + } properties.put("columns", cols.dropRight(1)) properties.put("columns.types", types.dropRight(1)) - val fileFormatMap = Map( + val fileFormatMap = Map[String, (Class[_ <: OutputFormat[_, _]], Class[_ <: SerDe])]( "orc" -> (classOf[OrcOutputFormat], classOf[OrcSerde]), "parquet" -> (classOf[MapredParquetOutputFormat], classOf[ParquetHiveSerDe]), -// "rcfile" -> (classOf[RCFileOutputFormat], classOf[LazySimpleSerDe]), + "rcfile" -> (classOf[RCFileOutputFormat], classOf[LazySimpleSerDe]), "textfile" -> (classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]], classOf[LazySimpleSerDe]), "sequencefile" -> (classOf[SequenceFileOutputFormat[Any, Any]], classOf[LazySimpleSerDe]) ) @@ -110,7 +111,7 @@ case class InsertIntoDir( child.output) if( !isLocal ) { - FileSystem.get(jobConf).delete(targetPath, true) + FileSystem.get(jobConf).delete(targetPath, true) } @transient val outputClass = writerContainer.newSerializer(tableDesc).getSerializedClass diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala index aaf3560375b2c..1bd2296b07797 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala @@ -28,13 +28,13 @@ import org.apache.spark.util.SerializableJobConf // Base trait from which all hive insert statement physical execution extends. 
private[hive] trait SaveAsHiveFile extends UnaryExecNode { - protected def saveAsHiveFile( - rdd: RDD[InternalRow], - valueClass: Class[_], - fileSinkConf: ShimFileSinkDesc, - conf: SerializableJobConf, - writerContainer: SparkHiveWriterContainer, - isCompressed: Boolean): Unit = { +protected def saveAsHiveFile( + rdd: RDD[InternalRow], + valueClass: Class[_], + fileSinkConf: ShimFileSinkDesc, + conf: SerializableJobConf, + writerContainer: SparkHiveWriterContainer, + isCompressed: Boolean): Unit = { assert(valueClass != null, "Output value class not set") if (isCompressed) { @@ -73,5 +73,5 @@ private[hive] trait SaveAsHiveFile extends UnaryExecNode { writerContainer.driverSideSetup() sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _) writerContainer.commitJob() - } + } } From 056eac64a646c77d375bd5ae863388ab4cbeee1b Mon Sep 17 00:00:00 2001 From: Parth Brahmbhatt Date: Mon, 23 May 2016 15:34:33 -0700 Subject: [PATCH 3/3] Addressing more review comments. --- .../spark/sql/catalyst/parser/SqlBase.g4 | 4 +-- .../sql/catalyst/parser/AstBuilder.scala | 26 +++++++--------- .../plans/logical/basicLogicalOperators.scala | 1 - .../spark/sql/hive/HiveStrategies.scala | 4 +-- .../sql/hive/execution/InsertIntoDir.scala | 30 +++++-------------- 5 files changed, 23 insertions(+), 42 deletions(-) diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 index 6f7bec50ecdf5..c226cc2437db9 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 @@ -202,7 +202,7 @@ query insertInto : INSERT OVERWRITE TABLE tableIdentifier partitionSpec? (IF NOT EXISTS)? | INSERT INTO TABLE? tableIdentifier partitionSpec? - | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING (rowFormat)? (STORED AS format=IDENTIFIER)? + | INSERT OVERWRITE LOCAL? DIRECTORY path=STRING rowFormat? (STORED AS format=IDENTIFIER)? 
; partitionSpecLocation @@ -650,7 +650,7 @@ nonReserved | ASC | DESC | LIMIT | RENAME | SETS | AT | NULLS | OVERWRITE | ALL | ALTER | AS | BETWEEN | BY | CREATE | DELETE | DESCRIBE | DROP | EXISTS | FALSE | FOR | GROUP | IN | INSERT | INTO | IS |LIKE - | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE + | NULL | ORDER | OUTER | TABLE | TRUE | WITH | RLIKE | DIRECTORY ; SELECT: 'SELECT'; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index c8d3b47f9975a..cb83db3d61bb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -296,23 +296,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { private def withInsertInto( ctx: InsertIntoContext, query: LogicalPlan): LogicalPlan = withOrigin(ctx) { - val tableIdent = Option(ctx.tableIdentifier) - .map(ti => Option(visitTableIdentifier(ti))).getOrElse(None) + val tableIdent = Option(ctx.tableIdentifier).map(visitTableIdentifier).getOrElse(None) val partitionKeys = Option(ctx.partitionSpec).map(visitPartitionSpec).getOrElse(Map.empty) - tableIdent.map(ti => InsertIntoTable( - UnresolvedRelation(ti, None), - partitionKeys, - query, - ctx.OVERWRITE != null, - ctx.EXISTS != null)).getOrElse( - InsertIntoDir( - string(ctx.path), - ctx.LOCAL != null, - Option(ctx.format).map(_.getText).getOrElse("textfile"), - Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat), - query) - ) + var storageFormat = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + storageFormat = storageFormat.copy(serde = Option(ctx.format).map(format => format.getText)) + + tableIdent match { + case Some(ti: TableIdentifier) => InsertIntoTable(UnresolvedRelation(ti, None), + partitionKeys, + query, + ctx.OVERWRITE != null, ctx.EXISTS != null) + case _ => InsertIntoDir(string(ctx.path), ctx.LOCAL != null, storageFormat, query) + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 720980a49052f..f628af614ec2d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -390,7 +390,6 @@ case class InsertIntoTable( case class InsertIntoDir( path: String, isLocal: Boolean, - fileFormat: String, rowFormat: CatalogStorageFormat, child: LogicalPlan) extends LogicalPlan { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 908811e581336..3ccf23de21ae7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -51,9 +51,9 @@ private[hive] trait HiveStrategies { execution.InsertIntoHiveTable( table, partition, planLater(child), overwrite, ifNotExists) :: Nil case logical.InsertIntoDir( - path, isLocal, fileFormat, rowFormat, child) => + path, isLocal, rowFormat, child) => execution.InsertIntoDir( - path, isLocal, fileFormat, rowFormat, planLater(child)) :: Nil + path, isLocal, rowFormat, planLater(child)) :: Nil case _ => Nil } } 
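To make the change above concrete: instead of a hard-coded format-to-class map, the STORED AS identifier (or hive.default.fileformat when none is given) is now resolved through HiveSerDe.sourceToSerDe, as the InsertIntoDir diff below shows. The sketch that follows imitates that lookup with a hand-written table; the ResolvedFormat class and the reduced set of entries are illustrative stand-ins, not Spark's actual HiveSerDe mapping:

    // Hand-written stand-in for HiveSerDe.sourceToSerDe, for illustration only.
    case class ResolvedFormat(serde: Option[String], outputFormat: Option[String])

    object FormatResolutionSketch {
      private val knownFormats: Map[String, ResolvedFormat] = Map(
        "textfile" -> ResolvedFormat(
          None,
          Some("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")),
        "orc" -> ResolvedFormat(
          Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde"),
          Some("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")),
        "parquet" -> ResolvedFormat(
          Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"),
          Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")))

      def resolve(storedAs: Option[String], defaultFileFormat: String): ResolvedFormat = {
        val key = storedAs.getOrElse(defaultFileFormat).toLowerCase
        // An unrecognized format surfaces as an error, mirroring the exception
        // thrown in the patch.
        knownFormats.getOrElse(
          key, throw new IllegalArgumentException(s"Unrecognized serde format $key"))
      }

      def main(args: Array[String]): Unit = {
        println(resolve(Some("ORC"), defaultFileFormat = "textFile"))
        println(resolve(None, defaultFileFormat = "textFile"))
      }
    }

When the resolved entry carries no serde (for example plain text output), the patch falls back to LazySimpleSerDe for the serialization library.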
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala index 66dc4e6ad592a..efa56be370bda 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoDir.scala @@ -24,15 +24,9 @@ import scala.language.existentials import antlr.SemanticException import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.hadoop.hive.ql.io.{HiveIgnoreKeyTextOutputFormat, RCFileOutputFormat} -import org.apache.hadoop.hive.ql.io.orc.{OrcOutputFormat, OrcSerde} -import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat -import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe import org.apache.hadoop.hive.ql.plan.TableDesc import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.SerDe import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe -import org.apache.hadoop.io.Text import org.apache.hadoop.mapred._ import org.apache.spark.rdd.RDD @@ -42,12 +36,12 @@ import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc} +import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.util.{SerializableJobConf, Utils} case class InsertIntoDir( path: String, isLocal: Boolean, - fileFormat: String, rowFormat: CatalogStorageFormat, child: SparkPlan) extends SaveAsHiveFile { @@ -68,28 +62,20 @@ case class InsertIntoDir( properties.put("columns", cols.dropRight(1)) properties.put("columns.types", types.dropRight(1)) - val fileFormatMap = Map[String, (Class[_ <: OutputFormat[_, _]], Class[_ <: SerDe])]( - "orc" -> (classOf[OrcOutputFormat], classOf[OrcSerde]), - "parquet" -> (classOf[MapredParquetOutputFormat], classOf[ParquetHiveSerDe]), - "rcfile" -> (classOf[RCFileOutputFormat], classOf[LazySimpleSerDe]), - "textfile" -> (classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]], classOf[LazySimpleSerDe]), - "sequencefile" -> (classOf[SequenceFileOutputFormat[Any, Any]], classOf[LazySimpleSerDe]) - ) + val defaultSerde = hadoopConf.get("hive.default.fileformat", "textFile") + val serDe = rowFormat.serde.getOrElse(defaultSerde).toLowerCase + val hiveSerDe = HiveSerDe.sourceToSerDe(serDe, sessionState.conf).getOrElse( + throw new SemanticException(s"Unrecognized serde format ${serDe}")) - val (ouputFormatClass, serdeClass) = fileFormatMap.getOrElse(fileFormat.toLowerCase, - throw new SemanticException(s"Unrecognized file format in STORED AS clause: $fileFormat," + - s" expected one of ${fileFormatMap.keys.mkString(",")}")) + properties.put(serdeConstants.SERIALIZATION_LIB, + hiveSerDe.serde.getOrElse(classOf[LazySimpleSerDe].getName)) - properties.put(serdeConstants.SERIALIZATION_LIB, serdeClass.getName) import scala.collection.JavaConverters._ properties.putAll(rowFormat.serdeProperties.asJava) - // if user specified a serde in the ROW FORMAT, use that. - rowFormat.serde.map(properties.put(serdeConstants.SERIALIZATION_LIB, _)) - val tableDesc = new TableDesc( classOf[TextInputFormat], - ouputFormatClass, + Utils.classForName(hiveSerDe.outputFormat.get), properties )