5 changes: 4 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
@@ -384,6 +384,7 @@ if(ARROW_COMPUTE)
compute/function_internal.cc
compute/kernel.cc
compute/registry.cc
compute/memory_resources.cc
compute/kernels/aggregate_basic.cc
compute/kernels/aggregate_mode.cc
compute/kernels/aggregate_quantile.cc
@@ -424,7 +425,9 @@ if(ARROW_COMPUTE)
compute/exec/util.cc
compute/exec/hash_join.cc
compute/exec/hash_join_node.cc
compute/exec/task_util.cc)
compute/exec/task_util.cc
compute/exec/data_holder_node.cc
)

append_avx2_src(compute/kernels/aggregate_basic_avx2.cc)
append_avx512_src(compute/kernels/aggregate_basic_avx512.cc)
7 changes: 6 additions & 1 deletion cpp/src/arrow/compute/exec.cc
@@ -34,6 +34,7 @@
#include "arrow/compute/exec_internal.h"
#include "arrow/compute/function.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/memory_resources.h"
#include "arrow/compute/registry.h"
#include "arrow/datum.h"
#include "arrow/pretty_print.h"
@@ -1015,9 +1016,13 @@ std::unique_ptr<KernelExecutor> KernelExecutor::MakeScalarAggregate() {
} // namespace detail

ExecContext::ExecContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
FunctionRegistry* func_registry)
FunctionRegistry* func_registry,
MemoryResources* memory_resources)
: pool_(pool), executor_(executor) {
this->func_registry_ = func_registry == nullptr ? GetFunctionRegistry() : func_registry;

this->memory_resources_ =
memory_resources == nullptr ? GetMemoryResources(pool) : memory_resources;
}

CpuInfo* ExecContext::cpu_info() const { return CpuInfo::GetInstance(); }
10 changes: 9 additions & 1 deletion cpp/src/arrow/compute/exec.h
@@ -29,6 +29,7 @@

#include "arrow/array/data.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/compute/memory_resources.h"
#include "arrow/datum.h"
#include "arrow/memory_pool.h"
#include "arrow/result.h"
@@ -62,7 +63,8 @@ class ARROW_EXPORT ExecContext {
// If no function registry or memory resources are passed, the defaults are used.
explicit ExecContext(MemoryPool* pool = default_memory_pool(),
::arrow::internal::Executor* executor = NULLPTR,
FunctionRegistry* func_registry = NULLPTR);
FunctionRegistry* func_registry = NULLPTR,
MemoryResources* memory_resources = NULLPTR);

/// \brief The MemoryPool used for allocations, default is
/// default_memory_pool().
@@ -78,6 +80,11 @@
/// registry provided by GetFunctionRegistry.
FunctionRegistry* func_registry() const { return func_registry_; }

/// \brief The MemoryResources used to look up memory resources by memory
/// level and to obtain data holders, enabling out-of-core processing.
/// Defaults to the instance provided by GetMemoryResources.
MemoryResources* memory_resources() const { return memory_resources_; }

// \brief Set maximum length unit of work for kernel execution. Larger
// contiguous array inputs will be split into smaller chunks, and, if
// possible and enabled, processed in parallel. The default chunksize is
@@ -124,6 +131,7 @@
int64_t exec_chunksize_ = std::numeric_limits<int64_t>::max();
bool preallocate_contiguous_ = true;
bool use_threads_ = true;
MemoryResources* memory_resources_;
};

ARROW_EXPORT ExecContext* default_exec_context();
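Taken together, the exec.h and exec.cc changes let a caller thread a MemoryResources instance through ExecContext the same way a FunctionRegistry is threaded today. Below is a minimal sketch of the wiring, assuming only what the diff shows: GetMemoryResources(pool) is the fallback selected when NULLPTR is passed, and it is declared in the new memory_resources.h.

#include "arrow/compute/exec.h"
#include "arrow/compute/memory_resources.h"

using arrow::compute::ExecContext;
using arrow::compute::GetMemoryResources;
using arrow::compute::MemoryResources;

void MakeContextWithResources() {
  arrow::MemoryPool* pool = arrow::default_memory_pool();
  // Pass the default resources explicitly; NULLPTR would select the same
  // instance via the constructor's fallback.
  MemoryResources* resources = GetMemoryResources(pool);
  ExecContext ctx(pool, /*executor=*/NULLPTR, /*func_registry=*/NULLPTR, resources);
  // ctx.memory_resources() now returns `resources`.
}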
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
@@ -27,6 +27,7 @@ add_arrow_compute_test(expression_test
add_arrow_compute_test(plan_test PREFIX "arrow-compute")
add_arrow_compute_test(hash_join_node_test PREFIX "arrow-compute")
add_arrow_compute_test(union_node_test PREFIX "arrow-compute")
add_arrow_compute_test(data_holder_node_test PREFIX "arrow-compute")

add_arrow_compute_test(util_test PREFIX "arrow-compute")

220 changes: 220 additions & 0 deletions cpp/src/arrow/compute/exec/data_holder_node.cc
@@ -0,0 +1,220 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <mutex>

#include "arrow/api.h"
#include "arrow/compute/api.h"

#include "arrow/compute/memory_resources.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/logging.h"

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/util.h"
#include "arrow/util/bitmap_ops.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/future.h"
#include "arrow/util/make_unique.h"
#include "arrow/util/thread_pool.h"

namespace arrow {

using internal::checked_cast;

namespace compute {

class DataHolderManager {
public:
explicit DataHolderManager(ExecContext* context)
: context_(context), gen_(), producer_(gen_.producer()) {}

Status Push(const std::shared_ptr<RecordBatch>& batch) {
bool pushed = false;
auto resources = context_->memory_resources();
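// Walk the registered resources in order and hand the batch to the first
// level that still has spare capacity (presumably fastest memory first).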
for (auto memory_resource : resources->memory_resources()) {
auto memory_used = memory_resource->memory_used();
if (memory_used < memory_resource->memory_limit()) {
ARROW_ASSIGN_OR_RAISE(auto data_holder, memory_resource->GetDataHolder(batch));
this->producer_.Push(std::move(data_holder));
pushed = true;
break;
}
}
if (!pushed) {
return Status::Invalid("No memory resource registered in the exec context has available capacity");
}
return Status::OK();
}
AsyncGenerator<std::shared_ptr<DataHolder>> generator() { return gen_; }

public:
ExecContext* context_;
PushGenerator<std::shared_ptr<DataHolder>> gen_;
PushGenerator<std::shared_ptr<DataHolder>>::Producer producer_;
};
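The MemoryResource and DataHolder types that Push and the generator loop rely on come from memory_resources.h, which this PR adds but which is not shown in this excerpt. Inferred purely from the call sites in this file, the surface being used looks roughly like the sketch below; the real declarations may differ.

// Sketch only, inferred from usage above; not the actual declarations.
class DataHolder {
 public:
  virtual ~DataHolder() = default;
  // Rehydrate the held batch for downstream consumption.
  virtual Result<ExecBatch> Get() = 0;
};

class MemoryResource {
 public:
  virtual ~MemoryResource() = default;
  virtual int64_t memory_used() = 0;
  virtual int64_t memory_limit() = 0;
  // Take ownership of `batch` at this memory level.
  virtual Result<std::shared_ptr<DataHolder>> GetDataHolder(
      const std::shared_ptr<RecordBatch>& batch) = 0;
};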

class DataHolderNode : public ExecNode {
public:
DataHolderNode(ExecPlan* plan, NodeVector inputs, std::vector<std::string> input_labels,
std::shared_ptr<Schema> output_schema, int num_outputs)
: ExecNode(plan, std::move(inputs), input_labels, std::move(output_schema),
/*num_outputs=*/num_outputs) {
executor_ = plan->exec_context()->executor();

data_holder_manager_ =
::arrow::internal::make_unique<DataHolderManager>(plan->exec_context());

auto status = task_group_.AddTask([this]() -> Result<Future<>> {

I don't think this is how we should push into the next node. We need a way for a node to request this data from the DataHolder. Setting up a task that basically loops through inputs to push them forward doesn't allow for intelligent scheduling of these things in the future.

Two options I see are either

  1. Have the nodes that follow the DataHolderManager pull from the DataHolderManager. This is relatively straightforward but lacks control.
  2. Have some kind of Simple Scheduler which pulls from the DataHolderManagers that have accumulated the most HeldData and pushes those tasks forward. This is a bit more work but provides more value in my opinion.

ARROW_DCHECK(executor_ != nullptr);
return executor_->Submit(this->stop_source_.token(), [this] {
auto generator = this->data_holder_manager_->generator();
auto iterator = MakeGeneratorIterator(std::move(generator));

Member: This won't work. You need to avoid blocking on the CPU thread pool. This basically takes one of the CPU pool's threads and dedicates it to pulling from the data holder output. If the data holder fills up then this thread will be useless, and since our thread pool is a fixed-size pool that is a bad thing. Even if the data holder hasn't filled up, this thread will spend a lot of time idle if the scanner is slow.

Reply: Maybe we can use a condition variable that waits until there are things to iterate on. When a generator has something to push, it can notify that cv.

while (true) {
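// Drain the data holders in arrival order; DataHolder::Get() rehydrates each
// batch (possibly from a slower memory level) before it is forwarded downstream.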
ARROW_ASSIGN_OR_RAISE(auto result, iterator.Next());
if (IsIterationEnd(result)) {
break;
}
ARROW_ASSIGN_OR_RAISE(ExecBatch batch, result->Get());
this->outputs_[0]->InputReceived(this, batch);
}
return Status::OK();
});
});
if (!status.ok()) {
if (input_counter_.Cancel()) {
this->Finish(status);
}
inputs_[0]->StopProducing(this);
}
Comment on lines +100 to +105

Member: It's a bit odd to be calling these methods in the constructor. Admittedly it's a rather odd error case (AddTask probably shouldn't fail unless the underlying thread pool has been shut down). I'm not sure if this will work, or if the next call to StopProducing will clear things out.

}

void ErrorReceived(ExecNode* input, Status error) override {
DCHECK_EQ(input, inputs_[0]);
outputs_[0]->ErrorReceived(this, std::move(error));
}

void InputFinished(ExecNode* input, int total_batches) override {
DCHECK_EQ(input, inputs_[0]);
outputs_[0]->InputFinished(this, total_batches);
if (input_counter_.SetTotal(total_batches)) {
this->Finish();
}
}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
auto schema = inputs[0]->output_schema();
return plan->EmplaceNode<DataHolderNode>(plan, std::move(inputs),
std::vector<std::string>{"target"},
std::move(schema), /*num_outputs=*/1);
}

const char* kind_name() const override { return "DataHolderNode"; }

void InputReceived(ExecNode* input, ExecBatch batch) override {
if (finished_.is_finished()) {
return;
}
auto status = task_group_.AddTask([this, batch]() -> Result<Future<>> {
return this->executor_->Submit(this->stop_source_.token(), [this, batch]() {
auto pool = this->plan()->exec_context()->memory_pool();
ARROW_ASSIGN_OR_RAISE(auto record_batch,
batch.ToRecordBatch(this->output_schema(), pool));
Comment on lines +137 to +139

Member: Converting every ExecBatch to a RecordBatch sort of defeats the purpose of having ExecBatch in the first place. Ideally this would only have to happen if we needed to store to disk.

Status status = data_holder_manager_->Push(record_batch);
if (ErrorIfNotOk(status)) {
return status;
}
if (this->input_counter_.Increment()) {
this->Finish(status);
}
return Status::OK();
});
});
if (!status.ok()) {
if (input_counter_.Cancel()) {
this->Finish(status);
}
inputs_[0]->StopProducing(this);
return;
}
}

Status StartProducing() override { return Status::OK(); }

void PauseProducing(ExecNode* output) override {}

void ResumeProducing(ExecNode* output) override {}

void StopProducing(ExecNode* output) override {
DCHECK_EQ(output, outputs_[0]);
StopProducing();
}

void StopProducing() override {
if (executor_) {
this->stop_source_.RequestStop();
}
if (input_counter_.Cancel()) {
this->Finish();
}
inputs_[0]->StopProducing(this);
}

Future<> finished() override { return finished_; }

std::string ToStringExtra() const override { return ""; }

protected:
void Finish(Status finish_st = Status::OK()) {
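// Close the producer so the draining task observes end-of-stream, then mark
// this node finished once every queued task has completed.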
this->data_holder_manager_->producer_.Close();

task_group_.End().AddCallback([this, finish_st](const Status& st) {
Status final_status = finish_st & st;
this->finished_.MarkFinished(final_status);
});
}

protected:
// Counter for the number of batches received
AtomicCounter input_counter_;

// Future to sync finished
Future<> finished_ = Future<>::Make();

// The task group for the corresponding batches
util::AsyncTaskGroup task_group_;

::arrow::internal::Executor* executor_;

// Variable used to cancel remaining tasks in the executor
StopSource stop_source_;

std::unique_ptr<DataHolderManager> data_holder_manager_;
};

namespace internal {

void RegisterDataHolderNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("data_holder", DataHolderNode::Make));
}

} // namespace internal
} // namespace compute
} // namespace arrow
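For context, here is one hypothetical way to exercise the factory registered above. None of this is part of the PR: it assumes the existing generic MakeExecNode lookup, the already-registered "source" and "sink" nodes, and the surrounding arrow::compute namespace with exec_plan.h and options.h included.

// Hypothetical usage sketch; `schema` and `batch_gen` are assumed inputs.
Status RunWithDataHolder(ExecContext* exec_context, std::shared_ptr<Schema> schema,
                         std::function<Future<util::optional<ExecBatch>>()> batch_gen) {
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make(exec_context));
  ARROW_ASSIGN_OR_RAISE(ExecNode* source,
                        MakeExecNode("source", plan.get(), {},
                                     SourceNodeOptions{std::move(schema), std::move(batch_gen)}));
  // The data holder sits between source and sink; its Make ignores options.
  ARROW_ASSIGN_OR_RAISE(ExecNode* holder,
                        MakeExecNode("data_holder", plan.get(), {source}, ExecNodeOptions{}));
  AsyncGenerator<util::optional<ExecBatch>> sink_gen;
  ARROW_ASSIGN_OR_RAISE(ExecNode* sink,
                        MakeExecNode("sink", plan.get(), {holder}, SinkNodeOptions{&sink_gen}));
  (void)sink;
  RETURN_NOT_OK(plan->StartProducing());
  return plan->finished().status();
}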