From 0b2a7a203ad1ff5b32ca64fa860595a54a044acd Mon Sep 17 00:00:00 2001
From: chirag
Date: Tue, 6 May 2014 16:20:29 +0530
Subject: [PATCH 1/4] unnecessary serialize/deserialize for cases where no conversion was performed

---
 .../scala/shark/execution/HadoopTableReader.scala | 13 +++++++++----
 1 file changed, 9 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala
index d96596f5..0935f9f0 100644
--- a/src/main/scala/shark/execution/HadoopTableReader.scala
+++ b/src/main/scala/shark/execution/HadoopTableReader.scala
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct
 import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorConverters}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
@@ -209,10 +210,14 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
             convertedRow match {
               case _: LazyStruct => convertedRow
               case _: HiveColumnarStruct => convertedRow
-              case _ => tableSerDe.deserialize(
-                tableSerDe.asInstanceOf[Serializer].serialize(
-                  convertedRow, tblConvertedOI))
-            }
+              case _ =>
+                if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+                  convertedRow
+                }
+                else {
+                  tableSerDe.deserialize( tableSerDe.asInstanceOf[Serializer].serialize( convertedRow, tblConvertedOI))
+                }
+              }
           }
         }
         rowWithPartArr.update(0, deserializedRow)

From 7bfc9e69884ef74d2d5069e8c76ef98af2c01ae1 Mon Sep 17 00:00:00 2001
From: chiragaggarwal
Date: Fri, 23 May 2014 13:58:30 +0530
Subject: [PATCH 2/4] unnecessary serialize/deserialize for cases where no conversion was performed

Incorporated the review feedback.
---
 .../shark/execution/HadoopTableReader.scala | 48 ++++++++++---------
 1 file changed, 25 insertions(+), 23 deletions(-)

diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala
index 0935f9f0..79b078b3 100644
--- a/src/main/scala/shark/execution/HadoopTableReader.scala
+++ b/src/main/scala/shark/execution/HadoopTableReader.scala
@@ -195,34 +195,36 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
         partSerDe.getObjectInspector(), tblConvertedOI)
       val rowWithPartArr = new Array[Object](2)
       // Map each tuple to a row object
-      iter.map { value =>
-        val deserializedRow = {
-
-          // If partition schema does not match table schema, update the row to match
-          val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
-
-          // If conversion was performed, convertedRow will be a standard Object, but if
-          // conversion wasn't necessary, it will still be lazy. We can't have both across
-          // partitions, so we serialize and deserialize again to make it lazy.
-          if (tableSerDe.isInstanceOf[OrcSerde]) {
-            convertedRow
-          } else {
+
+      if ((tableSerDe.isInstanceOf[OrcSerde]) || (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter])) {
+        iter.map { value =>
+          val deserializedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+          rowWithPartArr.update(0, deserializedRow)
+          rowWithPartArr.update(1, partValues)
+          rowWithPartArr.asInstanceOf[Object]
+        }
+      }
+      else {
+        iter.map { value =>
+          val deserializedRow = {
+
+            // If partition schema does not match table schema, update the row to match
+            val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+            // If conversion was performed, convertedRow will be a standard Object, but if
+            // conversion wasn't necessary, it will still be lazy. We can't have both across
+            // partitions, so we serialize and deserialize again to make it lazy.
             convertedRow match {
               case _: LazyStruct => convertedRow
               case _: HiveColumnarStruct => convertedRow
-              case _ =>
-                if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
-                  convertedRow
-                }
-                else {
-                  tableSerDe.deserialize( tableSerDe.asInstanceOf[Serializer].serialize( convertedRow, tblConvertedOI))
-                }
-              }
+              case _ =>
+                tableSerDe.deserialize( tableSerDe.asInstanceOf[Serializer].serialize( convertedRow, tblConvertedOI))
+            }
           }
+          rowWithPartArr.update(0, deserializedRow)
+          rowWithPartArr.update(1, partValues)
+          rowWithPartArr.asInstanceOf[Object]
         }
-        rowWithPartArr.update(0, deserializedRow)
-        rowWithPartArr.update(1, partValues)
-        rowWithPartArr.asInstanceOf[Object]
       }
     }
   }.toSeq

From eea72cdab34492fa2ed0c435536948f39c94b6ed Mon Sep 17 00:00:00 2001
From: chiragaggarwal
Date: Thu, 29 May 2014 10:38:06 +0530
Subject: [PATCH 3/4] Incorporated the feedback

---
 .../shark/execution/HadoopTableReader.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala
index 79b078b3..ca777b71 100644
--- a/src/main/scala/shark/execution/HadoopTableReader.scala
+++ b/src/main/scala/shark/execution/HadoopTableReader.scala
@@ -196,9 +196,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
       val rowWithPartArr = new Array[Object](2)
       // Map each tuple to a row object
 
-      if ((tableSerDe.isInstanceOf[OrcSerde]) || (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter])) {
+      if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
         iter.map { value =>
-          val deserializedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+          val deserializedRow = partSerDe.deserialize(value)
           rowWithPartArr.update(0, deserializedRow)
           rowWithPartArr.update(1, partValues)
           rowWithPartArr.asInstanceOf[Object]
@@ -214,11 +214,15 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
             // If conversion was performed, convertedRow will be a standard Object, but if
             // conversion wasn't necessary, it will still be lazy. We can't have both across
             // partitions, so we serialize and deserialize again to make it lazy.
-            convertedRow match {
-              case _: LazyStruct => convertedRow
-              case _: HiveColumnarStruct => convertedRow
-              case _ =>
+            if (tableSerDe.isInstanceOf[OrcSerde]) {
+              convertedRow
+            } else {
+              convertedRow match {
+                case _: LazyStruct => convertedRow
+                case _: HiveColumnarStruct => convertedRow
+                case _ =>
                  tableSerDe.deserialize( tableSerDe.asInstanceOf[Serializer].serialize( convertedRow, tblConvertedOI))
+              }
             }
           }
           rowWithPartArr.update(0, deserializedRow)

From 6efb4d021a3a17ffeca3407ae5f1ce99a5c5f9b7 Mon Sep 17 00:00:00 2001
From: chiragaggarwal
Date: Thu, 29 May 2014 11:11:08 +0530
Subject: [PATCH 4/4] Since partValues does not change, move it out of iter.map

---
 src/main/scala/shark/execution/HadoopTableReader.scala | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/shark/execution/HadoopTableReader.scala b/src/main/scala/shark/execution/HadoopTableReader.scala
index ca777b71..3580cc1a 100644
--- a/src/main/scala/shark/execution/HadoopTableReader.scala
+++ b/src/main/scala/shark/execution/HadoopTableReader.scala
@@ -196,11 +196,11 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
       val rowWithPartArr = new Array[Object](2)
       // Map each tuple to a row object
+      // This is done once per partition, so there is no need to repeat it in the iterations (in iter.map).
+      rowWithPartArr.update(1, partValues)
 
       if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
         iter.map { value =>
-          val deserializedRow = partSerDe.deserialize(value)
-          rowWithPartArr.update(0, deserializedRow)
-          rowWithPartArr.update(1, partValues)
+          rowWithPartArr.update(0, partSerDe.deserialize(value))
           rowWithPartArr.asInstanceOf[Object]
         }
       }
@@ -226,7 +226,6 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf
             }
           }
           rowWithPartArr.update(0, deserializedRow)
-          rowWithPartArr.update(1, partValues)
           rowWithPartArr.asInstanceOf[Object]
         }
       }
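
The net effect of the four patches, read together, is easier to see outside of diff context. Below is a minimal, self-contained Scala sketch of that idea, not code taken from Shark: the object and method names (RowConversionSketch, deserializeRows) and the parameter list are hypothetical, and the OrcSerde special case is left out. It relies only on the Hive SerDe API already used by the patches: ObjectInspectorConverters.getConverter returns an IdentityConverter when the partition and table ObjectInspectors already agree, and in that case the row produced by the partition SerDe is still in its lazy form and can be passed through untouched, skipping the serialize/deserialize round trip.

    // Illustrative sketch only -- names and signatures here are hypothetical, not Shark's.
    import org.apache.hadoop.hive.serde2.{Deserializer, Serializer}
    import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters}
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
    import org.apache.hadoop.io.Writable

    object RowConversionSketch {
      def deserializeRows(values: Iterator[Writable],
                          partSerDe: Deserializer,
                          tableSerDe: Deserializer,
                          tblConvertedOI: ObjectInspector): Iterator[Object] = {
        // getConverter hands back an IdentityConverter when the partition ObjectInspector
        // already matches the table ObjectInspector, i.e. no per-row conversion is needed.
        val converter =
          ObjectInspectorConverters.getConverter(partSerDe.getObjectInspector, tblConvertedOI)

        if (converter.isInstanceOf[IdentityConverter]) {
          // No conversion happens, so the row coming out of the partition SerDe is still in
          // its lazy representation and can be emitted directly, with no round trip.
          values.map(value => partSerDe.deserialize(value))
        } else {
          // The converter produces a standard (non-lazy) object, so re-serialize it with the
          // table SerDe to get back to the lazy format. As in the patches, the concrete SerDe
          // is assumed to implement both Serializer and Deserializer.
          values.map { value =>
            val convertedRow = converter.convert(partSerDe.deserialize(value))
            tableSerDe.deserialize(
              tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
          }
        }
      }
    }

The branch is decided once per partition rather than once per row, which is the same reasoning that leads PATCH 4/4 to move the rowWithPartArr.update(1, partValues) call out of iter.map.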