From c9c81825eb71c9942889f073a8bb11fc38f832bc Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Tue, 25 Jun 2019 12:03:08 +0900 Subject: [PATCH 1/3] Use InheritableThreadLocal for current epoch in EpochTracker (to support Python UDFs) --- .../continuous/ContinuousCoalesceRDD.scala | 5 +++-- .../streaming/continuous/EpochTracker.scala | 13 ++++++++++-- .../continuous/ContinuousSuite.scala | 20 +++++++++++++++++++ 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala index aec756c0eb2a4..4faec4c6fef3f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala @@ -110,8 +110,9 @@ class ContinuousCoalesceRDD( context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) while (!context.isInterrupted() && !context.isCompleted()) { writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) - // Note that current epoch is a non-inheritable thread local, so each writer thread - // can properly increment its own epoch without affecting the main task thread. + // Note that current epoch is a inheritable thread local but makes a clone, so + // each writer thread can properly increment its own epoch without affecting + // the main task thread. 
EpochTracker.incrementCurrentEpoch() } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala index bc0ae428d4521..b07bcd54072c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.continuous import java.util.concurrent.atomic.AtomicLong +import org.apache.commons.lang3.SerializationUtils + /** * Tracks the current continuous processing epoch within a task. Call * EpochTracker.getCurrentEpoch to get the current epoch. @@ -26,8 +28,15 @@ import java.util.concurrent.atomic.AtomicLong object EpochTracker { // The current epoch. Note that this is a shared reference; ContinuousWriteRDD.compute() will // update the underlying AtomicLong as it finishes epochs. Other code should only read the value. - private val currentEpoch: ThreadLocal[AtomicLong] = new ThreadLocal[AtomicLong] { - override def initialValue() = new AtomicLong(-1) + private val currentEpoch: InheritableThreadLocal[AtomicLong] = { + new InheritableThreadLocal[AtomicLong] { + override protected def childValue(parent: AtomicLong): AtomicLong = { + // Note: make a clone such that changes in the parent epoch aren't reflected in + // those in the children threads. This is required at `ContinuousCoalesceRDD`. 
+ SerializationUtils.clone(parent) + } + override def initialValue() = new AtomicLong(-1) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index c6921010a002f..522658ca8dc41 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -85,6 +85,7 @@ class ContinuousSuiteBase extends StreamTest { } class ContinuousSuite extends ContinuousSuiteBase { + import IntegratedUDFTestUtils._ import testImplicits._ test("basic") { @@ -252,6 +253,25 @@ class ContinuousSuite extends ContinuousSuiteBase { assert(expected.map(Row(_)).subsetOf(results.toSet), s"Result set ${results.toSet} are not a superset of $expected!") } + + Seq(TestScalaUDF("udf"), TestPythonUDF("udf"), TestScalarPandasUDF("udf")).foreach { udf => + test(s"continuous mode with various UDFs - ${udf.prettyName}") { + assume( + shouldTestScalarPandasUDFs && udf.isInstanceOf[TestScalarPandasUDF] || + shouldTestPythonUDFs && udf.isInstanceOf[TestPythonUDF] || + udf.isInstanceOf[TestScalaUDF]) + + val input = ContinuousMemoryStream[Int] + + testStream(input.toDF().select(udf($"value").cast("int")))( + AddData(input, 0, 1, 2), + CheckAnswer(0, 1, 2), + StopStream, + AddData(input, 3, 4, 5), + StartStream(), + CheckAnswer(0, 1, 2, 3, 4, 5)) + } + } } class ContinuousStressSuite extends ContinuousSuiteBase { From f4f2eff8bb3499213d2977e735feac6c44ab5399 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Wed, 26 Jun 2019 08:38:30 +0900 Subject: [PATCH 2/3] Address comments --- .../streaming/continuous/ContinuousCoalesceRDD.scala | 4 ++-- .../sql/execution/streaming/continuous/EpochTracker.scala | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala index 4faec4c6fef3f..14046f6a99c24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousCoalesceRDD.scala @@ -110,8 +110,8 @@ class ContinuousCoalesceRDD( context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong) while (!context.isInterrupted() && !context.isCompleted()) { writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]]) - // Note that current epoch is a inheritable thread local but makes a clone, so - // each writer thread can properly increment its own epoch without affecting + // Note that current epoch is an inheritable thread local but makes another instance, + // so each writer thread can properly increment its own epoch without affecting // the main task thread. EpochTracker.incrementCurrentEpoch() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala index b07bcd54072c8..631ae4806d2f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochTracker.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.streaming.continuous import java.util.concurrent.atomic.AtomicLong -import org.apache.commons.lang3.SerializationUtils - /** * Tracks the current continuous processing epoch within a task. Call * EpochTracker.getCurrentEpoch to get the current epoch. 
@@ -31,9 +29,9 @@ object EpochTracker { private val currentEpoch: InheritableThreadLocal[AtomicLong] = { new InheritableThreadLocal[AtomicLong] { override protected def childValue(parent: AtomicLong): AtomicLong = { - // Note: make a clone such that changes in the parent epoch aren't reflected in + // Note: make another instance so that changes in the parent epoch aren't reflected in // those in the children threads. This is required at `ContinuousCoalesceRDD`. - SerializationUtils.clone(parent) + new AtomicLong(parent.get) } override def initialValue() = new AtomicLong(-1) } From b103acc6631c8f5fe9292f34c53f53d2d8edc4ab Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Mon, 22 Jul 2019 13:49:09 +0900 Subject: [PATCH 3/3] Resolve conflicts --- .../spark/sql/streaming/continuous/ContinuousSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index 522658ca8dc41..5bd75c850fe76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -262,8 +262,9 @@ class ContinuousSuite extends ContinuousSuiteBase { udf.isInstanceOf[TestScalaUDF]) val input = ContinuousMemoryStream[Int] + val df = input.toDF() - testStream(input.toDF().select(udf($"value").cast("int")))( + testStream(df.select(udf(df("value")).cast("int")))( AddData(input, 0, 1, 2), CheckAnswer(0, 1, 2), StopStream,