From bbb49f2639fd3067fbe63b20cbb991d6039a8f79 Mon Sep 17 00:00:00 2001 From: David Li Date: Fri, 29 Apr 2022 16:27:47 -0400 Subject: [PATCH 1/3] ARROW-16419: [Python] Properly wait for ExecPlan to finish --- python/pyarrow/_exec_plan.pyx | 7 +++++-- python/pyarrow/includes/libarrow.pxd | 7 +++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 763329583a3..909d12ed2c2 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table) +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) from pyarrow.lib import tobytes from pyarrow._compute cimport Expression, _true from pyarrow._dataset cimport Dataset @@ -73,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CRecordBatchReader] c_recordbatchreader vector[CDeclaration].iterator plan_iter vector[CDeclaration.Input] no_c_inputs + CStatus c_plan_status if use_threads: c_executor = GetCpuThreadPool() @@ -157,7 +158,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads else: raise TypeError("Unsupported output type") - deref(c_exec_plan).StopProducing() + with nogil: + c_plan_status = deref(c_exec_plan).finished().status() + check_status(c_plan_status) return output diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4485f744cd7..e34bc7a28fe 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -81,6 +81,11 @@ cdef extern from "arrow/config.h" namespace "arrow" nogil: CRuntimeInfo GetRuntimeInfo() +cdef extern from "arrow/util/future.h" namespace "arrow" nogil: + cdef cppclass CFuture_Void" arrow::Future<>": + CStatus status() + + cdef extern from "arrow/api.h" namespace "arrow" nogil: cdef enum Type" arrow::Type::type": _Type_NA" arrow::Type::NA" @@ -2497,6 +2502,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog CStatus Validate() CStatus StopProducing() + CFuture_Void finished() + vector[CExecNode*] sinks() const vector[CExecNode*] sources() const From 48f7ee03e180c55afd2bfe30117a3052ae7c3ca7 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 Apr 2022 15:07:14 -1000 Subject: [PATCH 2/3] Switch from non-owning SourceNode to non-owning TableSourceNode. Passing in NULL for executor if use_threads is false instead of short lived thread pool. Forwading use_threads from _perform_join to execplan. Fix bug in hash_join_node.cc that could allow bits of the plan to remain running after marking the plan finished. --- cpp/src/arrow/compute/exec/hash_join_node.cc | 21 ++++++++++---------- python/pyarrow/_exec_plan.pyx | 21 ++++++++++---------- python/pyarrow/includes/libarrow.pxd | 5 ++--- 3 files changed, 23 insertions(+), 24 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index d28e3aeda49..0282e387c42 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -634,16 +634,17 @@ class HashJoinNode : public ExecNode { Status ScheduleTaskCallback(std::function func) { auto executor = plan_->exec_context()->executor(); if (executor) { - ARROW_ASSIGN_OR_RAISE(auto task_fut, executor->Submit([this, func] { - size_t thread_index = thread_indexer_(); - Status status = func(thread_index); - if (!status.ok()) { - StopProducing(); - ErrorIfNotOk(status); - return; - } - })); - return task_group_.AddTask(task_fut); + return task_group_.AddTask([this, executor, func] { + return DeferNotOk(executor->Submit([this, func] { + size_t thread_index = thread_indexer_(); + Status status = func(thread_index); + if (!status.ok()) { + StopProducing(); + ErrorIfNotOk(status); + return; + } + })); + }); } else { // We should not get here in serial execution mode ARROW_DCHECK(false); diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 909d12ed2c2..d8a6b9a0297 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -56,7 +56,6 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads """ cdef: CExecutor *c_executor - shared_ptr[CThreadPool] c_executor_sptr shared_ptr[CExecContext] c_exec_context shared_ptr[CExecPlan] c_exec_plan vector[CDeclaration] c_decls @@ -64,8 +63,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads vector[CExecNode*] c_final_node_vec CExecNode *c_node CTable* c_table + shared_ptr[CTable] c_in_table shared_ptr[CTable] c_out_table - shared_ptr[CSourceNodeOptions] c_sourceopts + shared_ptr[CTableSourceNodeOptions] c_tablesourceopts shared_ptr[CScanNodeOptions] c_scanopts shared_ptr[CExecNodeOptions] c_input_node_opts shared_ptr[CSinkNodeOptions] c_sinkopts @@ -78,8 +78,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads if use_threads: c_executor = GetCpuThreadPool() else: - c_executor_sptr = GetResultValue(CThreadPool.Make(1)) - c_executor = c_executor_sptr.get() + c_executor = NULL c_exec_context = make_shared[CExecContext]( c_default_memory_pool(), c_executor) @@ -90,12 +89,11 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads # Create source nodes for each input for ipt in inputs: if isinstance(ipt, Table): - node_factory = "source" - c_in_table = pyarrow_unwrap_table(ipt).get() - c_sourceopts = GetResultValue( - CSourceNodeOptions.FromTable(deref(c_in_table), deref(c_exec_context).executor())) - c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions]( - c_sourceopts) + node_factory = "table_source" + c_in_table = pyarrow_unwrap_table(ipt) + c_tablesourceopts = make_shared[CTableSourceNodeOptions](c_in_table, 1 << 20) + c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( + c_tablesourceopts) elif isinstance(ipt, Dataset): node_factory = "scan" c_in_dataset = (ipt).unwrap() @@ -348,6 +346,7 @@ def _perform_join(join_type, left_operand not None, left_keys, result_table = execplan([left_operand, right_operand], plan=c_decl_plan, - output_type=output_type) + output_type=output_type, + use_threads=use_threads) return result_table diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index e34bc7a28fe..cc52102ef8d 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2447,9 +2447,8 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions": pass - cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions): - @staticmethod - CResult[shared_ptr[CSourceNodeOptions]] FromTable(const CTable& table, CExecutor*) + cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions): + CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size) cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions): pass From 85a736b473d0a281e8007210eb11f8f808e45a2a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 29 Apr 2022 15:21:10 -1000 Subject: [PATCH 3/3] ARROW-16417: python lint --- python/pyarrow/_exec_plan.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index d8a6b9a0297..7cbce9baa6f 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -91,7 +91,8 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads if isinstance(ipt, Table): node_factory = "table_source" c_in_table = pyarrow_unwrap_table(ipt) - c_tablesourceopts = make_shared[CTableSourceNodeOptions](c_in_table, 1 << 20) + c_tablesourceopts = make_shared[CTableSourceNodeOptions]( + c_in_table, 1 << 20) c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions]( c_tablesourceopts) elif isinstance(ipt, Dataset):