From 001a5866c62b9a42344561aee2d3d6f175e74dad Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Sun, 1 Nov 2020 20:28:12 +0900
Subject: [PATCH] [SPARK-33277][PYSPARK][SQL] Use ContextAwareIterator to stop consuming after the task ends

### What changes were proposed in this pull request?

As the Python evaluation consumes the parent iterator in a separate thread, it could consume more data from the parent even after the task ends and the parent is closed. Thus, we should use `ContextAwareIterator` to stop consuming after the task ends.

### Why are the changes needed?

A Python/Pandas UDF right after an off-heap vectorized reader could cause an executor crash. E.g.:

```py
spark.range(0, 100000, 1, 1).write.parquet(path)

spark.conf.set("spark.sql.columnVector.offheap.enabled", True)

def f(x):
    return 0

fUdf = udf(f, LongType())

spark.read.parquet(path).select(fUdf('id')).head()
```

This is because the Python evaluation consumes the parent iterator in a separate thread, and it consumes more data from the parent even after the task ends and the parent is closed. If an off-heap column vector exists in the parent iterator, it could cause a segmentation fault which crashes the executor.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Added tests, and manually.

Closes #30177 from ueshin/issues/SPARK-33277/python_pandas_udf.

Authored-by: Takuya UESHIN
Signed-off-by: HyukjinKwon
---
 python/pyspark/sql/tests.py                        | 42 +++++++++++++++++++
 .../sql/execution/python/EvalPythonExec.scala      | 18 +++++++-
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index b9952270bfc35..8a25311afdc4a 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3628,6 +3628,26 @@ def test_udf_in_subquery(self):
         finally:
             self.spark.catalog.dropTempView("v")
 
+    # SPARK-33277
+    def test_udf_with_column_vector(self):
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            self.spark.range(0, 100000, 1, 1).write.parquet(path)
+
+            def f(x):
+                return 0
+
+            fUdf = udf(f, LongType())
+
+            for offheap in ["true", "false"]:
+                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
+                    self.assertEquals(
+                        self.spark.read.parquet(path).select(fUdf('id')).head(), Row(0))
+        finally:
+            shutil.rmtree(path)
+
 
 class HiveSparkSubmitTests(SparkSubmitTests):
 
@@ -5575,6 +5595,28 @@ def test_datasource_with_udf(self):
         finally:
             shutil.rmtree(path)
 
+    # SPARK-33277
+    def test_pandas_udf_with_column_vector(self):
+        import pandas as pd
+        from pyspark.sql.functions import pandas_udf
+
+        path = tempfile.mkdtemp()
+        shutil.rmtree(path)
+
+        try:
+            self.spark.range(0, 200000, 1, 1).write.parquet(path)
+
+            @pandas_udf(LongType())
+            def udf(x):
+                return pd.Series([0] * len(x))
+
+            for offheap in ["true", "false"]:
+                with self.sql_conf({"spark.sql.columnVector.offheap.enabled": offheap}):
+                    self.assertEquals(
+                        self.spark.read.parquet(path).select(udf('id')).head(), Row(0))
+        finally:
+            shutil.rmtree(path)
+
 
 @unittest.skipIf(
     not _have_pandas or not _have_pyarrow,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
index 942a6db57416e..293a7c0241e97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvalPythonExec.scala
@@ -88,6 +88,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
 
     inputRDD.mapPartitions { iter =>
       val context = TaskContext.get()
+      val contextAwareIterator = new ContextAwareIterator(iter, context)
 
       // The queue used to buffer input rows so we can drain it to
       // combine input with output from Python.
@@ -119,7 +120,7 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
       })
 
       // Add rows to queue to join later with the result.
-      val projectedRowIter = iter.map { inputRow =>
+      val projectedRowIter = contextAwareIterator.map { inputRow =>
         queue.add(inputRow.asInstanceOf[UnsafeRow])
         projection(inputRow)
       }
@@ -136,3 +137,18 @@ abstract class EvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chil
     }
   }
 }
+
+/**
+ * A TaskContext aware iterator.
+ *
+ * As the Python evaluation consumes the parent iterator in a separate thread,
+ * it could consume more data from the parent even after the task ends and the parent is closed.
+ * Thus, we should use ContextAwareIterator to stop consuming after the task ends.
+ */
+class ContextAwareIterator[IN](iter: Iterator[IN], context: TaskContext) extends Iterator[IN] {
+
+  override def hasNext: Boolean =
+    !context.isCompleted() && !context.isInterrupted() && iter.hasNext
+
+  override def next(): IN = iter.next()
+}
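For the manual verification mentioned above, a standalone run of the repro from the description can be used; the sketch below is illustrative only (the local session setup, scratch path, and final assertion are assumptions, not part of this patch). Without the fix and with off-heap column vectors enabled, the last line could crash the executor; with `ContextAwareIterator` in place it should return `Row(0)`.

```py
# Illustrative manual check (not part of the patch).
import tempfile

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Hypothetical scratch location for the parquet data.
path = tempfile.mkdtemp() + "/spark-33277-repro"

spark.range(0, 100000, 1, 1).write.parquet(path)
spark.conf.set("spark.sql.columnVector.offheap.enabled", True)


def f(x):
    return 0


fUdf = udf(f, LongType())

# Expect Row(0) instead of an executor crash once the fix is applied.
assert spark.read.parquet(path).select(fUdf('id')).head() == Row(0)
```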