From 84207c7aa08ae2e6d7d97a22b64497a3e1604ac8 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 04:20:34 +0000
Subject: [PATCH 01/10] Skip automatically broadcasting a plan when it contains ObjectProducer.

---
 .../org/apache/spark/sql/execution/SparkStrategies.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a4b0fa59dbb24..8d624a7e02840 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan, ObjectProducer}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
@@ -82,7 +82,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object CanBroadcast {
     def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
-      if (plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) {
+      // We can't estimate the size of ObjectType
+      if (plan.find(_.isInstanceOf[ObjectProducer]).isDefined) {
+        None
+      } else if (plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) {
         Some(plan)
       } else {
         None

From 8e0541c35d71608db1d2de36d8bd117079d4cd2c Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 05:03:33 +0000
Subject: [PATCH 02/10] Add test.

---
 .../org/apache/spark/sql/DatasetSuite.scala | 23 +++++++++++++++++++
 1 file changed, 23 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 2a1867f67c178..63daee5bf2e2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -630,6 +630,29 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     // Make sure the generated code for this plan can compile and execute.
     checkDataset(wideDF.map(_.getLong(0)), 0L until 10 : _*)
   }
+
+  test("Estimate size on ObjectProducer will cause failure") {
+    val dataset = Seq(
+      (0, 3, 54f),
+      (0, 4, 44f),
+      (0, 5, 42f),
+      (1, 3, 39f),
+      (1, 5, 33f),
+      (1, 4, 26f),
+      (2, 3, 51f),
+      (2, 5, 45f),
+      (2, 4, 30f)
+    ).toDF("user", "item", "rating")
+
+    val actual = dataset
+      .select("user", "item")
+      .as[(Int, Int)]
+      .groupByKey(_._1)
+      .mapGroups { case (src, ids) => (src, ids.map(_._2).toArray) }
+      .toDF("id", "actual")
+
+    dataset.join(actual, dataset("user") === actual("id")).collect()
+  }
 }

 case class OtherTuple(_1: String, _2: Int)

From bb9dd6f8512565e4f29c99ae7c9870d6653006b0 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 06:36:46 +0000
Subject: [PATCH 03/10] Different approach that implements statistics in SerializeFromObject.
---
 .../spark/sql/catalyst/plans/logical/object.scala     | 12 +++++++++++-
 .../apache/spark/sql/execution/SparkStrategies.scala  |  5 +----
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 4a1bdb0b8ac2e..2a6dadff847c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{DataType, ObjectType, StructType}

 object CatalystSerde {
   def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
@@ -83,6 +83,16 @@ case class SerializeFromObject(
     child: LogicalPlan) extends UnaryNode with ObjectConsumer {

   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
+
+  // We can't estimate the size of ObjectType. We implement statistics here to avoid
+  // directly estimating any child plan that produces domain objects as output.
+  override def statistics: Statistics = {
+    if (child.output.find(_.dataType.isInstanceOf[ObjectType]).isDefined) {
+      Statistics(sizeInBytes = Long.MaxValue)
+    } else {
+      super.statistics
+    }
+  }
 }

 object MapPartitions {

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 8d624a7e02840..123689ccb674c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -82,10 +82,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    */
   object CanBroadcast {
     def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
-      // We can't estimate the size of ObjectType
-      if (plan.find(_.isInstanceOf[ObjectProducer]).isDefined) {
-        None
-      } else if (plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) {
+      if (plan.statistics.sizeInBytes <= conf.autoBroadcastJoinThreshold) {
         Some(plan)
       } else {
         None

From 9d2033ffbdf37ba1e1140d88415977372a91cba7 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 06:38:23 +0000
Subject: [PATCH 04/10] Revert unused import.
---
 .../scala/org/apache/spark/sql/execution/SparkStrategies.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 123689ccb674c..a4b0fa59dbb24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan, ObjectProducer}
+import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryColumnarTableScan, InMemoryRelation}

From 4b5f66df9b763cef45550e3c2a17793ead205bfd Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 06:59:10 +0000
Subject: [PATCH 05/10] Simplify condition check.

---
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 2a6dadff847c0..3e548404b53d6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -87,7 +87,7 @@ case class SerializeFromObject(
   // We can't estimate the size of ObjectType. We implement statistics here to avoid
   // directly estimating any child plan that produces domain objects as output.
   override def statistics: Statistics = {
-    if (child.output.find(_.dataType.isInstanceOf[ObjectType]).isDefined) {
+    if (child.output.head.dataType.isInstanceOf[ObjectType]) {
       Statistics(sizeInBytes = Long.MaxValue)
     } else {
       super.statistics

From dcd6056d124463cb4546df2db1a2d903c3d77060 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 09:15:13 +0000
Subject: [PATCH 06/10] Instead of Long.MaxValue, try to find an underlying logical plan that can give statistics.

---
 .../spark/sql/catalyst/plans/logical/object.scala | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 3e548404b53d6..8897115dadc52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -88,7 +88,20 @@ case class SerializeFromObject(
   // directly estimating any child plan that produces domain objects as output.
   override def statistics: Statistics = {
     if (child.output.head.dataType.isInstanceOf[ObjectType]) {
-      Statistics(sizeInBytes = Long.MaxValue)
+      // Try to find the first underlying child that doesn't output objects
+      val underlyingPlan = child.find { p =>
+        // This matches a child plan that produces domain objects but whose children do not
+        // produce domain objects
+        p.isInstanceOf[ObjectProducer] && !p.isInstanceOf[ObjectConsumer]
+      }
+      if (underlyingPlan.isDefined) {
+        underlyingPlan.get match {
+          case u: UnaryNode => u.child.statistics
+          case o => Statistics(sizeInBytes = o.children.map(_.statistics.sizeInBytes).product)
+        }
+      } else {
+        Statistics(sizeInBytes = Long.MaxValue)
+      }
     } else {
       super.statistics
     }

From 3ff11a12b6a148b08f6dfc7677be37d4de4a703b Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 22 Apr 2016 09:40:52 +0000
Subject: [PATCH 07/10] Remove wrong comment.

---
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 8897115dadc52..cc8995c1f9ade 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -88,7 +88,6 @@ case class SerializeFromObject(
   // directly estimating any child plan that produces domain objects as output.
   override def statistics: Statistics = {
     if (child.output.head.dataType.isInstanceOf[ObjectType]) {
-      // Try to find the first underlying child that doesn't output objects
       val underlyingPlan = child.find { p =>
         // This matches a child plan that produces domain objects but whose children do not
         // produce domain objects

From 6b6c12d9f1287ec18df29629d528306c8c18d165 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 23 Apr 2016 23:48:45 +0000
Subject: [PATCH 08/10] Set default size for ObjectType.

---
 .../sql/catalyst/plans/logical/object.scala | 22 -------------------
 .../apache/spark/sql/types/ObjectType.scala |  3 +--
 2 files changed, 1 insertion(+), 24 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index cc8995c1f9ade..a0985ff4f681c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -83,28 +83,6 @@ case class SerializeFromObject(
     child: LogicalPlan) extends UnaryNode with ObjectConsumer {

   override def output: Seq[Attribute] = serializer.map(_.toAttribute)
-
-  // We can't estimate the size of ObjectType. We implement statistics here to avoid
-  // directly estimating any child plan that produces domain objects as output.
-  override def statistics: Statistics = {
-    if (child.output.head.dataType.isInstanceOf[ObjectType]) {
-      val underlyingPlan = child.find { p =>
-        // This matches a child plan that produces domain objects but whose children do not
-        // produce domain objects
-        p.isInstanceOf[ObjectProducer] && !p.isInstanceOf[ObjectConsumer]
-      }
-      if (underlyingPlan.isDefined) {
-        underlyingPlan.get match {
-          case u: UnaryNode => u.child.statistics
-          case o => Statistics(sizeInBytes = o.children.map(_.statistics.sizeInBytes).product)
-        }
-      } else {
-        Statistics(sizeInBytes = Long.MaxValue)
-      }
-    } else {
-      super.statistics
-    }
-  }
 }

 object MapPartitions {

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
index b7b1acc58242e..c741a2dd3ea30 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ObjectType.scala
@@ -37,8 +37,7 @@ private[sql] object ObjectType extends AbstractDataType {
 * outside of the execution engine.
 */
private[sql] case class ObjectType(cls: Class[_]) extends DataType {
-  override def defaultSize: Int =
-    throw new UnsupportedOperationException("No size estimation available for objects.")
+  override def defaultSize: Int = 4096

  def asNullable: DataType = this

From c26c3bd05e1ceb5459ae8e9c6ac4a4ae8c36f2fb Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 24 Apr 2016 02:38:41 +0000
Subject: [PATCH 09/10] Correct test name.

---
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 63daee5bf2e2f..810a3fcb80472 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -631,7 +631,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     checkDataset(wideDF.map(_.getLong(0)), 0L until 10 : _*)
   }

-  test("Estimate size on ObjectProducer will cause failure") {
+  test("SPARK-14838: estimating sizeInBytes in operators with ObjectProducer shouldn't fail") {
     val dataset = Seq(
       (0, 3, 54f),
       (0, 4, 44f),

From 471d9ab43f6171c3844149ecf768b792c0482b0e Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sun, 24 Apr 2016 04:14:15 +0000
Subject: [PATCH 10/10] Remove unnecessary import.

---
 .../org/apache/spark/sql/catalyst/plans/logical/object.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index a0985ff4f681c..4a1bdb0b8ac2e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.Encoder
 import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.{DataType, ObjectType, StructType}
+import org.apache.spark.sql.types.{DataType, StructType}

 object CatalystSerde {
   def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
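The series converges on a one-line fix: give ObjectType a fixed defaultSize instead of throwing, so that LogicalPlan.statistics.sizeInBytes stays computable for plans producing domain objects and CanBroadcast can apply its usual autoBroadcastJoinThreshold comparison. The sketch below is a minimal, self-contained model of that idea; the DataType hierarchy, sizeInBytes helper, and class names here are simplified stand-ins for illustration, not Spark's actual classes.

// Minimal, self-contained sketch (simplified stand-ins, not Spark's actual
// classes) of why a throwing defaultSize breaks size estimation and why a
// fixed default restores it.
object SizeEstimationSketch extends App {
  sealed trait DataType { def defaultSize: Int }
  case object IntType extends DataType { def defaultSize: Int = 4 }

  // Before patch 08: any schema containing an object column made size
  // estimation throw, so the planner couldn't even get a rough upper bound.
  case class ThrowingObjectType(cls: Class[_]) extends DataType {
    def defaultSize: Int =
      throw new UnsupportedOperationException("No size estimation available for objects.")
  }

  // After patch 08: a conservative per-value constant keeps estimation total.
  case class ObjectType(cls: Class[_]) extends DataType {
    def defaultSize: Int = 4096
  }

  // Toy stand-in for LogicalPlan.statistics.sizeInBytes: row count times the
  // sum of per-column default sizes.
  def sizeInBytes(rowCount: Long, schema: Seq[DataType]): BigInt =
    BigInt(rowCount) * schema.map(t => BigInt(t.defaultSize)).sum

  val broadcastThreshold = 10L * 1024 * 1024  // default 10 MB threshold

  val estimate = sizeInBytes(1000L, Seq(IntType, ObjectType(classOf[Array[Int]])))
  println(s"estimate = $estimate, broadcastable = ${estimate <= broadcastThreshold}")

  // Uncommenting this line would throw, mirroring the pre-patch failure:
  // sizeInBytes(1000L, Seq(IntType, ThrowingObjectType(classOf[Array[Int]])))
}

This also suggests why reverting patch 01's explicit ObjectProducer check is safe: once every DataType reports a defaultSize, plan.statistics.sizeInBytes no longer throws for object-producing plans, and the ordinary threshold comparison in CanBroadcast decides whether to broadcast.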