21 changes: 11 additions & 10 deletions cpp/src/arrow/compute/exec/hash_join_node.cc
@@ -634,16 +634,17 @@ class HashJoinNode : public ExecNode {
   Status ScheduleTaskCallback(std::function<Status(size_t)> func) {
     auto executor = plan_->exec_context()->executor();
     if (executor) {
-      ARROW_ASSIGN_OR_RAISE(auto task_fut, executor->Submit([this, func] {
-        size_t thread_index = thread_indexer_();
-        Status status = func(thread_index);
-        if (!status.ok()) {
-          StopProducing();
-          ErrorIfNotOk(status);
-          return;
-        }
-      }));
-      return task_group_.AddTask(task_fut);
+      return task_group_.AddTask([this, executor, func] {
+        return DeferNotOk(executor->Submit([this, func] {
+          size_t thread_index = thread_indexer_();
+          Status status = func(thread_index);
+          if (!status.ok()) {
+            StopProducing();
+            ErrorIfNotOk(status);
+            return;
+          }
+        }));
+      });
     } else {
       // We should not get here in serial execution mode
       ARROW_DCHECK(false);
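Note: the rewritten callback hands the task group a deferred submission instead of an already-submitted future. DeferNotOk (from arrow/util/future.h) flattens the Result<Future<>> returned by Executor::Submit into a single Future<>, so a failure to schedule the task and a failure inside the task surface through the same object. A minimal sketch of that flattening outside the join node (the helper name is illustrative, not part of the patch):

#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

// Illustrative helper, not part of the patch. Submit() can fail before
// the task ever runs (error Result); DeferNotOk converts that case into
// an already-failed Future<>, so the caller tracks one future for both
// failure modes.
arrow::Future<> RunOnPool(arrow::internal::Executor* executor) {
  return arrow::DeferNotOk(executor->Submit([] {
    // task body runs on a pool thread
  }));
}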
29 changes: 16 additions & 13 deletions python/pyarrow/_exec_plan.pyx
@@ -27,7 +27,7 @@ from cython.operator cimport dereference as deref, preincrement as inc
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport *
 from pyarrow.includes.libarrow_dataset cimport *
-from pyarrow.lib cimport (Table, pyarrow_unwrap_table, pyarrow_wrap_table)
+from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table)
 from pyarrow.lib import tobytes
 from pyarrow._compute cimport Expression, _true
 from pyarrow._dataset cimport Dataset
@@ -56,29 +56,29 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     """
     cdef:
         CExecutor *c_executor
-        shared_ptr[CThreadPool] c_executor_sptr
         shared_ptr[CExecContext] c_exec_context
         shared_ptr[CExecPlan] c_exec_plan
         vector[CDeclaration] c_decls
         vector[CExecNode*] _empty
         vector[CExecNode*] c_final_node_vec
         CExecNode *c_node
         CTable* c_table
         shared_ptr[CTable] c_in_table
         shared_ptr[CTable] c_out_table
-        shared_ptr[CSourceNodeOptions] c_sourceopts
+        shared_ptr[CTableSourceNodeOptions] c_tablesourceopts
         shared_ptr[CScanNodeOptions] c_scanopts
         shared_ptr[CExecNodeOptions] c_input_node_opts
         shared_ptr[CSinkNodeOptions] c_sinkopts
         shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen
         shared_ptr[CRecordBatchReader] c_recordbatchreader
         vector[CDeclaration].iterator plan_iter
         vector[CDeclaration.Input] no_c_inputs
+        CStatus c_plan_status
 
     if use_threads:
         c_executor = GetCpuThreadPool()
     else:
-        c_executor_sptr = GetResultValue(CThreadPool.Make(1))
-        c_executor = c_executor_sptr.get()
+        c_executor = NULL
 
     c_exec_context = make_shared[CExecContext](
         c_default_memory_pool(), c_executor)
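Note: with use_threads=False the wrapper now passes a NULL executor, which ExecContext interprets as serial execution, instead of building a dedicated one-thread pool. A rough C++ equivalent of the branch above (the helper name is illustrative):

#include <arrow/compute/exec.h>
#include <arrow/memory_pool.h>
#include <arrow/util/thread_pool.h>

// Illustrative helper, not part of the patch.
arrow::compute::ExecContext MakeExecContext(bool use_threads) {
  // A nullptr executor means the plan runs synchronously on the
  // calling thread; GetCpuThreadPool() selects the shared CPU pool.
  arrow::internal::Executor* executor =
      use_threads ? arrow::internal::GetCpuThreadPool() : nullptr;
  return arrow::compute::ExecContext(arrow::default_memory_pool(), executor);
}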
@@ -89,12 +89,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     # Create source nodes for each input
     for ipt in inputs:
         if isinstance(ipt, Table):
-            node_factory = "source"
-            c_in_table = pyarrow_unwrap_table(ipt).get()
-            c_sourceopts = GetResultValue(
-                CSourceNodeOptions.FromTable(deref(c_in_table), deref(c_exec_context).executor()))
-            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CSourceNodeOptions](
-                c_sourceopts)
+            node_factory = "table_source"
+            c_in_table = pyarrow_unwrap_table(ipt)
+            c_tablesourceopts = make_shared[CTableSourceNodeOptions](
+                c_in_table, 1 << 20)
+            c_input_node_opts = static_pointer_cast[CExecNodeOptions, CTableSourceNodeOptions](
+                c_tablesourceopts)
         elif isinstance(ipt, Dataset):
             node_factory = "scan"
             c_in_dataset = (<Dataset>ipt).unwrap()
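Note: the "table_source" factory takes the table by shared_ptr and re-slices it into batches of at most max_batch_size rows (1 << 20 here), rather than going through SourceNodeOptions::FromTable and an executor. The same construction in plain C++ (the helper name is illustrative):

#include <memory>
#include <arrow/compute/exec/options.h>
#include <arrow/table.h>

// Illustrative helper, not part of the patch.
std::shared_ptr<arrow::compute::ExecNodeOptions> MakeTableSource(
    std::shared_ptr<arrow::Table> table) {
  // max_batch_size caps the row count of each batch the source emits.
  return std::make_shared<arrow::compute::TableSourceNodeOptions>(
      std::move(table), /*max_batch_size=*/1 << 20);
}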
@@ -157,7 +157,9 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
     else:
         raise TypeError("Unsupported output type")
 
-    deref(c_exec_plan).StopProducing()
+    with nogil:
+        c_plan_status = deref(c_exec_plan).finished().status()
+    check_status(c_plan_status)
 
     return output
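Note: instead of eagerly calling StopProducing() and discarding errors, the wrapper now blocks on the plan's completion future and raises its status through check_status. Roughly, in C++ (the helper name is illustrative):

#include <arrow/compute/exec/exec_plan.h>
#include <arrow/status.h>

// Illustrative helper, not part of the patch. Future<>::status()
// blocks until the plan's final future resolves, surfacing any error
// produced by the plan's nodes.
arrow::Status WaitForPlan(arrow::compute::ExecPlan* plan) {
  return plan->finished().status();
}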
@@ -345,6 +347,7 @@ def _perform_join(join_type, left_operand not None, left_keys,
 
     result_table = execplan([left_operand, right_operand],
                             plan=c_decl_plan,
-                            output_type=output_type)
+                            output_type=output_type,
+                            use_threads=use_threads)
 
     return result_table
12 changes: 9 additions & 3 deletions python/pyarrow/includes/libarrow.pxd
@@ -81,6 +81,11 @@ cdef extern from "arrow/config.h" namespace "arrow" nogil:
     CRuntimeInfo GetRuntimeInfo()
 
 
+cdef extern from "arrow/util/future.h" namespace "arrow" nogil:
+    cdef cppclass CFuture_Void" arrow::Future<>":
+        CStatus status()
+
+
 cdef extern from "arrow/api.h" namespace "arrow" nogil:
     cdef enum Type" arrow::Type::type":
         _Type_NA" arrow::Type::NA"
@@ -2442,9 +2447,8 @@ cdef extern from "arrow/compute/exec/options.h" namespace "arrow::compute" nogil:
     cdef cppclass CExecNodeOptions "arrow::compute::ExecNodeOptions":
         pass
 
-    cdef cppclass CSourceNodeOptions "arrow::compute::SourceNodeOptions"(CExecNodeOptions):
-        @staticmethod
-        CResult[shared_ptr[CSourceNodeOptions]] FromTable(const CTable& table, CExecutor*)
+    cdef cppclass CTableSourceNodeOptions "arrow::compute::TableSourceNodeOptions"(CExecNodeOptions):
+        CTableSourceNodeOptions(shared_ptr[CTable] table, int64_t max_batch_size)
 
     cdef cppclass CSinkNodeOptions "arrow::compute::SinkNodeOptions"(CExecNodeOptions):
         pass
@@ -2497,6 +2501,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nogil:
         CStatus Validate()
         CStatus StopProducing()
 
+        CFuture_Void finished()
+
         vector[CExecNode*] sinks() const
         vector[CExecNode*] sources() const