Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,8 @@ if(ARROW_COMPUTE)
compute/exec/tpch_node.cc
compute/exec/union_node.cc
compute/exec/util.cc
compute/exec/window_functions/merge_tree.cc
compute/exec/window_functions/window_rank.cc
compute/function.cc
compute/function_internal.cc
compute/kernel.cc
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ add_arrow_compute_test(util_test
SOURCES
util_test.cc
task_util_test.cc)
# Registers the window_functions_test target, built from
# window_functions/window_test.cc, under the "arrow-compute" prefix.
add_arrow_compute_test(window_functions_test
PREFIX
"arrow-compute"
SOURCES
window_functions/window_test.cc)

add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")

Expand Down
60 changes: 54 additions & 6 deletions cpp/src/arrow/compute/exec/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,20 @@ class TempVectorHolder {
uint32_t num_elements_;
};

// Declares a scratch array called `name` whose elements have type `type`.
//
// The expansion consists of two declarations:
//   - `name##_buf`: an arrow::util::TempVectorHolder<type> constructed from
//     `temp_vector_stack` and arrow::util::MiniBatch::kMiniBatchLength
//     (presumably the number of elements to reserve -- confirm against
//     TempVectorHolder's constructor);
//   - `name`: whatever `name##_buf.mutable_data()` returns.
//
// Requirements at the expansion site:
//   - an identifier named exactly `temp_vector_stack` must be in scope, since
//     the macro refers to it implicitly rather than taking it as an argument;
//   - because two statements are produced, the macro must not be used as the
//     unbraced body of an if/for/while.
// The expansion already ends with a semicolon.
#define TEMP_VECTOR(type, name) \
auto name##_buf = arrow::util::TempVectorHolder<type>( \
temp_vector_stack, arrow::util::MiniBatch::kMiniBatchLength); \
auto name = name##_buf.mutable_data();

// Opens a loop that walks the row range [0, num_rows) in consecutive chunks of
// at most arrow::util::MiniBatch::kMiniBatchLength rows.
//
// Usage:
//   BEGIN_MINI_BATCH_FOR(begin, length, num_rows)
//     ... handle rows [begin, begin + length) ...
//   END_MINI_BATCH_FOR
//
// Parameters:
//   batch_begin  - identifier for the int64_t loop variable holding the index
//                  of the first row of the current chunk.
//   batch_length - identifier for the int64_t variable holding the number of
//                  rows in the current chunk (shorter only for the last chunk).
//   num_rows     - total number of rows; any integer expression.
//
// `num_rows` is always wrapped in static_cast<int64_t>(...) before use, so an
// expression argument (e.g. `flag ? a : b`) keeps its intended precedence and
// an unsigned argument is compared as a signed 64-bit value. Note that the
// argument is still evaluated on every iteration, so it should be free of side
// effects.
//
// The loop body is not executed when num_rows <= 0.
#define BEGIN_MINI_BATCH_FOR(batch_begin, batch_length, num_rows)             \
  for (int64_t batch_begin = 0; batch_begin < static_cast<int64_t>(num_rows); \
       batch_begin += arrow::util::MiniBatch::kMiniBatchLength) {             \
    int64_t batch_length =                                                    \
        std::min(static_cast<int64_t>(num_rows) - batch_begin,                \
                 static_cast<int64_t>(arrow::util::MiniBatch::kMiniBatchLength));

// Closes the loop opened by BEGIN_MINI_BATCH_FOR.
#define END_MINI_BATCH_FOR }

class bit_util {
public:
static void bits_to_indexes(int bit_to_search, int64_t hardware_flags,
Expand Down Expand Up @@ -365,13 +379,14 @@ struct ARROW_EXPORT TableSinkNodeConsumer : public SinkNodeConsumer {
/// Modify an Expression with pre-order and post-order visitation.
/// `pre` will be invoked on each Expression. `pre` will visit Calls before their
/// arguments, `post_call` will visit Calls (and no other Expressions) after their
/// arguments. Visitors should return the Identical expression to indicate no change;
/// this will prevent unnecessary construction in the common case where a modification
/// is not possible/necessary/...
///
/// If an argument was modified, `post_call` visits a reconstructed Call with the
/// modified arguments but also receives a pointer to the unmodified Expression as a
/// second argument. If no arguments were modified the unmodified Expression* will be
/// nullptr.
template <typename PreVisit, typename PostVisitCall>
Result<Expression> ModifyExpression(Expression expr, const PreVisit& pre,
const PostVisitCall& post_call) {
Expand Down Expand Up @@ -409,5 +424,38 @@ Result<Expression> ModifyExpression(Expression expr, const PreVisit& pre,
return post_call(std::move(expr), NULLPTR);
}

// Bundle of per-call state passed by reference to every
// ParallelForStream::TaskCallback invocation.
struct ThreadContext {
// NOTE(review): presumably the index of the thread executing the task --
// confirm with the code that fills this struct in.
int64_t thread_index;
// Pointer to a util::TempVectorStack. Ownership is not established by this
// struct (raw pointer, presumably non-owning -- confirm). Its name matches
// the identifier that the TEMP_VECTOR macro expects to find in scope.
util::TempVectorStack* temp_vector_stack;
// NOTE(review): looks like the same kind of value as the `hardware_flags`
// argument taken by bit_util functions in this header -- confirm.
int64_t hardware_flags;
};

struct ParallelForStream {
using TaskCallback = std::function<Status(int64_t, ThreadContext&)>;

void InsertParallelFor(int64_t num_tasks, TaskCallback task_callback) {
parallel_fors_.push_back(std::make_pair(num_tasks, task_callback));
}

void InsertTaskSingle(TaskCallback task_callback) {
parallel_fors_.push_back(std::make_pair(static_cast<int64_t>(1), task_callback));
}

// If any of the tasks returns an error status then all the remaining parallel
// fors in the stream will not be executed and the first error status within
// the failing parallel for loop step will be returned.
//
Status RunOnSingleThread(ThreadContext& thread_context) {
for (size_t i = 0; i < parallel_fors_.size(); ++i) {
for (int64_t j = 0; j < parallel_fors_[i].first; ++j) {
ARROW_RETURN_NOT_OK(parallel_fors_[i].second(j, thread_context));
}
}
return Status::OK();
}

std::vector<std::pair<int64_t, TaskCallback>> parallel_fors_;
};

} // namespace compute
} // namespace arrow
Loading