From a5522874e972244b8361fa3258a2a82423344e98 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Aug 2020 22:57:37 -0700
Subject: [PATCH 1/3] [SPARK-32524][SQL][TESTS] CachedBatchSerializerSuite should clean up InMemoryRelation.ser

---
 .../spark/sql/execution/columnar/InMemoryRelation.scala | 3 +++
 .../sql/execution/columnar/CachedBatchSerializerSuite.scala | 6 ++++++
 2 files changed, 9 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index be3dc5934e84f..427c01e82fda8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -277,6 +277,9 @@ object InMemoryRelation {
     ser.get
   }
 
+  /* Visible for testing */
+  private[spark] def clearSerializer(): Unit = synchronized { ser = None }
+
   def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = plan match {
     case gen: WholeStageCodegenExec => gen.child match {
       case c2r: ColumnarToRowTransition => c2r.child match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index 72eba7f6e6907..e2993014d0a71 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{QueryTest, Row}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.columnar.{CachedBatch, CachedBatchSerializer}
+import org.apache.spark.sql.execution.columnar.InMemoryRelation.clearSerializer
 import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.sql.test.SharedSparkSession
@@ -120,6 +121,11 @@ class CachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
       classOf[TestSingleIntColumnarCachedBatchSerializer].getName)
   }
 
+  protected override def afterAll(): Unit = {
+    clearSerializer()
+    super.afterAll()
+  }
+
   test("Columnar Cache Plugin") {
     withTempPath { workDir =>
       val workDirPath = workDir.getAbsolutePath

From 57c0797c496420402e148af32709aa77b7fdcc89 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Aug 2020 23:04:10 -0700
Subject: [PATCH 2/3] Reduce scope from spark to columnar

---
 .../apache/spark/sql/execution/columnar/InMemoryRelation.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 427c01e82fda8..07411c0d3803c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -278,7 +278,7 @@ object InMemoryRelation {
   }
 
   /* Visible for testing */
-  private[spark] def clearSerializer(): Unit = synchronized { ser = None }
+  private[columnar] def clearSerializer(): Unit = synchronized { ser = None }
 
   def convertToColumnarIfPossible(plan: SparkPlan): SparkPlan = plan match {
     case gen: WholeStageCodegenExec => gen.child match {

From 71a75a0d0b9c9addf4852ef7cb6e3e9f705652e9 Mon Sep 17 00:00:00 2001
From: Dongjoon Hyun
Date: Mon, 3 Aug 2020 23:08:24 -0700
Subject: [PATCH 3/3] Add beforeAll

---
 .../sql/execution/columnar/CachedBatchSerializerSuite.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
index e2993014d0a71..099a1aa996c11 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/CachedBatchSerializerSuite.scala
@@ -121,6 +121,11 @@ class CachedBatchSerializerSuite extends QueryTest with SharedSparkSession {
       classOf[TestSingleIntColumnarCachedBatchSerializer].getName)
   }
 
+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    clearSerializer()
+  }
+
   protected override def afterAll(): Unit = {
     clearSerializer()
     super.afterAll()