From bbb49f2639fd3067fbe63b20cbb991d6039a8f79 Mon Sep 17 00:00:00 2001
From: David Li
Date: Fri, 29 Apr 2022 16:27:47 -0400
Subject: [PATCH] ARROW-16419: [Python] Properly wait for ExecPlan to finish
---
python/pyarrow/_exec_plan.pyx | 7 +++++--
python/pyarrow/includes/libarrow.pxd | 7 +++++++
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx
index 763329583a3..909d12ed2c2 100644
--- a/python/pyarrow/_exec_plan.pyx
+++ b/python/pyarrow/_exec_plan.pyx
@@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
-from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table)
+from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table)
from pyarrow.lib import tobytes
from pyarrow._compute cimport Expression, _true
from pyarrow._dataset cimport Dataset
@@ -73,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
shared_ptr[CRecordBatchReader] c_recordbatchreader
vector[CDeclaration].iterator plan_iter
vector[CDeclaration.Input] no_c_inputs
+ CStatus c_plan_status
if use_threads:
c_executor = GetCpuThreadPool()
@@ -157,7 +158,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
else:
raise TypeError("Unsupported output type")
- deref(c_exec_plan).StopProducing()
+ with nogil:
+ c_plan_status = deref(c_exec_plan).finished().status()
+ check_status(c_plan_status)
return output
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 4485f744cd7..e34bc7a28fe 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -81,6 +81,11 @@ cdef extern from "arrow/config.h" namespace "arrow" nogil:
CRuntimeInfo GetRuntimeInfo()
+cdef extern from "arrow/util/future.h" namespace "arrow" nogil:
+ cdef cppclass CFuture_Void" arrow::Future<>":
+ CStatus status()
+
+
cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef enum Type" arrow::Type::type":
_Type_NA" arrow::Type::NA"
@@ -2497,6 +2502,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog
CStatus Validate()
CStatus StopProducing()
+ CFuture_Void finished()
+
vector[CExecNode*] sinks() const
vector[CExecNode*] sources() const