diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index f17177a771c3b..e62efe3d7f312 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -411,6 +411,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
         catalog.ParquetConversions ::
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
+        catalog.WriteToDirs ::
         ExtractPythonUDFs ::
         ResolveHiveWindowFunction ::
         PreInsertCastAndRename ::
@@ -515,7 +516,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
   }
 
   @transient
-  private val hivePlanner = new SparkPlanner with HiveStrategies {
+  private[hive] val hivePlanner = new SparkPlanner with HiveStrategies {
     val hiveContext = self
 
     override def strategies: Seq[Strategy] = experimental.extraStrategies ++ Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index ac9aaed19d566..072fb148d35f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -647,6 +647,32 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
     }
   }
 
+  /**
+   * Resolves the hive.WriteToDirectory node by setting the "columns" and
+   * "columns.types" properties in its tableDesc.
+   */
+  object WriteToDirs extends Rule[LogicalPlan] with HiveInspectors {
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+      // Wait until children are resolved.
+      case p: LogicalPlan if !p.childrenResolved => p
+
+      case WriteToDirectory(path, child, isLocal, tableDesc)
+          if !tableDesc.getProperties.containsKey("columns.types") =>
+        // Generate the column names and the corresponding type info in Hive style.
+        val Array(cols, types) = child.output.foldLeft(Array("", ""))((r, a) => {
+          r(0) = r(0) + a.name + ","
+          r(1) = r(1) + a.dataType.toTypeInfo.getTypeName + ":"
+          r
+        })
+        tableDesc.getProperties.setProperty("columns", cols.dropRight(1))
+        tableDesc.getProperties.setProperty("columns.types", types.dropRight(1))
+        WriteToDirectory(path, child, isLocal, tableDesc)
+      case WriteToDirectory(path, child, isLocal, tableDesc) =>
+        execution.WriteToDirectory(path, hive.executePlan(child).executedPlan, isLocal, tableDesc)
+    }
+  }
+
   /**
    * Casts input data to correct data types according to table definition before inserting into
    * that table.
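Editorial note for readers skimming the patch: the WriteToDirs rule above encodes the child plan's output schema into Hive-style "columns" / "columns.types" table properties. The standalone sketch below (not part of the patch; the object name and the (name, type) pairs are made up for illustration) shows the strings that end up in the tableDesc for a hypothetical (key INT, value STRING) schema:

// Hedged illustration only: mirrors the "columns" / "columns.types" values the
// rule derives, for a made-up schema of (key: int, value: string).
object ColumnPropsSketch {
  def main(args: Array[String]): Unit = {
    // (column name, Hive type name) pairs standing in for child.output
    val output = Seq("key" -> "int", "value" -> "string")
    val cols = output.map(_._1).mkString(",")    // "key,value"
    val types = output.map(_._2).mkString(":")   // "int:string"
    // These are the values stored in tableDesc.getProperties:
    //   columns       -> key,value
    //   columns.types -> int:string
    println(s"columns=$cols columns.types=$types")
  }
}

Using mkString here is purely illustrative; the rule itself folds over child.output and drops the trailing delimiter, which yields the same strings.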
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c3f29350101d3..5c84a4b9cfd92 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.ql.{ErrorMsg, Context}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry, FunctionInfo}
 import org.apache.hadoop.hive.ql.lib.Node
 import org.apache.hadoop.hive.ql.parse._
-import org.apache.hadoop.hive.ql.plan.PlanUtils
+import org.apache.hadoop.hive.ql.plan.{TableDesc, PlanUtils}
 import org.apache.hadoop.hive.ql.session.SessionState
 
 import org.apache.spark.Logging
@@ -77,6 +77,22 @@ private[hive] case class CreateTableAsSelect(
     childrenResolved
 }
 
+/**
+ * Logical node for "INSERT OVERWRITE [LOCAL] DIRECTORY directory
+ * [ROW FORMAT row_format] STORED AS file_format SELECT ... FROM ...".
+ * @param path the target path to write data to.
+ * @param child the child logical plan.
+ * @param isLocal whether to write the data to the local file system.
+ * @param desc describes write properties such as the file format.
+ */
+private[hive] case class WriteToDirectory(
+    path: String,
+    child: LogicalPlan,
+    isLocal: Boolean,
+    desc: TableDesc) extends UnaryNode with Command {
+  override def output: Seq[Attribute] = Seq.empty[Attribute]
+}
+
 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
 private[hive] object HiveQl extends Logging {
   protected val nativeCommands = Seq(
@@ -1210,6 +1226,19 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
             Token("TOK_TMP_FILE", Nil) :: Nil) :: Nil) =>
       query
 
+    case Token(destinationToken(),
+           Token("TOK_DIR", path :: formats) :: Nil) =>
+      var isLocal = false
+      formats.collect {
+        case Token("LOCAL", others) =>
+          isLocal = true
+      }
+      WriteToDirectory(
+        BaseSemanticAnalyzer.unescapeSQLString(path.getText),
+        query,
+        isLocal,
+        parseTableDesc(formats))
+
     case Token(destinationToken(),
            Token("TOK_TAB",
              tableArgs) :: Nil) =>
@@ -1678,6 +1707,81 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
     }
   }
 
+  def parseTableDesc(nodeList: Seq[ASTNode]): TableDesc = {
+    import org.apache.hadoop.hive.ql.plan._
+
+    val createTableDesc = new CreateTableDesc()
+
+    nodeList.collect {
+      case Token("TOK_FILEFORMAT_GENERIC", child :: Nil) =>
+        child.getText().toLowerCase(Locale.ENGLISH) match {
+          case "orc" =>
+            createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat")
+            createTableDesc.setSerName("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
+
+          case "parquet" =>
+            createTableDesc
+              .setOutputFormat("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat")
+            createTableDesc
+              .setSerName("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe")
+
+          case "rcfile" =>
+            createTableDesc.setOutputFormat("org.apache.hadoop.hive.ql.io.RCFileOutputFormat")
+            createTableDesc.setSerName(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE))
+
+          case "textfile" =>
+            createTableDesc
+              .setOutputFormat("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat")
+
+          case "sequencefile" =>
+            createTableDesc.setOutputFormat("org.apache.hadoop.mapred.SequenceFileOutputFormat")
+
+          case _ =>
+            throw new SemanticException(
+              s"Unrecognized file format in STORED AS clause: ${child.getText}")
+        }
+
+      case Token("TOK_TABLEROWFORMAT", Token("TOK_SERDEPROPS", child :: Nil) :: Nil) =>
+        val serdeParams = new java.util.HashMap[String, String]()
+        child match {
+          case Token("TOK_TABLEROWFORMATFIELD", rowChild1 :: rowChild2) =>
+            val fieldDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild1.getText())
+            serdeParams.put(serdeConstants.FIELD_DELIM, fieldDelim)
+            serdeParams.put(serdeConstants.SERIALIZATION_FORMAT, fieldDelim)
+            if (rowChild2.length > 1) {
+              val fieldEscape = BaseSemanticAnalyzer.unescapeSQLString(rowChild2(0).getText)
+              serdeParams.put(serdeConstants.ESCAPE_CHAR, fieldEscape)
+            }
+          case Token("TOK_TABLEROWFORMATCOLLITEMS", rowChild :: Nil) =>
+            val collItemDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+            serdeParams.put(serdeConstants.COLLECTION_DELIM, collItemDelim)
+          case Token("TOK_TABLEROWFORMATMAPKEYS", rowChild :: Nil) =>
+            val mapKeyDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+            serdeParams.put(serdeConstants.MAPKEY_DELIM, mapKeyDelim)
+          case Token("TOK_TABLEROWFORMATLINES", rowChild :: Nil) =>
+            val lineDelim = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+            if (!(lineDelim == "\n") && !(lineDelim == "10")) {
+              throw new AnalysisException(
+                SemanticAnalyzer.generateErrorMessage(
+                  rowChild,
+                  ErrorMsg.LINES_TERMINATED_BY_NON_NEWLINE.getMsg))
+            }
+            serdeParams.put(serdeConstants.LINE_DELIM, lineDelim)
+
+          case Token("TOK_TABLEROWFORMATNULL", rowChild :: Nil) =>
+            val nullFormat = BaseSemanticAnalyzer.unescapeSQLString(rowChild.getText)
+            // TODO: support the nullFormat
+          case _ => assert(false)
+        }
+        createTableDesc.setSerdeProps(serdeParams)
+
+      case _ => // Unsupported features
+    }
+    // Note: we do not know the columns and column types at parse time, so we pass
+    // `null` for the column types here; they will be set in the analyzer.
+    PlanUtils.getDefaultTableDesc(createTableDesc, "", null)
+  }
+
   def dumpTree(node: Node, builder: StringBuilder = new StringBuilder, indent: Int = 0)
     : StringBuilder = {
     node match {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala
new file mode 100644
index 0000000000000..3406d05c07a02
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SaveAsHiveFile.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
+import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorUtils}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.util.SerializableJobConf
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.{Logging, SparkContext, TaskContext}
+
+/**
+ * A trait for subclasses that write data to a file system using arbitrary SerDes.
+ */
+private[hive] trait SaveAsHiveFile extends HiveInspectors with Logging {
+  def newSerializer(tableDesc: TableDesc): Serializer = {
+    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+    serializer.initialize(null, tableDesc.getProperties)
+    serializer
+  }
+
+  def saveAsHiveFile(
+      sparkContext: SparkContext,
+      rdd: RDD[InternalRow],
+      schema: StructType,
+      dataTypes: Array[DataType],
+      valueClass: Class[_],
+      fileSinkConf: FileSinkDesc,
+      conf: SerializableJobConf,
+      writerContainer: SparkHiveWriterContainer): Unit = {
+    assert(valueClass != null, "Output value class not set")
+    conf.value.setOutputValueClass(valueClass)
+
+    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+    assert(outputFileFormatClassName != null, "Output format class not set")
+    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
+
+    FileOutputFormat.setOutputPath(
+      conf.value,
+      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
+    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+
+    writerContainer.driverSideSetup()
+    sparkContext.runJob(rdd, writeToFile _)
+    writerContainer.commitJob()
+
+    // Note that this function is executed on executor side
+    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
+      val standardOI = ObjectInspectorUtils
+        .getStandardObjectInspector(
+          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+          ObjectInspectorCopyOption.JAVA)
+        .asInstanceOf[StructObjectInspector]
+
+      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+      val outputData = new Array[Any](fieldOIs.length)
+
+      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
+
+      iterator.foreach { row =>
+        var i = 0
+        while (i < fieldOIs.length) {
+          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+          i += 1
+        }
+
+        writerContainer
+          .getLocalFileWriter(row, schema)
+          .write(serializer.serialize(outputData, standardOI))
+      }
+
+      writerContainer.close()
+    }
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 12c667e6e92da..b92507b0fc391 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -17,18 +17,14 @@
 
 package org.apache.spark.sql.hive.execution
 
-import java.util
+import java.util.LinkedHashMap
 
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.objectinspector._
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
@@ -36,11 +32,10 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.{UnaryNode, SparkPlan}
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.sql.hive._
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.util.SerializableJobConf
 
 import scala.collection.JavaConversions._
-import org.apache.spark.util.SerializableJobConf
+
 
 private[hive]
 case class InsertIntoHiveTable(
@@ -48,75 +43,15 @@ case class InsertIntoHiveTable(
     partition: Map[String, Option[String]],
     child: SparkPlan,
     overwrite: Boolean,
-    ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
+    ifNotExists: Boolean) extends UnaryNode with SaveAsHiveFile {
 
   @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
   @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
   @transient private lazy val hiveContext = new Context(sc.hiveconf)
   @transient private lazy val catalog = sc.catalog
 
-  private def newSerializer(tableDesc: TableDesc): Serializer = {
-    val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
-    serializer.initialize(null, tableDesc.getProperties)
-    serializer
-  }
-
   def output: Seq[Attribute] = child.output
 
-  def saveAsHiveFile(
-      rdd: RDD[InternalRow],
-      valueClass: Class[_],
-      fileSinkConf: FileSinkDesc,
-      conf: SerializableJobConf,
-      writerContainer: SparkHiveWriterContainer): Unit = {
-    assert(valueClass != null, "Output value class not set")
-    conf.value.setOutputValueClass(valueClass)
-
-    val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
-    assert(outputFileFormatClassName != null, "Output format class not set")
-    conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
-    FileOutputFormat.setOutputPath(
-      conf.value,
-      SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
-    log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
-
-    writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, writeToFile _)
-    writerContainer.commitJob()
-
-    // Note that this function is executed on executor side
-    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
-      val serializer = newSerializer(fileSinkConf.getTableInfo)
-      val standardOI = ObjectInspectorUtils
-        .getStandardObjectInspector(
-          fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
-          ObjectInspectorCopyOption.JAVA)
-        .asInstanceOf[StructObjectInspector]
-
-      val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
-      val dataTypes: Array[DataType] = child.output.map(_.dataType).toArray
-      val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt)}
-      val outputData = new Array[Any](fieldOIs.length)
-
-      writerContainer.executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
-      iterator.foreach { row =>
-        var i = 0
-        while (i < fieldOIs.length) {
-          outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
-          i += 1
-        }
-
-        writerContainer
-          .getLocalFileWriter(row, table.schema)
-          .write(serializer.serialize(outputData, standardOI))
-      }
-
-      writerContainer.close()
-    }
-  }
-
   /**
    * Inserts all the rows in the table into Hive.  Row objects are properly serialized with the
    * `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -185,7 +120,15 @@ case class InsertIntoHiveTable(
       new SparkHiveWriterContainer(jobConf, fileSinkConf)
     }
 
-    saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
+    saveAsHiveFile(
+      sc.sparkContext,
+      child.execute(),
+      table.schema,
+      child.output.map(_.dataType).toArray,
+      outputClass,
+      fileSinkConf,
+      jobConfSer,
+      writerContainer)
 
     val outputPath = FileOutputFormat.getOutputPath(jobConf)
     // Have to construct the format of dbname.tablename.
@@ -197,7 +140,7 @@ case class InsertIntoHiveTable(
 
     if (partition.nonEmpty) {
       // loadPartition call orders directories created on the iteration order of the this map
-      val orderedPartitionSpec = new util.LinkedHashMap[String, String]()
+      val orderedPartitionSpec = new LinkedHashMap[String, String]()
       table.hiveQlTable.getPartCols().foreach { entry =>
         orderedPartitionSpec.put(entry.getName, partitionSpec.get(entry.getName).getOrElse(""))
       }
@@ -257,8 +200,6 @@ case class InsertIntoHiveTable(
     Seq.empty[Row]
   }
 
-  override def executeCollect(): Array[Row] = sideEffectResult.toArray
-
   protected override def doExecute(): RDD[InternalRow] = {
     sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala
new file mode 100644
index 0000000000000..c250e59481fb9
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/WriteToDirectory.scala
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.plan.TableDesc
+import org.apache.hadoop.mapred.JobConf
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.execution.{RunnableCommand, SparkPlan}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.hive._
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class WriteToDirectory(
+    path: String,
+    child: SparkPlan,
+    isLocal: Boolean,
+    desc: TableDesc) extends RunnableCommand with SaveAsHiveFile {
+
+  override def output: Seq[Attribute] = child.output
+
+  def run(sqlContext: SQLContext): Seq[Row] = {
+    @transient val hiveContext = sqlContext.asInstanceOf[HiveContext]
+    @transient lazy val context = new Context(hiveContext.hiveconf)
+    @transient lazy val outputClass = newSerializer(desc).getSerializedClass
+    val jobConf = new JobConf(hiveContext.hiveconf)
+    val jobConfSer = new SerializableJobConf(jobConf)
+    val targetPath = new Path(path)
+
+    val writeToPath = if (isLocal) {
+      val localFileSystem = FileSystem.getLocal(jobConf)
+      val localPath = localFileSystem.makeQualified(targetPath)
+      // remove old dir
+      if (localFileSystem.exists(localPath)) {
+        localFileSystem.delete(localPath, true)
+      }
+      localPath
+    } else {
+      val qualifiedPath = FileUtils.makeQualified(targetPath, hiveContext.hiveconf)
+      val dfs = qualifiedPath.getFileSystem(jobConf)
+      if (dfs.exists(qualifiedPath)) {
+        dfs.delete(qualifiedPath, true)
+      } else {
+        dfs.mkdirs(qualifiedPath.getParent)
+      }
+      qualifiedPath
+    }
+
+    val fileSinkConf = new FileSinkDesc(writeToPath.toString, desc, false)
+    val isCompressed = hiveContext.hiveconf.getBoolean(
+      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+
+    if (isCompressed) {
+      // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+      // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+      // to store compression information.
+      hiveContext.hiveconf.set("mapred.output.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hiveContext.hiveconf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(hiveContext.hiveconf.get("mapred.output.compression.type"))
+    }
+
+    val writerContainer = new SparkHiveWriterContainer(jobConf, fileSinkConf)
+
+    saveAsHiveFile(
+      hiveContext.sparkContext,
+      child.execute(),
+      StructType.fromAttributes(output),
+      child.output.map(_.dataType).toArray,
+      outputClass,
+      fileSinkConf,
+      jobConfSer,
+      writerContainer)
+
+    Seq.empty[Row]
+  }
+
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 79a136ae6f619..3601ad2ecb45f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -526,6 +526,81 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils {
       sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
   }
 
+  test("insert overwrite to directory from a Hive metastore table") {
+    import org.apache.spark.util.Utils
+
+    val path = Utils.createTempDir()
+    path.delete()
+    checkAnswer(
+      sql(s"INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}' SELECT * FROM src WHERE key < 10"),
+      Seq.empty[Row])
+
+    checkAnswer(
+      sql(s"""INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc
+             |SELECT * FROM src WHERE key < 10""".stripMargin),
+      Seq.empty[Row])
+
+    // Use the ORC data source to check that the data written to `path` is correct.
+    sql(
+      s"""CREATE TEMPORARY TABLE orc_source
+         |USING org.apache.spark.sql.hive.orc
+         |OPTIONS (
+         |  PATH '${path.getCanonicalPath}'
+         |)
+       """.stripMargin)
+    checkAnswer(
+      sql("SELECT * FROM orc_source"),
+      sql("SELECT * FROM src WHERE key < 10").collect())
+
+    Utils.deleteRecursively(path)
+    dropTempTable("orc_source")
+  }
+
+  test("insert overwrite to directory from a temporary table") {
+    import org.apache.spark.util.Utils
+
+    sparkContext
+      .parallelize(1 to 10)
+      .map(i => TestData(i, i.toString))
+      .toDF()
+      .registerTempTable("test_insert_table")
+
+    val path = Utils.createTempDir()
+    path.delete()
+    checkAnswer(
+      sql(
+        s"""
+           |INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
+           |SELECT * FROM test_insert_table
+         """.stripMargin),
+      Seq.empty[Row])
+
+    checkAnswer(
+      sql(s"""INSERT OVERWRITE LOCAL DIRECTORY '${path.toString}'
+             |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc
+             |SELECT * FROM test_insert_table""".stripMargin),
+      Seq.empty[Row])
+
+    // Use the ORC data source to check that the data written to `path` is correct.
+    sql(
+      s"""CREATE TEMPORARY TABLE orc_source
+         |USING org.apache.spark.sql.hive.orc
+         |OPTIONS (
+         |  PATH '${path.getCanonicalPath}'
+         |)
+       """.stripMargin)
+    checkAnswer(
+      sql("SELECT * FROM orc_source"),
+      sql("SELECT * FROM test_insert_table").collect())
+
+    Utils.deleteRecursively(path)
+    dropTempTable("test_insert_table")
+    dropTempTable("orc_source")
+  }
+
   test("SPARK-4825 save join to table") {
     val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
     sql("CREATE TABLE test1 (key INT, value STRING)")
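For context, a hedged end-to-end usage sketch of the syntax this patch enables, in the spirit of the tests above. It assumes an existing HiveContext bound to `hiveContext` with a `src` table available; the temporary directory prefix and the `written_src` table name are made up for illustration and are not part of the patch:

// Write a query result to a local directory as ORC, then read it back through
// the ORC data source to verify the contents (mirrors the test flow above).
import java.nio.file.Files

val dir = Files.createTempDirectory("insert-overwrite-dir-demo").toFile
dir.delete()  // the INSERT OVERWRITE statement below recreates the directory

hiveContext.sql(
  s"""INSERT OVERWRITE LOCAL DIRECTORY '${dir.getAbsolutePath}'
     |ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc
     |SELECT * FROM src WHERE key < 10""".stripMargin)

hiveContext.sql(
  s"""CREATE TEMPORARY TABLE written_src
     |USING org.apache.spark.sql.hive.orc
     |OPTIONS (PATH '${dir.getAbsolutePath}')""".stripMargin)

hiveContext.sql("SELECT * FROM written_src").show()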