Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions python/pyarrow/_exec_plan.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table)
from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table)
from pyarrow.lib import tobytes
from pyarrow._compute cimport Expression, _true
from pyarrow._dataset cimport Dataset
Expand Down Expand Up @@ -73,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
shared_ptr[CRecordBatchReader] c_recordbatchreader
vector[CDeclaration].iterator plan_iter
vector[CDeclaration.Input] no_c_inputs
CStatus c_plan_status

if use_threads:
c_executor = GetCpuThreadPool()
Expand Down Expand Up @@ -157,7 +158,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
else:
raise TypeError("Unsupported output type")

deref(c_exec_plan).StopProducing()
with nogil:
c_plan_status = deref(c_exec_plan).finished().status()
check_status(c_plan_status)

return output

Expand Down
7 changes: 7 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ cdef extern from "arrow/config.h" namespace "arrow" nogil:
CRuntimeInfo GetRuntimeInfo()


cdef extern from "arrow/util/future.h" namespace "arrow" nogil:
cdef cppclass CFuture_Void" arrow::Future<>":
CStatus status()
Comment on lines +84 to +86
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Surprising we've made it this far without exposing Future. I wonder if we want to add a WaitForFinish method on ExecPlan instead of exposing Future (essentially a sync mirror-api) like we do in the Scanner.

I don't really have any problems with this approach though. Just making an observation.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose at some point we'll want to expose async APIs in Python, so we're probably bound to interface with Future in Cython anyway.



cdef extern from "arrow/api.h" namespace "arrow" nogil:
cdef enum Type" arrow::Type::type":
_Type_NA" arrow::Type::NA"
Expand Down Expand Up @@ -2497,6 +2502,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog
CStatus Validate()
CStatus StopProducing()

CFuture_Void finished()

vector[CExecNode*] sinks() const
vector[CExecNode*] sources() const

Expand Down