diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 763329583a3..909d12ed2c2 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table) +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true from pyarrow._dataset cimport Dataset @@ -73,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CRecordBatchReader] c_recordbatchreader vector[CDeclaration].iterator plan_iter vector[CDeclaration.Input] no_c_inputs + CStatus c_plan_status if use_threads: c_executor = GetCpuThreadPool() @@ -157,7 +158,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads else: raise TypeError("Unsupported output type") - deref(c_exec_plan).StopProducing() + with nogil: + c_plan_status = deref(c_exec_plan).finished().status() + check_status(c_plan_status) return output diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4485f744cd7..e34bc7a28fe 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -81,6 +81,11 @@ cdef extern from "arrow/config.h" namespace "arrow" nogil: CRuntimeInfo GetRuntimeInfo() +cdef extern from "arrow/util/future.h" namespace "arrow" nogil: + cdef cppclass CFuture_Void" arrow::Future<>": + CStatus status() + + cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef enum Type" arrow::Type::type": _Type_NA" arrow::Type::NA" @@ -2497,6 +2502,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog CStatus Validate() CStatus StopProducing() + CFuture_Void finished() + vector[CExecNode*] sinks() const vector[CExecNode*] sources() const