From a96e8d9f1058915c290d20abe8b64f5f95839f8c Mon Sep 17 00:00:00 2001
From: Zhan Zhang
Date: Wed, 13 May 2015 09:21:26 -0700
Subject: [PATCH 1/7] orc data source support

---
 .../sql/hive/orc/HadoopTypeConverter.scala    |  93 +++
 .../spark/sql/hive/orc/OrcFileOperator.scala  |  87 +++
 .../spark/sql/hive/orc/OrcFilters.scala       | 117 ++++
 .../spark/sql/hive/orc/OrcRelation.scala      | 176 ++++++
 .../sql/hive/orc/OrcTableOperations.scala     | 116 ++++
 .../apache/spark/sql/hive/orc/package.scala   |  61 ++
 .../hive/orc/OrcPartitionDiscoverySuite.scala | 278 +++++++++
 .../spark/sql/hive/orc/OrcQuerySuite.scala    | 211 +++++++
 .../spark/sql/hive/orc/OrcRelationTest.scala  | 533 ++++++++++++++++++
 .../apache/spark/sql/hive/orc/OrcSuite.scala  | 211 +++++++
 10 files changed, 1883 insertions(+)
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala
 create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala
 create mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala
new file mode 100644
index 0000000000000..aabc5477b05a6
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import org.apache.hadoop.hive.common.`type`.HiveVarchar
+import org.apache.spark.sql.hive.{HiveInspectors, HiveShim}
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow}
+
+import scala.collection.JavaConversions._
+
+/**
+ * We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use
+ * this class.
+ *
+ */
+private[hive] object HadoopTypeConverter extends HiveInspectors {
+  /**
+   * Builds specific unwrappers ahead of time according to object inspector
+   * types to avoid pattern matching and branching costs per row.
+   */
+  def unwrappers(fieldRefs: Seq[StructField]): Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
+    _.getFieldObjectInspector match {
+      case oi: BooleanObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+      case oi: ByteObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+      case oi: ShortObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+      case oi: IntObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+      case oi: LongObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+      case oi: FloatObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+      case oi: DoubleObjectInspector =>
+        (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+      case oi =>
+        (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrap(value, oi)
+    }
+  }
+
+  /**
+   * Wraps with Hive types based on object inspector.
+   */
+  def wrappers(oi: ObjectInspector): Any => Any = oi match {
+    case _: JavaHiveVarcharObjectInspector =>
+      (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size)
+
+    case _: JavaHiveDecimalObjectInspector =>
+      (o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying())
+
+    case soi: StandardStructObjectInspector =>
+      val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector))
+      (o: Any) => {
+        val struct = soi.create()
+        (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach {
+          (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data))
+        }
+        struct
+      }
+
+    case loi: ListObjectInspector =>
+      val wrapper = wrapperFor(loi.getListElementObjectInspector)
+      (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper))
+
+    case moi: MapObjectInspector =>
+      val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector)
+      val valueWrapper = wrapperFor(moi.getMapValueObjectInspector)
+      (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) =>
+        keyWrapper(key) -> valueWrapper(value)
+      })
+
+    case _ =>
+      identity[Any]
+  }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
new file mode 100644
index 0000000000000..6805677e84ea9
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.io.IOException + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader} +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector +import org.apache.spark.Logging +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql.hive.HiveMetastoreTypes +import org.apache.spark.sql.types.StructType + +private[orc] object OrcFileOperator extends Logging{ + + def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = { + var conf = config.getOrElse(new Configuration) + val fspath = new Path(pathStr) + val fs = fspath.getFileSystem(conf) + val orcFiles = listOrcFiles(pathStr, conf) + OrcFile.createReader(fs, orcFiles(0)) + } + + def readSchema(path: String, conf: Option[Configuration]): StructType = { + val reader = getFileReader(path, conf) + val readerInspector: StructObjectInspector = reader.getObjectInspector + .asInstanceOf[StructObjectInspector] + val schema = readerInspector.getTypeName + HiveMetastoreTypes.toDataType(schema).asInstanceOf[StructType] + } + + def getObjectInspector(path: String, conf: Option[Configuration]): StructObjectInspector = { + val reader = getFileReader(path, conf) + val readerInspector: StructObjectInspector = reader.getObjectInspector + .asInstanceOf[StructObjectInspector] + readerInspector + } + + def deletePath(pathStr: String, conf: Configuration): Unit = { + val fspath = new Path(pathStr) + val fs = fspath.getFileSystem(conf) + try { + fs.delete(fspath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${fspath.toString} prior" + + s" to InsertIntoOrcTable:\n${e.toString}") + } + } + + def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { + val origPath = new Path(pathStr) + val fs = origPath.getFileSystem(conf) + val path = origPath.makeQualified(fs) + val paths = SparkHadoopUtil.get.listLeafStatuses(fs, origPath) + .filterNot(_.isDir) + .map(_.getPath) + .filterNot(_.getName.startsWith("_")) + .filterNot(_.getName.startsWith(".")) + + if (paths == null || paths.size == 0) { + throw new IllegalArgumentException( + s"orcFileOperator: path $path does not have valid orc files matching the pattern") + } + logInfo("Qualified file list: ") + paths.foreach{x=>logInfo(x.toString)} + paths + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala new file mode 100644 index 0000000000000..8e73f9181f08b --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder +import org.apache.spark.Logging +import org.apache.spark.sql.sources._ + +private[sql] object OrcFilters extends Logging { + + def createFilter(expr: Array[Filter]): Option[SearchArgument] = { + if (expr == null || expr.size == 0) return None + var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder()) + sarg.get.startAnd() + expr.foreach { + x => { + sarg match { + case Some(s1) => sarg = createFilter(x, s1) + case _ => None + } + } + } + sarg match { + case Some(b) => Some(b.end.build) + case _ => None + } + } + + def createFilter(expression: Filter, builder: Builder): Option[Builder] = { + expression match { + case p@And(left: Filter, right: Filter) => { + val b1 = builder.startAnd() + val b2 = createFilter(left, b1) + b2 match { + case Some(b) => val b3 = createFilter(right, b) + if (b3.isDefined) { + Some(b3.get.end) + } else { + None + } + case _ => None + } + } + case p@Or(left: Filter, right: Filter) => { + val b1 = builder.startOr() + val b2 = createFilter(left, b1) + b2 match { + case Some(b) => val b3 = createFilter(right, b) + if (b3.isDefined) { + Some(b3.get.end) + } else { + None + } + case _ => None + } + } + case p@Not(child: Filter) => { + val b1 = builder.startNot() + val b2 = createFilter(child, b1) + b2 match { + case Some(b) => Some(b.end) + case _ => None + } + } + case p@EqualTo(attribute: String, value: Any) => { + val b1 = builder.equals(attribute, value) + Some(b1) + } + case p@LessThan(attribute: String, value: Any) => { + val b1 = builder.lessThan(attribute ,value) + Some(b1) + } + case p@LessThanOrEqual(attribute: String, value: Any) => { + val b1 = builder.lessThanEquals(attribute, value) + Some(b1) + } + case p@GreaterThan(attribute: String, value: Any) => { + val b1 = builder.startNot().lessThanEquals(attribute, value).end() + Some(b1) + } + case p@GreaterThanOrEqual(attribute: String, value: Any) => { + val b1 = builder.startNot().lessThan(attribute, value).end() + Some(b1) + } + case p@IsNull(attribute: String) => { + val b1 = builder.startNot().isNull(attribute).end() + Some(b1) + } + case p@In(attribute: String, values: Array[Any]) => { + val b1 = builder.in(attribute, values) + Some(b1) + } + // not supported in filter + // case p@EqualNullSafe(left: String, right: String) => { + // val b1 = builder.nullSafeEquals(left, right) + // Some(b1) + // } + case _ => None + } + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala new file mode 100644 index 0000000000000..816f3794a6a02 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license 
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util.Objects + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.io.orc.{OrcSerde, OrcOutputFormat} +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, StructObjectInspector} +import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo} +import org.apache.hadoop.io.{Writable, NullWritable} +import org.apache.hadoop.mapred.{RecordWriter, Reporter, JobConf} +import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext} +import org.apache.spark.Logging +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.mapred.SparkHadoopMapRedUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.hive.HiveMetastoreTypes +import org.apache.spark.sql.types.StructType +import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.sources._ +import scala.collection.JavaConversions._ + + +private[sql] class DefaultSource extends FSBasedRelationProvider { + + def createRelation( + sqlContext: SQLContext, + paths: Array[String], + schema: Option[StructType], + partitionColumns: Option[StructType], + parameters: Map[String, String]): FSBasedRelation ={ + val partitionSpec = partitionColumns.map(PartitionSpec(_, Seq.empty[Partition])) + OrcRelation(paths, parameters, + schema, partitionSpec)(sqlContext) + } +} + + +private[sql] class OrcOutputWriter extends OutputWriter with SparkHadoopMapRedUtil { + var recordWriter: RecordWriter[NullWritable, Writable] = _ + var taskAttemptContext: TaskAttemptContext = _ + var serializer: OrcSerde = _ + var wrappers: Array[Any => Any] = _ + var created = false + var path: String = _ + var dataSchema: StructType = _ + var fieldOIs: Array[ObjectInspector] = _ + var standardOI: StructObjectInspector = _ + + + override def init(path: String, + dataSchema: StructType, + context: TaskAttemptContext): Unit = { + this.path = path + this.dataSchema = dataSchema + taskAttemptContext = context + } + + // Avoid create empty file without schema attached + private def initWriter() = { + if (!created) { + created = true + val conf = taskAttemptContext.getConfiguration + val outputFormat = new OrcOutputFormat() + val taskId: TaskID = taskAttemptContext.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = s"part-r-${partition}-${System.currentTimeMillis}.orc" + val file = new Path(path, filename) + val fs = file.getFileSystem(conf) + val orcSchema = HiveMetastoreTypes.toMetastoreType(dataSchema) + + serializer = new OrcSerde + val typeInfo: TypeInfo = + TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) + standardOI = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) + .asInstanceOf[StructObjectInspector] + fieldOIs = standardOI + 
.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + wrappers = fieldOIs.map(HadoopTypeConverter.wrappers) + recordWriter = { + outputFormat.getRecordWriter(fs, + conf.asInstanceOf[JobConf], + file.toUri.getPath, Reporter.NULL) + .asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] + } + } + } + override def write(row: Row): Unit = { + initWriter() + var i = 0 + val outputData = new Array[Any](fieldOIs.length) + while (i < row.length) { + outputData(i) = wrappers(i)(row(i)) + i += 1 + } + val writable = serializer.serialize(outputData, standardOI) + recordWriter.write(NullWritable.get(), writable) + } + + override def close(): Unit = { + if (recordWriter != null) { + recordWriter.close(Reporter.NULL) + } + } +} + + +@DeveloperApi +private[sql] case class OrcRelation(override val paths: Array[String], + parameters: Map[String, String], + maybeSchema: Option[StructType] = None, + maybePartitionSpec: Option[PartitionSpec] = None)( + @transient val sqlContext: SQLContext) + extends FSBasedRelation(paths, maybePartitionSpec) + with Logging { + self: Product => + @transient val conf = sqlContext.sparkContext.hadoopConfiguration + + + override def dataSchema: StructType = + maybeSchema.getOrElse(OrcFileOperator.readSchema(paths(0), Some(conf))) + + override def outputWriterClass: Class[_ <: OutputWriter] = classOf[OrcOutputWriter] + /** Attributes */ + var output: Seq[Attribute] = schema.toAttributes + + override def needConversion: Boolean = false + + // Equals must also take into account the output attributes so that we can distinguish between + // different instances of the same relation, + override def equals(other: Any): Boolean = other match { + case that: OrcRelation => + paths.toSet == that.paths.toSet && + dataSchema == that.dataSchema && + schema == that.schema && + partitionColumns == that.partitionColumns + case _ => false + } + + override def hashCode(): Int = { + Objects.hashCode( + paths.toSet, + dataSchema, + schema, + maybePartitionSpec) + } + override def buildScan(requiredColumns: Array[String], + filters: Array[Filter], + inputPaths: Array[String]): RDD[Row] = { + val output = StructType(requiredColumns.map(dataSchema(_))).toAttributes + OrcTableScan(output, this, filters, inputPaths).execute() + } +} + +private[sql] object OrcRelation extends Logging { + // Default partition name to use when the partition column value is null or empty string. + val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala new file mode 100644 index 0000000000000..94c78a14524b5 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.util._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.ql.io.orc._ +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.Job +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.hive.HiveShim +import org.apache.spark.sql.sources.Filter +import org.apache.spark.{Logging, SerializableWritable} +import scala.collection.JavaConversions._ + +case class OrcTableScan(attributes: Seq[Attribute], + @transient relation: OrcRelation, + filters: Array[Filter], + inputPaths: Array[String]) extends Logging { + @transient val sqlContext = relation.sqlContext + val path = relation.paths(0) + + def addColumnIds(output: Seq[Attribute], + relation: OrcRelation, conf: Configuration) { + val ids = + output.map(a => + relation.dataSchema.toAttributes.indexWhere(_.name == a.name): Integer) + .filter(_ >= 0) + val names = attributes.map(_.name) + val sorted = ids.zip(names).sorted + HiveShim.appendReadColumns(conf, sorted.map(_._1), sorted.map(_._2)) + } + + def buildFilter(job: Job, filters: Array[Filter]): Unit = { + if (ORC_FILTER_PUSHDOWN_ENABLED) { + val conf: Configuration = job.getConfiguration + val recordFilter = OrcFilters.createFilter(filters) + if (recordFilter.isDefined) { + conf.set(SARG_PUSHDOWN, toKryo(recordFilter.get)) + conf.setBoolean(INDEX_FILTER, true) + } + } + } + + // Transform all given raw `Writable`s into `Row`s. 
+ def fillObject(conf: Configuration, + iterator: Iterator[org.apache.hadoop.io.Writable], + nonPartitionKeyAttrs: Seq[(Attribute, Int)], + mutableRow: MutableRow): Iterator[Row] = { + val deserializer = new OrcSerde + val soi = OrcFileOperator.getObjectInspector(path, Some(conf)) + val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { + case (attr, ordinal) => + soi.getStructFieldRef(attr.name.toLowerCase) -> ordinal + }.unzip + val unwrappers = HadoopTypeConverter.unwrappers(fieldRefs) + // Map each tuple to a row object + iterator.map { value => + val raw = deserializer.deserialize(value) + logDebug("Raw data: " + raw) + var i = 0 + while (i < fieldRefs.length) { + val fieldValue = soi.getStructFieldData(raw, fieldRefs(i)) + if (fieldValue == null) { + mutableRow.setNullAt(fieldOrdinals(i)) + } else { + unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i)) + } + i += 1 + } + mutableRow: Row + } + } + + def execute(): RDD[Row] = { + val sc = sqlContext.sparkContext + val job = new Job(sc.hadoopConfiguration) + val conf: Configuration = job.getConfiguration + + buildFilter(job, filters) + addColumnIds(attributes, relation, conf) + FileInputFormat.setInputPaths(job, inputPaths.map(new Path(_)): _*) + + val inputClass = classOf[OrcInputFormat].asInstanceOf[ + Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] + + val rdd = sc.hadoopRDD(conf.asInstanceOf[JobConf], + inputClass, classOf[NullWritable], classOf[Writable]).map(_._2) + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + val wrappedConf = new SerializableWritable(conf) + val rowRdd: RDD[Row] = rdd.mapPartitions { iter => + fillObject(wrappedConf.value, iter, attributes.zipWithIndex, mutableRow) + } + rowRdd + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala new file mode 100644 index 0000000000000..a85f035a9424d --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.sql.hive
+
+import com.esotericsoftware.kryo.io.Output
+import com.esotericsoftware.kryo.Kryo
+import org.apache.commons.codec.binary.Base64
+import org.apache.spark.sql.{SaveMode, DataFrame}
+import scala.reflect.runtime.universe.{TypeTag, typeTag}
+
+package object orc {
+  implicit class OrcContext(sqlContext: HiveContext) {
+    import sqlContext._
+    @scala.annotation.varargs
+    def orcFile(paths: String*): DataFrame = {
+      if (paths.isEmpty) {
+        emptyDataFrame
+      } else {
+        val orcRelation = OrcRelation(paths.toArray, Map.empty)(sqlContext)
+        sqlContext.baseRelationToDataFrame(orcRelation)
+      }
+    }
+  }
+
+  implicit class OrcSchemaRDD(dataFrame: DataFrame) {
+    def saveAsOrcFile(path: String, mode: SaveMode = SaveMode.Overwrite): Unit = {
+      dataFrame.save(
+        path,
+        source = classOf[DefaultSource].getCanonicalName,
+        mode = mode)
+    }
+  }
+
+  // Flags for ORC compression, predicate pushdown, etc.
+  val orcDefaultCompressVar = "hive.exec.orc.default.compress"
+  var ORC_FILTER_PUSHDOWN_ENABLED = true
+  val SARG_PUSHDOWN = "sarg.pushdown"
+  val INDEX_FILTER = "hive.optimize.index.filter"
+
+  def toKryo(input: Any): String = {
+    val out = new Output(4 * 1024, 10 * 1024 * 1024)
+    new Kryo().writeObject(out, input)
+    out.close()
+    Base64.encodeBase64String(out.toBytes())
+  }
+}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
new file mode 100644
index 0000000000000..b8fe582498ecf
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.orc
+
+import java.io.File
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.util.Utils
+import org.scalatest.{BeforeAndAfterAll, FunSuiteLike}
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.TypeTag
+
+
+// The data where the partitioning key exists only in the directory structure.
+case class OrcParData(intField: Int, stringField: String) + +// The data that also includes the partitioning key +case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) + +class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { + val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" + + def withTempDir(f: File => Unit): Unit = { + val dir = Utils.createTempDir().getCanonicalFile + try f(dir) finally Utils.deleteRecursively(dir) + } + + def makeOrcFile[T <: Product: ClassTag: TypeTag]( + data: Seq[T], path: File): Unit = { + data.toDF().saveAsOrcFile(path.getCanonicalPath) + } + + + def makeOrcFile[T <: Product: ClassTag: TypeTag]( + df: DataFrame, path: File): Unit = { + df.saveAsOrcFile(path.getCanonicalPath) + } + + protected def withTempTable(tableName: String)(f: => Unit): Unit = { + try f finally TestHive.dropTempTable(tableName) + } + + protected def makePartitionDir( + basePath: File, + defaultPartitionName: String, + partitionCols: (String, Any)*): File = { + val partNames = partitionCols.map { case (k, v) => + val valueString = if (v == null || v == "") defaultPartitionName else v.toString + s"$k=$valueString" + } + + val partDir = partNames.foldLeft(basePath) { (parent, child) => + new File(parent, child) + } + + assert(partDir.mkdirs(), s"Couldn't create directory $partDir") + partDir + } + + test("read partitioned table - normal case") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } { + makeOrcFile( + (1 to 10).map(i => OrcParData(i, i.toString)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + TestHive.orcFile(base.getCanonicalPath).registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } yield Row(i, i.toString, pi, ps)) + + checkAnswer( + sql("SELECT intField, pi FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + _ <- Seq("foo", "bar") + } yield Row(i, pi)) + + checkAnswer( + sql("SELECT * FROM t WHERE pi = 1"), + for { + i <- 1 to 10 + ps <- Seq("foo", "bar") + } yield Row(i, i.toString, 1, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps = 'foo'"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, i.toString, pi, "foo")) + } + } + } + + test("read partitioned table - partition key included in orc file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } { + makeOrcFile( + (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + TestHive.orcFile(base.getCanonicalPath).registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", "bar") + } yield Row(i, pi, i.toString, ps)) + + checkAnswer( + sql("SELECT intField, pi FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + _ <- Seq("foo", "bar") + } yield Row(i, pi)) + + checkAnswer( + sql("SELECT * FROM t WHERE pi = 1"), + for { + i <- 1 to 10 + ps <- Seq("foo", "bar") + } yield Row(i, 1, i.toString, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps = 'foo'"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, pi, i.toString, "foo")) + } + } + } + + + test("read partitioned table - with nulls") { + withTempDir { base => + for { + // Must be `Integer` rather than `Int` here. `null.asInstanceOf[Int]` results in a zero... 
+ pi <- Seq(1, null.asInstanceOf[Integer]) + ps <- Seq("foo", null.asInstanceOf[String]) + } { + makeOrcFile( + (1 to 10).map(i => OrcParData(i, i.toString)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + val orcRelation = load( + "org.apache.spark.sql.hive.orc.DefaultSource", + Map( + "path" -> base.getCanonicalPath, + OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + + orcRelation.registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, null.asInstanceOf[Integer]) + ps <- Seq("foo", null.asInstanceOf[String]) + } yield Row(i, i.toString, pi, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE pi IS NULL"), + for { + i <- 1 to 10 + ps <- Seq("foo", null.asInstanceOf[String]) + } yield Row(i, i.toString, null, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps IS NULL"), + for { + i <- 1 to 10 + pi <- Seq(1, null.asInstanceOf[Integer]) + } yield Row(i, i.toString, pi, null)) + } + } + } + + test("read partitioned table - with nulls and partition keys are included in Orc file") { + withTempDir { base => + for { + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } { + makeOrcFile( + (1 to 10).map(i => OrcParDataWithKey(i, pi, i.toString, ps)), + makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps)) + } + + val orcRelation = load( + "org.apache.spark.sql.hive.orc.DefaultSource", + Map( + "path" -> base.getCanonicalPath, + OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + + orcRelation.registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + ps <- Seq("foo", null.asInstanceOf[String]) + } yield Row(i, pi, i.toString, ps)) + + checkAnswer( + sql("SELECT * FROM t WHERE ps IS NULL"), + for { + i <- 1 to 10 + pi <- Seq(1, 2) + } yield Row(i, pi, i.toString, null)) + } + } + } + + ignore("read partitioned table - merging compatible schemas: not supported yet") { + withTempDir { base => + makeOrcFile( + (1 to 10).map(i => Tuple1(i)).toDF("intField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 1)) + + makeOrcFile( + (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), + makePartitionDir(base, defaultPartitionName, "pi" -> 2)) + + load(base.getCanonicalPath, "org.apache.spark.sql.hive.orc").registerTempTable("t") + + withTempTable("t") { + checkAnswer( + sql("SELECT * FROM t"), + (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2))) + } + } + } +} + diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala new file mode 100644 index 0000000000000..90490df765ca6 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.io.File + +import org.apache.hadoop.hive.ql.io.orc.CompressionKind +import org.apache.spark.sql._ +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.hive.test.TestHive.implicits._ +import org.apache.spark.util.Utils +import org.scalatest.{BeforeAndAfterAll, FunSuiteLike} + +case class TestRDDEntry(key: Int, value: String) + +case class NullReflectData( + intField: java.lang.Integer, + longField: java.lang.Long, + floatField: java.lang.Float, + doubleField: java.lang.Double, + booleanField: java.lang.Boolean) + +case class OptionalReflectData( + intField: Option[Int], + longField: Option[Long], + floatField: Option[Float], + doubleField: Option[Double], + booleanField: Option[Boolean]) + +case class Nested(i: Int, s: String) + +case class Data(array: Seq[Int], nested: Nested) + +case class AllDataTypes( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean) + +case class AllDataTypesWithNonPrimitiveType( + stringField: String, + intField: Int, + longField: Long, + floatField: Float, + doubleField: Double, + shortField: Short, + byteField: Byte, + booleanField: Boolean, + array: Seq[Int], + arrayContainsNull: Seq[Option[Int]], + map: Map[Int, Long], + mapValueContainsNull: Map[Int, Option[Long]], + data: Data) + +case class BinaryData(binaryData: Array[Byte]) + +case class Contact(name: String, phone: String) + +case class Person(name: String, age: Int, contacts: Seq[Contact]) + +class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { + + def getTempFilePath(prefix: String, suffix: String = ""): File = { + val tempFile = File.createTempFile(prefix, suffix) + tempFile.delete() + tempFile + } + + test("Read/Write All Types") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext.parallelize(range) + .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + data.toDF().saveAsOrcFile(tempDir) + checkAnswer( + TestHive.orcFile(tempDir), + data.toDF().collect().toSeq) + Utils.deleteRecursively(new File(tempDir)) + } + + test("read/write binary data") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).toDF().saveAsOrcFile(tempDir) + TestHive.orcFile(tempDir) + .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8")) + .collect().toSeq == Seq("test") + Utils.deleteRecursively(new File(tempDir)) + } + + test("Read/Write All Types with non-primitive type") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val range = (0 to 255) + val data = sparkContext.parallelize(range) + .map(x => AllDataTypesWithNonPrimitiveType( + s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0, + (0 until x), + (0 until x).map(Option(_).filter(_ % 3 == 0)), + (0 until 
x).map(i => i -> i.toLong).toMap, + (0 until x).map(i => i -> Option(i.toLong)).toMap + (x -> None), + Data((0 until x), Nested(x, s"$x")))) + data.toDF().saveAsOrcFile(tempDir) + + checkAnswer( + TestHive.orcFile(tempDir), + data.toDF().collect().toSeq) + Utils.deleteRecursively(new File(tempDir)) + } + + test("Creating case class RDD table") { + sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + .toDF().registerTempTable("tmp") + val rdd = sql("SELECT * FROM tmp").collect().sortBy(_.getInt(0)) + var counter = 1 + rdd.foreach { + // '===' does not like string comparison? + row: Row => { + assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + counter = counter + 1 + } + } + } + + test("Simple selection form orc table") { + val tempDir = getTempFilePath("orcTest").getCanonicalPath + val data = sparkContext.parallelize((1 to 10)) + .map(i => Person(s"name_$i", i, (0 until 2).map{ m=> + Contact(s"contact_$m", s"phone_$m") })) + data.toDF().saveAsOrcFile(tempDir) + val f = TestHive.orcFile(tempDir) + f.registerTempTable("tmp") + var rdd = sql("SELECT name FROM tmp where age <= 5") + assert(rdd.count() == 5) + + rdd = sql("SELECT name, contacts FROM tmp where age > 5") + assert(rdd.count() == 5) + val contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 10) + Utils.deleteRecursively(new File(tempDir)) + } + + test("save and load case class RDD with Nones as orc") { + val data = OptionalReflectData(None, None, None, None, None) + val rdd = sparkContext.parallelize(data :: Nil) + val tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.toDF().saveAsOrcFile(tempDir) + val readFile = TestHive.orcFile(tempDir) + val rdd_saved = readFile.collect() + assert(rdd_saved(0).toSeq === Seq.fill(5)(null)) + Utils.deleteRecursively(new File(tempDir)) + } + + // We only support zlib in hive0.12.0 now + test("Default Compression options for writing to an Orcfile") { + //TODO: support other compress codec + var tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.toDF().saveAsOrcFile(tempDir) + var actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression + assert(actualCodec == CompressionKind.ZLIB) + Utils.deleteRecursively(new File(tempDir)) + } + + // Following codec is supported in hive-0.13.1, ignore it now + ignore("Other Compression options for writing to an Orcfile only supported in hive 0.13.1 and above") { + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY") + var tempDir = getTempFilePath("orcTest").getCanonicalPath + val rdd = sparkContext.parallelize((1 to 100)) + .map(i => TestRDDEntry(i, s"val_$i")) + rdd.toDF().saveAsOrcFile(tempDir) + var actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression + assert(actualCodec == CompressionKind.SNAPPY) + Utils.deleteRecursively(new File(tempDir)) + + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "NONE") + tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.toDF().saveAsOrcFile(tempDir) + actualCodec = OrcFileOperator.getFileReader(tempDir).getCompression + assert(actualCodec == CompressionKind.NONE) + Utils.deleteRecursively(new File(tempDir)) + + TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "LZO") + tempDir = getTempFilePath("orcTest").getCanonicalPath + rdd.toDF().saveAsOrcFile(tempDir) + actualCodec = 
OrcFileOperator.getFileReader(tempDir).getCompression + assert(actualCodec == CompressionKind.LZO) + Utils.deleteRecursively(new File(tempDir)) + } +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala new file mode 100644 index 0000000000000..8ed8c53e81613 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala @@ -0,0 +1,533 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import org.apache.hadoop.fs.Path + +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.sql._ +import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.parquet.ParquetTest +import org.apache.spark.sql.sources.{FSBasedRelation, LogicalRelation} +import org.apache.spark.sql.types._ + +// TODO Don't extend ParquetTest +// This test suite extends ParquetTest for some convenient utility methods. These methods should be +// moved to some more general places, maybe QueryTest. 
+class OrcRelationTest extends QueryTest with ParquetTest { + override val sqlContext: SQLContext = TestHive + + import sqlContext._ + import sqlContext.implicits._ + + val dataSourceName = classOf[DefaultSource].getCanonicalName + + val dataSchema = + StructType( + Seq( + StructField("a", IntegerType, nullable = false), + StructField("b", StringType, nullable = false))) + + val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") + + val partitionedTestDF1 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF2 = (for { + i <- 1 to 3 + p2 <- Seq("foo", "bar") + } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") + + val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) + + def checkQueries(df: DataFrame): Unit = { + // Selects everything + checkAnswer( + df, + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + + // Simple filtering and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 === 2), + for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) + + // Simple projection and filtering + checkAnswer( + df.filter('a > 1).select('b, 'a + 1), + for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) + + // Simple projection and partition pruning + checkAnswer( + df.filter('a > 1 && 'p1 < 2).select('b, 'p1), + for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) + + // Self-join + df.registerTempTable("t") + withTempTable("t") { + checkAnswer( + sql( + """SELECT l.a, r.b, l.p1, r.p2 + |FROM t l JOIN t r + |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 + """.stripMargin), + for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) + } + } + + test("save()/load() - non-partitioned table - Overwrite") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + testDF.collect()) + } + } + + test("save()/load() - non-partitioned table - Append") { + withTempPath { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Append) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)).orderBy("a"), + testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("save()/load() - non-partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("save()/load() - non-partitioned table - Ignore") { + withTempDir { file => + testDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + assert(fs.listStatus(path).isEmpty) + } + } + + test("save()/load() - partitioned table - simple queries") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.ErrorIfExists, 
+ options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkQueries( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json))) + } + } + + test("save()/load() - partitioned table - Overwrite") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - Append") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("save()/load() - partitioned table - Append - new partition values") { + withTempPath { file => + partitionedTestDF1.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.save( + source = dataSourceName, + mode = SaveMode.Append, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + checkAnswer( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchema.json)), + partitionedTestDF.collect()) + } + } + + test("save()/load() - partitioned table - ErrorIfExists") { + withTempDir { file => + intercept[RuntimeException] { + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.ErrorIfExists, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("save()/load() - partitioned table - Ignore") { + withTempDir { file => + partitionedTestDF.save( + path = file.getCanonicalPath, + source = dataSourceName, + mode = SaveMode.Ignore) + + val path = new Path(file.getCanonicalPath) + val fs = path.getFileSystem(SparkHadoopUtil.get.conf) + assert(fs.listStatus(path).isEmpty) + } + } + + def withTable(tableName: String)(f: => Unit): Unit = { + try f finally sql(s"DROP TABLE $tableName") + } + + test("saveAsTable()/load() - non-partitioned table - Overwrite") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkAnswer(table("t"), testDF.collect()) + } + } + + test("saveAsTable()/load() - non-partitioned table - Append") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite) + + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append) + + withTable("t") { + checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) + } + } + + test("saveAsTable()/load() - non-partitioned 
table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.ErrorIfExists) + } + } + } + + test("saveAsTable()/load() - non-partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + testDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Ignore) + + assert(table("t").collect().isEmpty) + } + } + + test("saveAsTable()/load() - partitioned table - simple queries") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + Map("dataSchema" -> dataSchema.json)) + + withTable("t") { + checkQueries(table("t")) + } + } + + test("saveAsTable()/load() - partitioned table - Overwrite") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - new partition values") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + withTable("t") { + checkAnswer(table("t"), partitionedTestDF.collect()) + } + } + + test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { + partitionedTestDF1.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + // Using only a subset of all partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1")) + } + + // Using different order of partition columns + intercept[Throwable] { + partitionedTestDF2.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Append, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p2", "p1")) + } + } + + test("saveAsTable()/load() - partitioned table - ErrorIfExists") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + intercept[AnalysisException] { + 
partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.ErrorIfExists, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + } + } + } + + test("saveAsTable()/load() - partitioned table - Ignore") { + Seq.empty[(Int, String)].toDF().registerTempTable("t") + + withTempTable("t") { + partitionedTestDF.saveAsTable( + tableName = "t", + source = dataSourceName, + mode = SaveMode.Ignore, + options = Map("dataSchema" -> dataSchema.json), + partitionColumns = Seq("p1", "p2")) + + assert(table("t").collect().isEmpty) + } + } + + test("Hadoop style globbing") { + withTempPath { file => + partitionedTestDF.save( + source = dataSourceName, + mode = SaveMode.Overwrite, + options = Map("path" -> file.getCanonicalPath), + partitionColumns = Seq("p1", "p2")) + + val df = load( + source = dataSourceName, + options = Map( + "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", + "dataSchema" -> dataSchema.json)) + + val expectedPaths = Set( + s"${file.getCanonicalFile}/p1=1/p2=foo", + s"${file.getCanonicalFile}/p1=2/p2=foo", + s"${file.getCanonicalFile}/p1=1/p2=bar", + s"${file.getCanonicalFile}/p1=2/p2=bar" + ).map { p => + val path = new Path(p) + val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString + } + val actualPaths = df.queryExecution.analyzed.collectFirst { + case LogicalRelation(relation: FSBasedRelation) => + relation.paths.toSet + }.getOrElse { + fail("Expect an FSBasedRelation, but none could be found") + } + + assert(actualPaths === expectedPaths) + checkAnswer(df, partitionedTestDF.collect()) + } + } +} + +class FSBasedOrcRelationSuite extends OrcRelationTest { + override val dataSourceName: String = classOf[DefaultSource].getCanonicalName + + import sqlContext._ + import sqlContext.implicits._ + + test("save()/load() - partitioned table - simple queries - partition columns in data") { + withTempDir { file => + val basePath = new Path(file.getCanonicalPath) + val fs = basePath.getFileSystem(SparkHadoopUtil.get.conf) + val qualifiedBasePath = fs.makeQualified(basePath) + + for (p1 <- 1 to 2; p2 <- Seq("foo", "bar")) { + val partitionDir = new Path(qualifiedBasePath, s"p1=$p1/p2=$p2") + sparkContext + .parallelize(for (i <- 1 to 3) yield (i, s"val_$i", p1)) + .toDF("a", "b", "p1") + .saveAsOrcFile(partitionDir.toString) + } + + val dataSchemaWithPartition = + StructType(dataSchema.fields :+ StructField("p1", IntegerType, nullable = true)) + + checkQueries( + load( + source = dataSourceName, + options = Map( + "path" -> file.getCanonicalPath, + "dataSchema" -> dataSchemaWithPartition.json))) + } + } +} \ No newline at end of file diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala new file mode 100644 index 0000000000000..8e0252e971105 --- /dev/null +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.orc + +import java.io.File +import org.scalatest.BeforeAndAfterAll + +import org.apache.spark.sql.{Row, QueryTest} +import org.apache.spark.sql.hive.test.TestHive._ + +case class OrcData(intField: Int, stringField: String) + +abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { + var orcTableDir: File = null + var orcTableAsDir: File = null + + override def beforeAll(): Unit = { + super.beforeAll() + + orcTableAsDir = File.createTempFile("orctests", "sparksql") + orcTableAsDir.delete() + orcTableAsDir.mkdir() + + // Hack: to prepare orc data files using hive external tables + orcTableDir = File.createTempFile("orctests", "sparksql") + orcTableDir.delete() + orcTableDir.mkdir() + import org.apache.spark.sql.hive.test.TestHive.implicits._ + + (sparkContext + .makeRDD(1 to 10) + .map(i => OrcData(i, s"part-$i"))) + .toDF.registerTempTable(s"orc_temp_table") + + sql(s""" + create external table normal_orc + ( + intField INT, + stringField STRING + ) + STORED AS orc + location '${orcTableDir.getCanonicalPath}' + """) + + sql( + s"""insert into table normal_orc + select intField, stringField from orc_temp_table""") + + } + + override def afterAll(): Unit = { + orcTableDir.delete() + orcTableAsDir.delete() + } + + test("create temporary orc table") { + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_source"), Row(10)) + + checkAnswer( + sql("SELECT * FROM normal_orc_source"), + Row(1, "part-1") :: + Row(2, "part-2") :: + Row(3, "part-3") :: + Row(4, "part-4") :: + Row(5, "part-5") :: + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(10, "part-10") :: Nil + ) + + checkAnswer( + sql("SELECT * FROM normal_orc_source where intField > 5"), + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(10, "part-10") :: Nil + ) + + checkAnswer( + sql("SELECT count(intField), stringField FROM normal_orc_source group by stringField"), + Row(1, "part-1") :: + Row(1, "part-2") :: + Row(1, "part-3") :: + Row(1, "part-4") :: + Row(1, "part-5") :: + Row(1, "part-6") :: + Row(1, "part-7") :: + Row(1, "part-8") :: + Row(1, "part-9") :: + Row(1, "part-10") :: Nil + ) + + } + + test("create temporary orc table as") { + checkAnswer(sql("SELECT COUNT(*) FROM normal_orc_as_source"), Row(10)) + + checkAnswer( + sql("SELECT * FROM normal_orc_source"), + Row(1, "part-1") :: + Row(2, "part-2") :: + Row(3, "part-3") :: + Row(4, "part-4") :: + Row(5, "part-5") :: + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(10, "part-10") :: Nil + ) + + checkAnswer( + sql("SELECT * FROM normal_orc_source where intField > 5"), + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(10, "part-10") :: Nil + ) + + checkAnswer( + sql("SELECT count(intField), stringField FROM normal_orc_source group by stringField"), + Row(1, "part-1") :: + Row(1, "part-2") :: + Row(1, "part-3") :: + Row(1, "part-4") :: + Row(1, "part-5") :: + Row(1, "part-6") :: + Row(1, "part-7") :: + Row(1, "part-8") :: + Row(1, "part-9") :: + 
Row(1, "part-10") :: Nil + ) + + } + + test("appending insert") { + sql("insert into table normal_orc_source select * from orc_temp_table where intField > 5") + checkAnswer( + sql("select * from normal_orc_source"), + Row(1, "part-1") :: + Row(2, "part-2") :: + Row(3, "part-3") :: + Row(4, "part-4") :: + Row(5, "part-5") :: + Row(6, "part-6") :: + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(9, "part-9") :: + Row(10, "part-10") :: + Row(10, "part-10") :: Nil + ) + } + + test("overwrite insert") { + sql("insert overwrite table normal_orc_as_source select * from orc_temp_table where intField > 5") + checkAnswer( + sql("select * from normal_orc_as_source"), + Row(6, "part-6") :: + Row(7, "part-7") :: + Row(8, "part-8") :: + Row(9, "part-9") :: + Row(10, "part-10") :: Nil + ) + } +} + +class OrcSourceSuite extends OrcSuite { + override def beforeAll(): Unit = { + super.beforeAll() + + sql( s""" + create temporary table normal_orc_source + USING org.apache.spark.sql.hive.orc + OPTIONS ( + path '${new File(orcTableDir.getAbsolutePath).getCanonicalPath}' + ) + """) + + sql( s""" + create temporary table normal_orc_as_source + USING org.apache.spark.sql.hive.orc + OPTIONS ( + path '${new File(orcTableAsDir.getAbsolutePath).getCanonicalPath}' + ) + as select * from orc_temp_table + """) + } +} From 90ded0b23e6a0478a0524f289b42bd7f446209e7 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 13 May 2015 12:17:02 -0700 Subject: [PATCH 2/7] minor change --- .../apache/spark/sql/hive/orc/OrcQuerySuite.scala | 13 ++++++++----- ...OrcRelationTest.scala => OrcRelationSuite.scala} | 2 +- .../orc/{OrcSuite.scala => OrcSourceSuite.scala} | 3 ++- 3 files changed, 11 insertions(+), 7 deletions(-) rename sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/{OrcRelationTest.scala => OrcRelationSuite.scala} (99%) rename sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/{OrcSuite.scala => OrcSourceSuite.scala} (98%) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 90490df765ca6..3c596d0654324 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -91,7 +91,8 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { val tempDir = getTempFilePath("orcTest").getCanonicalPath val range = (0 to 255) val data = sparkContext.parallelize(range) - .map(x => AllDataTypes(s"$x", x, x.toLong, x.toFloat, x.toDouble, x.toShort, x.toByte, x % 2 == 0)) + .map(x => + AllDataTypes(s"$x", x, x.toLong, x.toFloat,x.toDouble, x.toShort, x.toByte, x % 2 == 0)) data.toDF().saveAsOrcFile(tempDir) checkAnswer( TestHive.orcFile(tempDir), @@ -101,7 +102,8 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { test("read/write binary data") { val tempDir = getTempFilePath("orcTest").getCanonicalPath - sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil).toDF().saveAsOrcFile(tempDir) + sparkContext.parallelize(BinaryData("test".getBytes("utf8")) :: Nil) + .toDF().saveAsOrcFile(tempDir) TestHive.orcFile(tempDir) .map(r => new String(r(0).asInstanceOf[Array[Byte]], "utf8")) .collect().toSeq == Seq("test") @@ -136,7 +138,8 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { rdd.foreach { // '===' does not like string 
comparison? row: Row => { - assert(row.getString(1).equals(s"val_$counter"), s"row $counter value ${row.getString(1)} does not match val_$counter") + assert(row.getString(1).equals(s"val_$counter"), + s"row $counter value ${row.getString(1)} does not match val_$counter") counter = counter + 1 } } @@ -173,7 +176,7 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { // We only support zlib in hive0.12.0 now test("Default Compression options for writing to an Orcfile") { - //TODO: support other compress codec + // TODO: support other compress codec var tempDir = getTempFilePath("orcTest").getCanonicalPath val rdd = sparkContext.parallelize((1 to 100)) .map(i => TestRDDEntry(i, s"val_$i")) @@ -184,7 +187,7 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { } // Following codec is supported in hive-0.13.1, ignore it now - ignore("Other Compression options for writing to an Orcfile only supported in hive 0.13.1 and above") { + ignore("Other Compression options for writing to an Orcfile - 0.13.1 and above") { TestHive.sparkContext.hadoopConfiguration.set(orcDefaultCompressVar, "SNAPPY") var tempDir = getTempFilePath("orcTest").getCanonicalPath val rdd = sparkContext.parallelize((1 to 100)) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala similarity index 99% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala index 8ed8c53e81613..0b486068d4b0b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationTest.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala @@ -530,4 +530,4 @@ class FSBasedOrcRelationSuite extends OrcRelationTest { "dataSchema" -> dataSchemaWithPartition.json))) } } -} \ No newline at end of file +} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala similarity index 98% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala index 8e0252e971105..f86750bcfb6d4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcSourceSuite.scala @@ -175,7 +175,8 @@ abstract class OrcSuite extends QueryTest with BeforeAndAfterAll { } test("overwrite insert") { - sql("insert overwrite table normal_orc_as_source select * from orc_temp_table where intField > 5") + sql("insert overwrite table normal_orc_as_source select * " + + "from orc_temp_table where intField > 5") checkAnswer( sql("select * from normal_orc_as_source"), Row(6, "part-6") :: From e7f71782d24a7caacf1d0312af533eb660537134 Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 13 May 2015 16:12:49 -0700 Subject: [PATCH 3/7] predicate fix --- .../main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 8e73f9181f08b..0a9924b139a48 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -99,6 +99,10 @@ private[sql] object OrcFilters extends Logging { Some(b1) } case p@IsNull(attribute: String) => { + val b1 = builder.isNull(attribute) + Some(b1) + } + case p@IsNotNull(attribute: String) => { val b1 = builder.startNot().isNull(attribute).end() Some(b1) } From a76d5b893554fcc1fdc280b4bde27c292c58f0cf Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 13 May 2015 17:01:06 -0700 Subject: [PATCH 4/7] reuse test suite --- .../hive/orc/OrcPartitionDiscoverySuite.scala | 20 - .../spark/sql/hive/orc/OrcRelationSuite.scala | 479 +----------------- 2 files changed, 2 insertions(+), 497 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala index b8fe582498ecf..7ebf3c6eced26 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala @@ -254,25 +254,5 @@ class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with Before } } } - - ignore("read partitioned table - merging compatible schemas: not supported yet") { - withTempDir { base => - makeOrcFile( - (1 to 10).map(i => Tuple1(i)).toDF("intField"), - makePartitionDir(base, defaultPartitionName, "pi" -> 1)) - - makeOrcFile( - (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"), - makePartitionDir(base, defaultPartitionName, "pi" -> 2)) - - load(base.getCanonicalPath, "org.apache.spark.sql.hive.orc").registerTempTable("t") - - withTempTable("t") { - checkAnswer( - sql("SELECT * FROM t"), - (1 to 10).map(i => Row(i, null, 1)) ++ (1 to 10).map(i => Row(i, i.toString, 2))) - } - } - } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala index 0b486068d4b0b..1d8c421b90678 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcRelationSuite.scala @@ -20,486 +20,11 @@ package org.apache.spark.sql.hive.orc import org.apache.hadoop.fs.Path import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.sql._ -import org.apache.spark.sql.hive.test.TestHive -import org.apache.spark.sql.parquet.ParquetTest -import org.apache.spark.sql.sources.{FSBasedRelation, LogicalRelation} +import org.apache.spark.sql.sources.{FSBasedRelationTest} import org.apache.spark.sql.types._ -// TODO Don't extend ParquetTest -// This test suite extends ParquetTest for some convenient utility methods. These methods should be -// moved to some more general places, maybe QueryTest. 
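Note on the OrcFilters change in PATCH 3 above: before the fix, IsNull was translated with startNot().isNull(...).end(), which is the meaning of IS NOT NULL, so null checks were silently inverted during pushdown. The snippet below is a minimal, illustrative sketch (not part of the patch; the object and method names here are made up) of how the two null predicates map onto the SearchArgument builder API that the patch itself uses:

import org.apache.hadoop.hive.ql.io.sarg.SearchArgument
import org.apache.spark.sql.sources.{Filter, IsNotNull, IsNull}

// Illustrative helper: builds a standalone SearchArgument for the two null
// predicates, mirroring the corrected cases in OrcFilters.
object NullPredicateSketch {
  def toSearchArgument(filter: Filter): Option[SearchArgument] = filter match {
    case IsNull(attribute) =>
      // "attribute IS NULL" maps directly onto builder.isNull
      Some(SearchArgument.FACTORY.newBuilder().startAnd().isNull(attribute).end().build())
    case IsNotNull(attribute) =>
      // "attribute IS NOT NULL" is expressed by negating isNull
      Some(SearchArgument.FACTORY.newBuilder().startAnd().startNot().isNull(attribute).end().end().build())
    case _ => None
  }
}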
-class OrcRelationTest extends QueryTest with ParquetTest { - override val sqlContext: SQLContext = TestHive - import sqlContext._ - import sqlContext.implicits._ - - val dataSourceName = classOf[DefaultSource].getCanonicalName - - val dataSchema = - StructType( - Seq( - StructField("a", IntegerType, nullable = false), - StructField("b", StringType, nullable = false))) - - val testDF = (1 to 3).map(i => (i, s"val_$i")).toDF("a", "b") - - val partitionedTestDF1 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 1, p2)).toDF("a", "b", "p1", "p2") - - val partitionedTestDF2 = (for { - i <- 1 to 3 - p2 <- Seq("foo", "bar") - } yield (i, s"val_$i", 2, p2)).toDF("a", "b", "p1", "p2") - - val partitionedTestDF = partitionedTestDF1.unionAll(partitionedTestDF2) - - def checkQueries(df: DataFrame): Unit = { - // Selects everything - checkAnswer( - df, - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - - // Simple filtering and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 === 2), - for (i <- 2 to 3; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", 2, p2)) - - // Simple projection and filtering - checkAnswer( - df.filter('a > 1).select('b, 'a + 1), - for (i <- 2 to 3; _ <- 1 to 2; _ <- Seq("foo", "bar")) yield Row(s"val_$i", i + 1)) - - // Simple projection and partition pruning - checkAnswer( - df.filter('a > 1 && 'p1 < 2).select('b, 'p1), - for (i <- 2 to 3; _ <- Seq("foo", "bar")) yield Row(s"val_$i", 1)) - - // Self-join - df.registerTempTable("t") - withTempTable("t") { - checkAnswer( - sql( - """SELECT l.a, r.b, l.p1, r.p2 - |FROM t l JOIN t r - |ON l.a = r.a AND l.p1 = r.p1 AND l.p2 = r.p2 - """.stripMargin), - for (i <- 1 to 3; p1 <- 1 to 2; p2 <- Seq("foo", "bar")) yield Row(i, s"val_$i", p1, p2)) - } - } - - test("save()/load() - non-partitioned table - Overwrite") { - withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) - - checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - testDF.collect()) - } - } - - test("save()/load() - non-partitioned table - Append") { - withTempPath { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Append) - - checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)).orderBy("a"), - testDF.unionAll(testDF).orderBy("a").collect()) - } - } - - test("save()/load() - non-partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[RuntimeException] { - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.ErrorIfExists) - } - } - } - - test("save()/load() - non-partitioned table - Ignore") { - withTempDir { file => - testDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Ignore) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - assert(fs.listStatus(path).isEmpty) - } - } - - test("save()/load() - partitioned table - simple queries") { - withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - 
options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkQueries( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json))) - } - } - - test("save()/load() - partitioned table - Overwrite") { - withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - Append") { - withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.unionAll(partitionedTestDF).collect()) - } - } - - test("save()/load() - partitioned table - Append - new partition values") { - withTempPath { file => - partitionedTestDF1.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.save( - source = dataSourceName, - mode = SaveMode.Append, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - checkAnswer( - load( - source = dataSourceName, - options = Map( - "path" -> file.getCanonicalPath, - "dataSchema" -> dataSchema.json)), - partitionedTestDF.collect()) - } - } - - test("save()/load() - partitioned table - ErrorIfExists") { - withTempDir { file => - intercept[RuntimeException] { - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - } - } - } - - test("save()/load() - partitioned table - Ignore") { - withTempDir { file => - partitionedTestDF.save( - path = file.getCanonicalPath, - source = dataSourceName, - mode = SaveMode.Ignore) - - val path = new Path(file.getCanonicalPath) - val fs = path.getFileSystem(SparkHadoopUtil.get.conf) - assert(fs.listStatus(path).isEmpty) - } - } - - def withTable(tableName: String)(f: => Unit): Unit = { - try f finally sql(s"DROP TABLE $tableName") - } - - test("saveAsTable()/load() - non-partitioned table - Overwrite") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) - - withTable("t") { - checkAnswer(table("t"), testDF.collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table - Append") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite) - - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append) - - withTable("t") { - checkAnswer(table("t"), testDF.unionAll(testDF).orderBy("a").collect()) - } - } - - test("saveAsTable()/load() - non-partitioned table 
- ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.ErrorIfExists) - } - } - } - - test("saveAsTable()/load() - non-partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - testDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Ignore) - - assert(table("t").collect().isEmpty) - } - } - - test("saveAsTable()/load() - partitioned table - simple queries") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - Map("dataSchema" -> dataSchema.json)) - - withTable("t") { - checkQueries(table("t")) - } - } - - test("saveAsTable()/load() - partitioned table - Overwrite") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.unionAll(partitionedTestDF).collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - new partition values") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - withTable("t") { - checkAnswer(table("t"), partitionedTestDF.collect()) - } - } - - test("saveAsTable()/load() - partitioned table - Append - mismatched partition columns") { - partitionedTestDF1.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - // Using only a subset of all partition columns - intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1")) - } - - // Using different order of partition columns - intercept[Throwable] { - partitionedTestDF2.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Append, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p2", "p1")) - } - } - - test("saveAsTable()/load() - partitioned table - ErrorIfExists") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - intercept[AnalysisException] { - partitionedTestDF.saveAsTable( - 
tableName = "t", - source = dataSourceName, - mode = SaveMode.ErrorIfExists, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - } - } - } - - test("saveAsTable()/load() - partitioned table - Ignore") { - Seq.empty[(Int, String)].toDF().registerTempTable("t") - - withTempTable("t") { - partitionedTestDF.saveAsTable( - tableName = "t", - source = dataSourceName, - mode = SaveMode.Ignore, - options = Map("dataSchema" -> dataSchema.json), - partitionColumns = Seq("p1", "p2")) - - assert(table("t").collect().isEmpty) - } - } - - test("Hadoop style globbing") { - withTempPath { file => - partitionedTestDF.save( - source = dataSourceName, - mode = SaveMode.Overwrite, - options = Map("path" -> file.getCanonicalPath), - partitionColumns = Seq("p1", "p2")) - - val df = load( - source = dataSourceName, - options = Map( - "path" -> s"${file.getCanonicalPath}/p1=*/p2=???", - "dataSchema" -> dataSchema.json)) - - val expectedPaths = Set( - s"${file.getCanonicalFile}/p1=1/p2=foo", - s"${file.getCanonicalFile}/p1=2/p2=foo", - s"${file.getCanonicalFile}/p1=1/p2=bar", - s"${file.getCanonicalFile}/p1=2/p2=bar" - ).map { p => - val path = new Path(p) - val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) - path.makeQualified(fs.getUri, fs.getWorkingDirectory).toString - } - val actualPaths = df.queryExecution.analyzed.collectFirst { - case LogicalRelation(relation: FSBasedRelation) => - relation.paths.toSet - }.getOrElse { - fail("Expect an FSBasedRelation, but none could be found") - } - - assert(actualPaths === expectedPaths) - checkAnswer(df, partitionedTestDF.collect()) - } - } -} - -class FSBasedOrcRelationSuite extends OrcRelationTest { +class FSBasedOrcRelationSuite extends FSBasedRelationTest { override val dataSourceName: String = classOf[DefaultSource].getCanonicalName import sqlContext._ From dc1bfa16cd9a3a559aed658571fd8acbf5d002cd Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Wed, 13 May 2015 19:48:02 -0700 Subject: [PATCH 5/7] save mode fix --- .../src/main/scala/org/apache/spark/sql/hive/orc/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala index a85f035a9424d..93af5af196dab 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -42,7 +42,7 @@ package object orc { dataFrame.save( path, source = classOf[DefaultSource].getCanonicalName, - mode = SaveMode.Overwrite) + mode) } } From 8b885d6cf384d5785a76362547cafdcf1196769d Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 14 May 2015 13:19:30 -0700 Subject: [PATCH 6/7] resolve review comments --- .../sql/hive/orc/HadoopTypeConverter.scala | 39 +---- .../spark/sql/hive/orc/OrcFileOperator.scala | 22 +-- .../spark/sql/hive/orc/OrcFilters.scala | 133 ++++++------------ .../spark/sql/hive/orc/OrcRelation.scala | 94 +++++-------- .../sql/hive/orc/OrcTableOperations.scala | 51 +++---- .../apache/spark/sql/hive/orc/package.scala | 19 +-- .../hive/orc/OrcPartitionDiscoverySuite.scala | 7 +- .../spark/sql/hive/orc/OrcQuerySuite.scala | 29 +++- 8 files changed, 157 insertions(+), 237 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala index aabc5477b05a6..713c076aee457 100644 --- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/HadoopTypeConverter.scala @@ -17,13 +17,12 @@ package org.apache.spark.sql.hive.orc -import org.apache.hadoop.hive.common.`type`.HiveVarchar -import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} + import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive._ -import org.apache.spark.sql.catalyst.expressions.{Row, MutableRow} -import scala.collection.JavaConversions._ +import org.apache.spark.sql.catalyst.expressions.MutableRow +import org.apache.spark.sql.hive.{HiveInspectors, HiveShim} /** * We can consolidate TableReader.unwrappers and HiveInspectors.wrapperFor to use @@ -59,35 +58,5 @@ private[hive] object HadoopTypeConverter extends HiveInspectors { /** * Wraps with Hive types based on object inspector. */ - def wrappers(oi: ObjectInspector): Any => Any = oi match { - case _: JavaHiveVarcharObjectInspector => - (o: Any) => new HiveVarchar(o.asInstanceOf[String], o.asInstanceOf[String].size) - - case _: JavaHiveDecimalObjectInspector => - (o: Any) => HiveShim.createDecimal(o.asInstanceOf[BigDecimal].underlying()) - - case soi: StandardStructObjectInspector => - val wrappers = soi.getAllStructFieldRefs.map(ref => wrapperFor(ref.getFieldObjectInspector)) - (o: Any) => { - val struct = soi.create() - (soi.getAllStructFieldRefs, wrappers, o.asInstanceOf[Row].toSeq).zipped.foreach { - (field, wrapper, data) => soi.setStructFieldData(struct, field, wrapper(data)) - } - struct - } - - case loi: ListObjectInspector => - val wrapper = wrapperFor(loi.getListElementObjectInspector) - (o: Any) => seqAsJavaList(o.asInstanceOf[Seq[_]].map(wrapper)) - - case moi: MapObjectInspector => - val keyWrapper = wrapperFor(moi.getMapKeyObjectInspector) - val valueWrapper = wrapperFor(moi.getMapValueObjectInspector) - (o: Any) => mapAsJavaMap(o.asInstanceOf[Map[_, _]].map { case (key, value) => - keyWrapper(key) -> valueWrapper(value) - }) - - case _ => - identity[Any] - } + def wrappers(oi: ObjectInspector): Any => Any = wrapperFor(oi) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala index 6805677e84ea9..4dd2d8951b728 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFileOperator.scala @@ -17,12 +17,11 @@ package org.apache.spark.sql.hive.orc -import java.io.IOException - import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Reader} import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector + import org.apache.spark.Logging import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql.hive.HiveMetastoreTypes @@ -31,7 +30,7 @@ import org.apache.spark.sql.types.StructType private[orc] object OrcFileOperator extends Logging{ def getFileReader(pathStr: String, config: Option[Configuration] = None ): Reader = { - var conf = config.getOrElse(new Configuration) + val conf = config.getOrElse(new Configuration) val fspath = new Path(pathStr) val fs = fspath.getFileSystem(conf) val orcFiles = listOrcFiles(pathStr, conf) @@ -53,19 +52,6 @@ private[orc] object OrcFileOperator extends Logging{ readerInspector } - def deletePath(pathStr: String, conf: 
Configuration): Unit = { - val fspath = new Path(pathStr) - val fs = fspath.getFileSystem(conf) - try { - fs.delete(fspath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${fspath.toString} prior" - + s" to InsertIntoOrcTable:\n${e.toString}") - } - } - def listOrcFiles(pathStr: String, conf: Configuration): Seq[Path] = { val origPath = new Path(pathStr) val fs = origPath.getFileSystem(conf) @@ -80,8 +66,6 @@ private[orc] object OrcFileOperator extends Logging{ throw new IllegalArgumentException( s"orcFileOperator: path $path does not have valid orc files matching the pattern") } - logInfo("Qualified file list: ") - paths.foreach{x=>logInfo(x.toString)} paths } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala index 0a9924b139a48..eda1cffe49810 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcFilters.scala @@ -22,100 +22,55 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.Builder import org.apache.spark.Logging import org.apache.spark.sql.sources._ -private[sql] object OrcFilters extends Logging { - +/** + * It may be optimized by push down partial filters. But we are conservative here. + * Because if some filters fail to be parsed, the tree may be corrupted, + * and cannot be used anymore. + */ +private[orc] object OrcFilters extends Logging { def createFilter(expr: Array[Filter]): Option[SearchArgument] = { - if (expr == null || expr.size == 0) return None - var sarg: Option[Builder] = Some(SearchArgument.FACTORY.newBuilder()) - sarg.get.startAnd() - expr.foreach { - x => { - sarg match { - case Some(s1) => sarg = createFilter(x, s1) - case _ => None - } - } - } - sarg match { - case Some(b) => Some(b.end.build) - case _ => None + if (expr.nonEmpty) { + expr.foldLeft(Some(SearchArgument.FACTORY.newBuilder().startAnd()): Option[Builder]) { + (maybeBuilder, e) => createFilter(e, maybeBuilder) + }.map(_.end().build()) + } else { + None } } - def createFilter(expression: Filter, builder: Builder): Option[Builder] = { - expression match { - case p@And(left: Filter, right: Filter) => { - val b1 = builder.startAnd() - val b2 = createFilter(left, b1) - b2 match { - case Some(b) => val b3 = createFilter(right, b) - if (b3.isDefined) { - Some(b3.get.end) - } else { - None - } - case _ => None - } - } - case p@Or(left: Filter, right: Filter) => { - val b1 = builder.startOr() - val b2 = createFilter(left, b1) - b2 match { - case Some(b) => val b3 = createFilter(right, b) - if (b3.isDefined) { - Some(b3.get.end) - } else { - None - } - case _ => None - } - } - case p@Not(child: Filter) => { - val b1 = builder.startNot() - val b2 = createFilter(child, b1) - b2 match { - case Some(b) => Some(b.end) - case _ => None - } - } - case p@EqualTo(attribute: String, value: Any) => { - val b1 = builder.equals(attribute, value) - Some(b1) - } - case p@LessThan(attribute: String, value: Any) => { - val b1 = builder.lessThan(attribute ,value) - Some(b1) - } - case p@LessThanOrEqual(attribute: String, value: Any) => { - val b1 = builder.lessThanEquals(attribute, value) - Some(b1) - } - case p@GreaterThan(attribute: String, value: Any) => { - val b1 = builder.startNot().lessThanEquals(attribute, value).end() - Some(b1) - } - case p@GreaterThanOrEqual(attribute: String, value: Any) => { - val b1 = builder.startNot().lessThan(attribute, 
value).end() - Some(b1) - } - case p@IsNull(attribute: String) => { - val b1 = builder.isNull(attribute) - Some(b1) - } - case p@IsNotNull(attribute: String) => { - val b1 = builder.startNot().isNull(attribute).end() - Some(b1) - } - case p@In(attribute: String, values: Array[Any]) => { - val b1 = builder.in(attribute, values) - Some(b1) + private def createFilter(expression: Filter, maybeBuilder: Option[Builder]): Option[Builder] = { + maybeBuilder.flatMap { builder => + expression match { + case p@And(left, right) => + for { + lhs <- createFilter(left, Some(builder.startAnd())) + rhs <- createFilter(right, Some(lhs)) + } yield rhs.end() + case p@Or(left, right) => + for { + lhs <- createFilter(left, Some(builder.startOr())) + rhs <- createFilter(right, Some(lhs)) + } yield rhs.end() + case p@Not(child) => + createFilter(child, Some(builder.startNot())).map(_.end()) + case p@EqualTo(attribute, value) => + Some(builder.equals(attribute, value)) + case p@LessThan(attribute, value) => + Some(builder.lessThan(attribute, value)) + case p@LessThanOrEqual(attribute, value) => + Some(builder.lessThanEquals(attribute, value)) + case p@GreaterThan(attribute, value) => + Some(builder.startNot().lessThanEquals(attribute, value).end()) + case p@GreaterThanOrEqual(attribute, value) => + Some(builder.startNot().lessThan(attribute, value).end()) + case p@IsNull(attribute) => + Some(builder.isNull(attribute)) + case p@IsNotNull(attribute) => + Some(builder.startNot().isNull(attribute).end()) + case p@In(attribute, values) => + Some(builder.in(attribute, values)) + case _ => None } - // not supported in filter - // case p@EqualNullSafe(left: String, right: String) => { - // val b1 = builder.nullSafeEquals(left, right) - // Some(b1) - // } - case _ => None } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index 816f3794a6a02..c68a58647cad7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -26,11 +26,11 @@ import org.apache.hadoop.hive.serde2.typeinfo.{TypeInfoUtils, TypeInfo} import org.apache.hadoop.io.{Writable, NullWritable} import org.apache.hadoop.mapred.{RecordWriter, Reporter, JobConf} import org.apache.hadoop.mapreduce.{TaskID, TaskAttemptContext} + import org.apache.spark.Logging import org.apache.spark.annotation.DeveloperApi import org.apache.spark.mapred.SparkHadoopMapRedUtil import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} @@ -39,7 +39,6 @@ import scala.collection.JavaConversions._ private[sql] class DefaultSource extends FSBasedRelationProvider { - def createRelation( sqlContext: SQLContext, paths: Array[String], @@ -54,7 +53,7 @@ private[sql] class DefaultSource extends FSBasedRelationProvider { private[sql] class OrcOutputWriter extends OutputWriter with SparkHadoopMapRedUtil { - var recordWriter: RecordWriter[NullWritable, Writable] = _ + var taskAttemptContext: TaskAttemptContext = _ var serializer: OrcSerde = _ var wrappers: Array[Any => Any] = _ @@ -62,90 +61,75 @@ private[sql] class OrcOutputWriter extends OutputWriter with SparkHadoopMapRedUt var path: String = _ var dataSchema: StructType = _ var fieldOIs: Array[ObjectInspector] = _ - var standardOI: StructObjectInspector = _ 
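The reworked OrcFilters.createFilter above folds the filters through an Option[Builder], so a single unconvertible predicate makes the whole conjunction fall back to "no pushdown" instead of leaving a partially built SearchArgument behind. A minimal, self-contained sketch of that pattern, with made-up names and no Hive dependencies:

// Generic version of the fold-over-Option pattern used in createFilter: the step
// function may fail (None), and any failure aborts the whole accumulation.
def foldOrAbort[A, B](items: Seq[A], start: B)(step: (B, A) => Option[B]): Option[B] =
  items.foldLeft(Option(start)) { (acc, item) => acc.flatMap(step(_, item)) }

// Example: parse a list of integer strings, giving up entirely on the first bad one.
val parsed = foldOrAbort(Seq("1", "2", "3"), List.empty[Int]) { (acc, s) =>
  scala.util.Try(s.toInt).toOption.map(acc :+ _)
}
// parsed == Some(List(1, 2, 3)); with "x" anywhere in the input it would be None.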
- + var structOI: StructObjectInspector = _ + var outputData: Array[Any] = _ + lazy val recordWriter: RecordWriter[NullWritable, Writable] = { + created = true + val conf = taskAttemptContext.getConfiguration + val taskId: TaskID = taskAttemptContext.getTaskAttemptID.getTaskID + val partition: Int = taskId.getId + val filename = f"part-r-$partition%05d-${System.currentTimeMillis}%015d.orc" + val file = new Path(path, filename) + val fs = file.getFileSystem(conf) + val outputFormat = new OrcOutputFormat() + outputFormat.getRecordWriter(fs, + conf.asInstanceOf[JobConf], + file.toUri.getPath, Reporter.NULL) + .asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] + } override def init(path: String, dataSchema: StructType, context: TaskAttemptContext): Unit = { this.path = path - this.dataSchema = dataSchema taskAttemptContext = context + val orcSchema = HiveMetastoreTypes.toMetastoreType(dataSchema) + serializer = new OrcSerde + val typeInfo: TypeInfo = + TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) + structOI = TypeInfoUtils + .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) + .asInstanceOf[StructObjectInspector] + fieldOIs = structOI + .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + outputData = new Array[Any](fieldOIs.length) + wrappers = fieldOIs.map(HadoopTypeConverter.wrappers) } - // Avoid create empty file without schema attached - private def initWriter() = { - if (!created) { - created = true - val conf = taskAttemptContext.getConfiguration - val outputFormat = new OrcOutputFormat() - val taskId: TaskID = taskAttemptContext.getTaskAttemptID.getTaskID - val partition: Int = taskId.getId - val filename = s"part-r-${partition}-${System.currentTimeMillis}.orc" - val file = new Path(path, filename) - val fs = file.getFileSystem(conf) - val orcSchema = HiveMetastoreTypes.toMetastoreType(dataSchema) - - serializer = new OrcSerde - val typeInfo: TypeInfo = - TypeInfoUtils.getTypeInfoFromTypeString(orcSchema) - standardOI = TypeInfoUtils - .getStandardJavaObjectInspectorFromTypeInfo(typeInfo) - .asInstanceOf[StructObjectInspector] - fieldOIs = standardOI - .getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - wrappers = fieldOIs.map(HadoopTypeConverter.wrappers) - recordWriter = { - outputFormat.getRecordWriter(fs, - conf.asInstanceOf[JobConf], - file.toUri.getPath, Reporter.NULL) - .asInstanceOf[org.apache.hadoop.mapred.RecordWriter[NullWritable, Writable]] - } - } - } override def write(row: Row): Unit = { - initWriter() var i = 0 - val outputData = new Array[Any](fieldOIs.length) while (i < row.length) { outputData(i) = wrappers(i)(row(i)) i += 1 } - val writable = serializer.serialize(outputData, standardOI) + val writable = serializer.serialize(outputData, structOI) recordWriter.write(NullWritable.get(), writable) } override def close(): Unit = { - if (recordWriter != null) { + if (created) { recordWriter.close(Reporter.NULL) } } } - @DeveloperApi -private[sql] case class OrcRelation(override val paths: Array[String], +private[sql] case class OrcRelation( + override val paths: Array[String], parameters: Map[String, String], maybeSchema: Option[StructType] = None, maybePartitionSpec: Option[PartitionSpec] = None)( @transient val sqlContext: SQLContext) extends FSBasedRelation(paths, maybePartitionSpec) with Logging { - self: Product => - @transient val conf = sqlContext.sparkContext.hadoopConfiguration - - - override def dataSchema: StructType = - maybeSchema.getOrElse(OrcFileOperator.readSchema(paths(0), Some(conf))) + override 
val dataSchema: StructType = + maybeSchema.getOrElse(OrcFileOperator.readSchema(paths(0), + Some(sqlContext.sparkContext.hadoopConfiguration))) override def outputWriterClass: Class[_ <: OutputWriter] = classOf[OrcOutputWriter] - /** Attributes */ - var output: Seq[Attribute] = schema.toAttributes override def needConversion: Boolean = false - // Equals must also take into account the output attributes so that we can distinguish between - // different instances of the same relation, override def equals(other: Any): Boolean = other match { case that: OrcRelation => paths.toSet == that.paths.toSet && @@ -162,6 +146,7 @@ private[sql] case class OrcRelation(override val paths: Array[String], schema, maybePartitionSpec) } + override def buildScan(requiredColumns: Array[String], filters: Array[Filter], inputPaths: Array[String]): RDD[Row] = { @@ -169,8 +154,3 @@ private[sql] case class OrcRelation(override val paths: Array[String], OrcTableScan(output, this, filters, inputPaths).execute() } } - -private[sql] object OrcRelation extends Logging { - // Default partition name to use when the partition column value is null or empty string. - val DEFAULT_PARTITION_NAME = "__HIVE_DEFAULT_PARTITION__" -} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala index 94c78a14524b5..2163b0ce70e99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcTableOperations.scala @@ -20,50 +20,52 @@ package org.apache.spark.sql.hive.orc import java.util._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.io.orc._ import org.apache.hadoop.io.NullWritable import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.input.FileInputFormat -import org.apache.spark.rdd.RDD + +import org.apache.spark.rdd.{HadoopRDD, RDD} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.hive.HiveShim import org.apache.spark.sql.sources.Filter import org.apache.spark.{Logging, SerializableWritable} + +/* Implicit conversions */ import scala.collection.JavaConversions._ -case class OrcTableScan(attributes: Seq[Attribute], +private[orc] case class OrcTableScan(attributes: Seq[Attribute], @transient relation: OrcRelation, filters: Array[Filter], inputPaths: Array[String]) extends Logging { - @transient val sqlContext = relation.sqlContext - val path = relation.paths(0) + @transient private val sqlContext = relation.sqlContext - def addColumnIds(output: Seq[Attribute], - relation: OrcRelation, conf: Configuration) { - val ids = - output.map(a => - relation.dataSchema.toAttributes.indexWhere(_.name == a.name): Integer) - .filter(_ >= 0) - val names = attributes.map(_.name) - val sorted = ids.zip(names).sorted - HiveShim.appendReadColumns(conf, sorted.map(_._1), sorted.map(_._2)) + private def addColumnIds( + output: Seq[Attribute], + relation: OrcRelation, + conf: Configuration): Unit = { + val ids = output.map(a => relation.dataSchema.fieldIndex(a.name): Integer) + val (sortedIds, sortedNames) = ids.zip(attributes.map(_.name)).sorted.unzip + HiveShim.appendReadColumns(conf, sortedIds, sortedNames) } - def buildFilter(job: Job, filters: Array[Filter]): Unit = { + private def buildFilter(job: Job, 
filters: Array[Filter]): Unit = { if (ORC_FILTER_PUSHDOWN_ENABLED) { val conf: Configuration = job.getConfiguration - val recordFilter = OrcFilters.createFilter(filters) - if (recordFilter.isDefined) { - conf.set(SARG_PUSHDOWN, toKryo(recordFilter.get)) - conf.setBoolean(INDEX_FILTER, true) + OrcFilters.createFilter(filters).foreach { f => + conf.set(SARG_PUSHDOWN, toKryo(f)) + conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true) } } } // Transform all given raw `Writable`s into `Row`s. - def fillObject(conf: Configuration, + private def fillObject( + path: String, + conf: Configuration, iterator: Iterator[org.apache.hadoop.io.Writable], nonPartitionKeyAttrs: Seq[(Attribute, Int)], mutableRow: MutableRow): Iterator[Row] = { @@ -77,7 +79,6 @@ case class OrcTableScan(attributes: Seq[Attribute], // Map each tuple to a row object iterator.map { value => val raw = deserializer.deserialize(value) - logDebug("Raw data: " + raw) var i = 0 while (i < fieldRefs.length) { val fieldValue = soi.getStructFieldData(raw, fieldRefs(i)) @@ -105,11 +106,13 @@ case class OrcTableScan(attributes: Seq[Attribute], Class[_ <: org.apache.hadoop.mapred.InputFormat[NullWritable, Writable]]] val rdd = sc.hadoopRDD(conf.asInstanceOf[JobConf], - inputClass, classOf[NullWritable], classOf[Writable]).map(_._2) - val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + inputClass, classOf[NullWritable], classOf[Writable]) + .asInstanceOf[HadoopRDD[NullWritable, Writable]] val wrappedConf = new SerializableWritable(conf) - val rowRdd: RDD[Row] = rdd.mapPartitions { iter => - fillObject(wrappedConf.value, iter, attributes.zipWithIndex, mutableRow) + val rowRdd: RDD[Row] = rdd.mapPartitionsWithInputSplit { case (split: OrcSplit, iter) => + val pathStr = split.getPath.toString + val mutableRow = new SpecificMutableRow(attributes.map(_.dataType)) + fillObject(pathStr, wrappedConf.value, iter.map(_._2), attributes.zipWithIndex, mutableRow) } rowRdd } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala index 93af5af196dab..b219fbb44ca0d 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/package.scala @@ -21,19 +21,21 @@ import com.esotericsoftware.kryo.io.Output import com.esotericsoftware.kryo.Kryo import org.apache.commons.codec.binary.Base64 import org.apache.spark.sql.{SaveMode, DataFrame} -import scala.reflect.runtime.universe.{TypeTag, typeTag} package object orc { implicit class OrcContext(sqlContext: HiveContext) { import sqlContext._ @scala.annotation.varargs - def orcFile(paths: String*): DataFrame = { - if (paths.isEmpty) { - emptyDataFrame - } else { - val orcRelation = OrcRelation(paths.toArray, Map.empty)(sqlContext) - sqlContext.baseRelationToDataFrame(orcRelation) + def orcFile(path: String, paths: String*): DataFrame = { + val pathArray: Array[String] = { + if (paths.isEmpty) { + Array(path) + } else { + paths.toArray ++ Array(path) + } } + val orcRelation = OrcRelation(pathArray, Map.empty)(sqlContext) + sqlContext.baseRelationToDataFrame(orcRelation) } } @@ -49,8 +51,7 @@ package object orc { // Flags for orc copression, predicates pushdown, etc. 
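buildFilter above pushes the SearchArgument into the Hadoop configuration as a string under SARG_PUSHDOWN via the toKryo helper from the orc package object. The full body of toKryo is not visible in this diff, so the following is only a plausible sketch of such a helper, assuming the Kryo Output and commons-codec Base64 imports that the package object already declares:

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.Output
import org.apache.commons.codec.binary.Base64

// Hypothetical reconstruction (not the patch's exact code): serialize an object with
// Kryo into an in-memory buffer and Base64-encode it so it fits a Configuration value.
def kryoToConfString(input: Any): String = {
  val out = new Output(4 * 1024, 10 * 1024 * 1024) // initial and maximum buffer sizes
  new Kryo().writeObject(out, input)
  out.close()
  Base64.encodeBase64String(out.toBytes)
}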
val orcDefaultCompressVar = "hive.exec.orc.default.compress" var ORC_FILTER_PUSHDOWN_ENABLED = true - val SARG_PUSHDOWN = "sarg.pushdown"; - val INDEX_FILTER = "hive.optimize.index.filter" + val SARG_PUSHDOWN = "sarg.pushdown" def toKryo(input: Any): String = { val out = new Output(4 * 1024, 10 * 1024 * 1024); diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala index 7ebf3c6eced26..31a829a81124d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcPartitionDiscoverySuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hive.orc import java.io.File +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.hive.test.TestHive @@ -37,7 +38,7 @@ case class OrcParData(intField: Int, stringField: String) case class OrcParDataWithKey(intField: Int, pi: Int, stringField: String, ps: String) class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { - val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__" + val defaultPartitionName = ConfVars.DEFAULTPARTITIONNAME.defaultVal def withTempDir(f: File => Unit): Unit = { val dir = Utils.createTempDir().getCanonicalFile @@ -187,7 +188,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with Before "org.apache.spark.sql.hive.orc.DefaultSource", Map( "path" -> base.getCanonicalPath, - OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + ConfVars.DEFAULTPARTITIONNAME.varname -> defaultPartitionName)) orcRelation.registerTempTable("t") @@ -232,7 +233,7 @@ class OrcPartitionDiscoverySuite extends QueryTest with FunSuiteLike with Before "org.apache.spark.sql.hive.orc.DefaultSource", Map( "path" -> base.getCanonicalPath, - OrcRelation.DEFAULT_PARTITION_NAME -> defaultPartitionName)) + ConfVars.DEFAULTPARTITIONNAME.varname -> defaultPartitionName)) orcRelation.registerTempTable("t") diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala index 3c596d0654324..475af3d4c94e4 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala @@ -153,13 +153,40 @@ class OrcQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterAll { data.toDF().saveAsOrcFile(tempDir) val f = TestHive.orcFile(tempDir) f.registerTempTable("tmp") + + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // expr = leaf-0 var rdd = sql("SELECT name FROM tmp where age <= 5") assert(rdd.count() == 5) + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // expr = (not leaf-0) rdd = sql("SELECT name, contacts FROM tmp where age > 5") assert(rdd.count() == 5) - val contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + var contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) assert(contacts.count() == 10) + + // ppd: + // leaf-0 = (LESS_THAN_EQUALS age 5) + // leaf-1 = (LESS_THAN age 8) + // expr = (and (not leaf-0) leaf-1) + rdd = sql("SELECT name, contacts FROM tmp where age > 5 and age < 8") + assert(rdd.count() == 2) + contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 4) + + // ppd: + // leaf-0 = (LESS_THAN age 2) + // leaf-1 = 
(LESS_THAN_EQUALS age 8) + // expr = (or leaf-0 (not leaf-1)) + rdd = sql("SELECT name, contacts FROM tmp where age < 2 or age > 8") + assert(rdd.count() == 3) + contacts = rdd.flatMap(t=>t(1).asInstanceOf[Seq[_]]) + assert(contacts.count() == 6) + + Utils.deleteRecursively(new File(tempDir)) } From 4dbea6ee3feafd549938aebf7bba4181b5a097ae Mon Sep 17 00:00:00 2001 From: Zhan Zhang Date: Thu, 14 May 2015 13:22:07 -0700 Subject: [PATCH 7/7] resolve review comments --- .../main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala index c68a58647cad7..44ac728b09aa3 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala @@ -35,6 +35,8 @@ import org.apache.spark.sql.hive.HiveMetastoreTypes import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.sources._ + +/* Implicit conversions */ import scala.collection.JavaConversions._
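Taken together, the series exposes ORC both through the implicit helpers in the orc package object (orcFile/saveAsOrcFile, exercised by OrcQuerySuite and OrcPartitionDiscoverySuite) and through the data source name used in OrcSourceSuite. A short usage sketch, assuming a HiveContext named hiveContext and input paths that already exist; the column name intField matches the test data above, everything else is illustrative:

import org.apache.spark.sql.hive.orc._ // brings orcFile and saveAsOrcFile into scope

// Programmatic API
val df = hiveContext.orcFile("/tmp/orc-input")
df.filter(df("intField") > 5).saveAsOrcFile("/tmp/orc-output")

// Data source API, mirroring the DDL in OrcSourceSuite
hiveContext.sql(s"""
  create temporary table orc_src
  USING org.apache.spark.sql.hive.orc
  OPTIONS (path '/tmp/orc-output')
""")
hiveContext.sql("SELECT count(*) FROM orc_src").show()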