From d0e7ed60ff7beb28d5a56ad7813a0011c6915342 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Sun, 1 May 2022 02:28:50 -0400
Subject: [PATCH 1/7] ARROW-16426: [C++] Add TeeNode to execution engine

---
 cpp/src/arrow/dataset/file_base.cc | 168 ++++++++++++++++++++++++-----
 1 file changed, 144 insertions(+), 24 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 57472b09c61..78dcfd11771 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -42,6 +42,7 @@
 #include "arrow/util/map.h"
 #include "arrow/util/string.h"
 #include "arrow/util/task_group.h"
+#include "arrow/util/tracing_internal.h"
 #include "arrow/util/variant.h"
 
 namespace arrow {
@@ -269,6 +270,37 @@ Future<> FileWriter::Finish() {
 
 namespace {
 
+Status WriteBatch(
+    std::shared_ptr<RecordBatch> batch,
+    compute::Expression guarantee,
+    FileSystemDatasetWriteOptions write_options,
+    std::function<Status(std::shared_ptr<RecordBatch>,
+                         const Partitioning::PartitionPathFormat&)> write) {
+  ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
+  batch.reset();  // drop to hopefully conserve memory
+
+  if (write_options.max_partitions <= 0) {
+    return Status::Invalid("max_partitions must be positive (was ",
+                           write_options.max_partitions, ")");
+  }
+
+  if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) {
+    return Status::Invalid("Fragment would be written into ", groups.batches.size(),
+                           " partitions. This exceeds the maximum of ",
+                           write_options.max_partitions);
+  }
+
+  for (std::size_t index = 0; index < groups.batches.size(); index++) {
+    auto partition_expression = and_(groups.expressions[index], guarantee);
+    auto next_batch = groups.batches[index];
+    Partitioning::PartitionPathFormat destination;
+    ARROW_ASSIGN_OR_RAISE(destination,
+                          write_options.partitioning->Format(partition_expression));
+    RETURN_NOT_OK(write(next_batch, destination));
+  }
+  return Status::OK();
+}
+
 class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
  public:
   DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
@@ -303,27 +335,12 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
 
  private:
   Status WriteNextBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee) {
-    ARROW_ASSIGN_OR_RAISE(auto groups, write_options_.partitioning->Partition(batch));
-    batch.reset();  // drop to hopefully conserve memory
-
-    if (write_options_.max_partitions <= 0) {
-      return Status::Invalid("max_partitions must be positive (was ",
-                             write_options_.max_partitions, ")");
-    }
-
-    if (groups.batches.size() > static_cast<size_t>(write_options_.max_partitions)) {
-      return Status::Invalid("Fragment would be written into ", groups.batches.size(),
-                             " partitions. This exceeds the maximum of ",
-                             write_options_.max_partitions);
-    }
-
-    for (std::size_t index = 0; index < groups.batches.size(); index++) {
-      auto partition_expression = and_(groups.expressions[index], guarantee);
-      auto next_batch = groups.batches[index];
-      Partitioning::PartitionPathFormat destination;
-      ARROW_ASSIGN_OR_RAISE(destination,
-                            write_options_.partitioning->Format(partition_expression));
-      RETURN_NOT_OK(task_group_.AddTask([this, next_batch, destination] {
+    return WriteBatch(batch,
+                      guarantee,
+                      write_options_,
+                      [this](std::shared_ptr<RecordBatch> next_batch,
+                             const Partitioning::PartitionPathFormat& destination) {
+      return task_group_.AddTask([this, next_batch, destination] {
         Future<> has_room = dataset_writer_->WriteRecordBatch(
             next_batch, destination.directory, destination.prefix);
         if (!has_room.is_finished()) {
@@ -334,9 +351,8 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
           return has_room.Then([this] { backpressure_control_->Resume(); });
         }
         return has_room;
-      }));
-    }
-    return Status::OK();
+      });
+    });
   }
 
   std::shared_ptr<const KeyValueMetadata> custom_metadata_;
@@ -413,9 +429,113 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
   return node;
 }
 
+namespace {
+
+class TeeNode : public compute::MapNode, public compute::BackpressureControl {
+ public:
+  TeeNode(compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
+          std::shared_ptr<Schema> output_schema,
+          std::unique_ptr<internal::DatasetWriter> dataset_writer,
+          FileSystemDatasetWriteOptions write_options,
+          bool async_mode)
+      : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
+        dataset_writer_(std::move(dataset_writer)),
+        write_options_(std::move(write_options)),
+        backpressure_control_(this) {
+  }
+
+  static Result<compute::ExecNode*> Make(compute::ExecPlan* plan,
+                                         std::vector<compute::ExecNode*> inputs,
+                                         const compute::ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TeeNode"));
+
+    const WriteNodeOptions write_node_options =
+        checked_cast<const WriteNodeOptions&>(options);
+    const FileSystemDatasetWriteOptions& write_options =
+        write_node_options.write_options;
+    const std::shared_ptr<Schema> schema = inputs[0]->output_schema();
+
+    ARROW_ASSIGN_OR_RAISE(auto dataset_writer,
+                          internal::DatasetWriter::Make(write_options));
+
+    return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(schema),
+                                      std::move(dataset_writer),
+                                      std::move(write_options),
+                                      /*async_mode=*/true);
+  }
+
+  const char* kind_name() const override { return "TeeNode"; }
+
+  Result<compute::ExecBatch> DoTee(const compute::ExecBatch& batch) {
+    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
+                          batch.ToRecordBatch(output_schema()));
+    ARROW_RETURN_NOT_OK(WriteNextBatch(std::move(record_batch), batch.guarantee));
+    return batch;
+  }
+
+  Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
+                        compute::Expression guarantee) {
+    return WriteBatch(batch,
+                      guarantee,
+                      write_options_,
+                      [this](std::shared_ptr<RecordBatch> next_batch,
+                             const Partitioning::PartitionPathFormat& destination) {
+      return task_group_.AddTask([this, next_batch, destination] {
+        util::tracing::Span span;
+        START_SPAN(span, "Tee",
+                   {{"tee.base_dir", ToStringExtra()},
+                    {"tee.length", next_batch.length}});
+        Future<> has_room = dataset_writer_->WriteRecordBatch(
+            next_batch, destination.directory, destination.prefix);
+        if (!has_room.is_finished()) {
+          backpressure_control_->Pause();
+          return has_room.Then([this] { backpressure_control_->Resume(); });
+        }
+        return has_room;
+      });
+    });
+  }
+
+  void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override {
+    EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
+    DCHECK_EQ(input, inputs_[0]);
+    auto func = [this](compute::ExecBatch batch) {
+      util::tracing::Span span;
+      START_SPAN_WITH_PARENT(span, span_, "InputReceived",
+                             {{"tee", ToStringExtra()},
+                              {"node.label", label()},
+                              {"batch.length", batch.length}});
+      auto result = DoTee(std::move(batch));
+      MARK_SPAN(span, result.status());
+      END_SPAN(span);
+      return result;
+    };
+    this->SubmitTask(std::move(func), std::move(batch));
+  }
+
+  void Pause() override { inputs_[0]->PauseProducing(this, ++backpressure_counter_); }
+
+  void Resume() override { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); }
+
+ protected:
+  std::string ToStringExtra(int indent = 0) const override {
+    return "base_dir=" + write_options_.base_dir;
+  }
+
+ private:
+  std::unique_ptr<internal::DatasetWriter> dataset_writer_;
+  FileSystemDatasetWriteOptions write_options_;
+  util::SerializedAsyncTaskGroup task_group_;
+  compute::BackpressureControl* backpressure_control_;
+  int32_t backpressure_counter_ = 0;
+};
+
+}  // namespace
+
 namespace internal {
 
 void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("write", MakeWriteNode));
+  DCHECK_OK(registry->AddFactory("tee", TeeNode::Make));
 }
 
 }  // namespace internal

From 7e132d0764a2691c0b797d0d56ce164266bfa585 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Sun, 1 May 2022 03:00:07 -0400
Subject: [PATCH 2/7] Fix telemetry

---
 cpp/src/arrow/dataset/file_base.cc | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 78dcfd11771..194bd5dbb65 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -482,9 +482,9 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
                              const Partitioning::PartitionPathFormat& destination) {
       return task_group_.AddTask([this, next_batch, destination] {
         util::tracing::Span span;
-        START_SPAN(span, "Tee",
-                   {{"tee.base_dir", ToStringExtra()},
-                    {"tee.length", next_batch.length}});
+        START_COMPUTE_SPAN(span, "Tee",
+                           {{"tee.base_dir", ToStringExtra()},
+                            {"tee.length", next_batch->num_rows()}});
         Future<> has_room = dataset_writer_->WriteRecordBatch(
             next_batch, destination.directory, destination.prefix);
         if (!has_room.is_finished()) {

From 793cfd990e9b9a6f30a4965bf72ad47927e89306 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Sun, 1 May 2022 03:54:39 -0400
Subject: [PATCH 3/7] lint

---
 cpp/src/arrow/dataset/file_base.cc | 90 ++++++++++++++----------------
 1 file changed, 42 insertions(+), 48 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 194bd5dbb65..8d99322b7e8 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -270,12 +270,11 @@ Future<> FileWriter::Finish() {
 
 namespace {
 
-Status WriteBatch(
-    std::shared_ptr<RecordBatch> batch,
-    compute::Expression guarantee,
-    FileSystemDatasetWriteOptions write_options,
-    std::function<Status(std::shared_ptr<RecordBatch>,
-                         const Partitioning::PartitionPathFormat&)> write) {
+Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
+                  FileSystemDatasetWriteOptions write_options,
+                  std::function<Status(std::shared_ptr<RecordBatch>,
+                                       const Partitioning::PartitionPathFormat&)>
+                      write) {
   ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
   batch.reset();  // drop to hopefully conserve memory
 
@@ -335,24 +334,23 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
 
  private:
   Status WriteNextBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee) {
-    return WriteBatch(batch,
-                      guarantee,
-                      write_options_,
-                      [this](std::shared_ptr<RecordBatch> next_batch,
-                             const Partitioning::PartitionPathFormat& destination) {
-      return task_group_.AddTask([this, next_batch, destination] {
-        Future<> has_room = dataset_writer_->WriteRecordBatch(
-            next_batch, destination.directory, destination.prefix);
-        if (!has_room.is_finished()) {
-          // We don't have to worry about sequencing backpressure here since task_group_
-          // serves as our sequencer. If batches continue to arrive after we pause they
-          // will queue up in task_group_ until we free up and call Resume
-          backpressure_control_->Pause();
-          return has_room.Then([this] { backpressure_control_->Resume(); });
-        }
-        return has_room;
-      });
-    });
+    return WriteBatch(
+        batch, guarantee, write_options_,
+        [this](std::shared_ptr<RecordBatch> next_batch,
+               const Partitioning::PartitionPathFormat& destination) {
+          return task_group_.AddTask([this, next_batch, destination] {
+            Future<> has_room = dataset_writer_->WriteRecordBatch(
+                next_batch, destination.directory, destination.prefix);
+            if (!has_room.is_finished()) {
+              // We don't have to worry about sequencing backpressure here since task_group_
+              // serves as our sequencer. If batches continue to arrive after we pause they
+              // will queue up in task_group_ until we free up and call Resume
+              backpressure_control_->Pause();
+              return has_room.Then([this] { backpressure_control_->Resume(); });
+            }
+            return has_room;
+          });
+        });
   }
 
   std::shared_ptr<const KeyValueMetadata> custom_metadata_;
@@ -436,13 +434,11 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
   TeeNode(compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
           std::shared_ptr<Schema> output_schema,
           std::unique_ptr<internal::DatasetWriter> dataset_writer,
-          FileSystemDatasetWriteOptions write_options,
-          bool async_mode)
+          FileSystemDatasetWriteOptions write_options, bool async_mode)
       : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
         dataset_writer_(std::move(dataset_writer)),
         write_options_(std::move(write_options)),
-        backpressure_control_(this) {
-  }
+        backpressure_control_(this) {}
 
   static Result<compute::ExecNode*> Make(compute::ExecPlan* plan,
                                          std::vector<compute::ExecNode*> inputs,
@@ -459,8 +455,7 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
                           internal::DatasetWriter::Make(write_options));
 
     return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(schema),
-                                      std::move(dataset_writer),
-                                      std::move(write_options),
+                                      std::move(dataset_writer), std::move(write_options),
                                       /*async_mode=*/true);
   }
 
@@ -475,25 +470,24 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
 
   Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
                         compute::Expression guarantee) {
-    return WriteBatch(batch,
-                      guarantee,
-                      write_options_,
-                      [this](std::shared_ptr<RecordBatch> next_batch,
-                             const Partitioning::PartitionPathFormat& destination) {
-      return task_group_.AddTask([this, next_batch, destination] {
-        util::tracing::Span span;
-        START_COMPUTE_SPAN(span, "Tee",
-                           {{"tee.base_dir", ToStringExtra()},
-                            {"tee.length", next_batch->num_rows()}});
-        Future<> has_room = dataset_writer_->WriteRecordBatch(
-            next_batch, destination.directory, destination.prefix);
-        if (!has_room.is_finished()) {
-          backpressure_control_->Pause();
-          return has_room.Then([this] { backpressure_control_->Resume(); });
-        }
-        return has_room;
-      });
-    });
+    return WriteBatch(
+        batch, guarantee, write_options_,
+        [this](std::shared_ptr<RecordBatch> next_batch,
+               const Partitioning::PartitionPathFormat& destination) {
+          return task_group_.AddTask([this, next_batch, destination] {
+            util::tracing::Span span;
+            START_COMPUTE_SPAN(span, "Tee",
+                               {{"tee.base_dir", ToStringExtra()},
+                                {"tee.length", next_batch->num_rows()}});
+            Future<> has_room = dataset_writer_->WriteRecordBatch(
+                next_batch, destination.directory, destination.prefix);
+            if (!has_room.is_finished()) {
+              backpressure_control_->Pause();
+              return has_room.Then([this] { backpressure_control_->Resume(); });
+            }
+            return has_room;
+          });
+        });
   }
 
   void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override {

From 0b19e5c46599ef692a12461cec92b3f0070a2aed Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Sun, 1 May 2022 04:50:10 -0400
Subject: [PATCH 4/7] lint

---
 cpp/src/arrow/dataset/file_base.cc | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 8d99322b7e8..884f1273376 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -342,9 +342,10 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
            Future<> has_room = dataset_writer_->WriteRecordBatch(
                next_batch, destination.directory, destination.prefix);
            if (!has_room.is_finished()) {
-             // We don't have to worry about sequencing backpressure here since task_group_
-             // serves as our sequencer. If batches continue to arrive after we pause they
-             // will queue up in task_group_ until we free up and call Resume
+             // We don't have to worry about sequencing backpressure here since
+             // task_group_ serves as our sequencer. If batches continue to arrive after
+             // we pause they will queue up in task_group_ until we free up and call
+             // Resume
              backpressure_control_->Pause();
              return has_room.Then([this] { backpressure_control_->Resume(); });
            }
@@ -447,8 +448,7 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
 
     const WriteNodeOptions write_node_options =
         checked_cast<const WriteNodeOptions&>(options);
-    const FileSystemDatasetWriteOptions& write_options =
-        write_node_options.write_options;
+    const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
     const std::shared_ptr<Schema> schema = inputs[0]->output_schema();
 
     ARROW_ASSIGN_OR_RAISE(auto dataset_writer,

From 7d04488f21880b5e2b4a9b4154bf7a583a2e7a24 Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Tue, 3 May 2022 14:34:15 -0400
Subject: [PATCH 5/7] Export MapNode and AtomicCounter

---
 cpp/src/arrow/compute/exec/exec_plan.h | 2 +-
 cpp/src/arrow/compute/exec/util.h      | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h
index dc2a4083abf..c20dc0d048c 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.h
+++ b/cpp/src/arrow/compute/exec/exec_plan.h
@@ -292,7 +292,7 @@ class ARROW_EXPORT ExecNode {
 /// takes a batch in and returns a batch. This simple parallel runner also needs an
 /// executor (use simple synchronous runner if there is no executor)
-class MapNode : public ExecNode {
+class ARROW_EXPORT MapNode : public ExecNode {
  public:
   MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
           std::shared_ptr<Schema> output_schema, bool async_mode);

diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h
index b0e423c8580..3ec7016e6a5 100644
--- a/cpp/src/arrow/compute/exec/util.h
+++ b/cpp/src/arrow/compute/exec/util.h
@@ -236,7 +236,7 @@ ARROW_EXPORT Result<std::shared_ptr<Table>> TableFromExecBatches(
     const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);
 
-class AtomicCounter {
+class ARROW_EXPORT AtomicCounter {
  public:
   AtomicCounter() = default;

From f27ad32cffe6700c3e12d4e6c0ca38d7ea20090e Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Thu, 5 May 2022 05:04:46 -0400
Subject: [PATCH 6/7] code cleanup

---
 cpp/src/arrow/dataset/file_base.cc | 17 ++++++-----------
 1 file changed, 6 insertions(+), 11 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index 884f1273376..aba3fe00653 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -430,7 +430,7 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
 
 namespace {
 
-class TeeNode : public compute::MapNode, public compute::BackpressureControl {
+class TeeNode : public compute::MapNode {
  public:
   TeeNode(compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
           std::shared_ptr<Schema> output_schema,
@@ -438,8 +438,7 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
           FileSystemDatasetWriteOptions write_options, bool async_mode)
       : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
         dataset_writer_(std::move(dataset_writer)),
-        write_options_(std::move(write_options)),
-        backpressure_control_(this) {}
+        write_options_(std::move(write_options)) {}
 
   static Result<compute::ExecNode*> Make(compute::ExecPlan* plan,
                                          std::vector<compute::ExecNode*> inputs,
@@ -476,14 +475,11 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
                const Partitioning::PartitionPathFormat& destination) {
           return task_group_.AddTask([this, next_batch, destination] {
             util::tracing::Span span;
-            START_COMPUTE_SPAN(span, "Tee",
-                               {{"tee.base_dir", ToStringExtra()},
-                                {"tee.length", next_batch->num_rows()}});
             Future<> has_room = dataset_writer_->WriteRecordBatch(
                 next_batch, destination.directory, destination.prefix);
             if (!has_room.is_finished()) {
-              backpressure_control_->Pause();
-              return has_room.Then([this] { backpressure_control_->Resume(); });
+              this->Pause();
+              return has_room.Then([this] { this->Resume(); });
             }
             return has_room;
           });
@@ -507,9 +503,9 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
     this->SubmitTask(std::move(func), std::move(batch));
   }
 
-  void Pause() override { inputs_[0]->PauseProducing(this, ++backpressure_counter_); }
+  void Pause() { inputs_[0]->PauseProducing(this, ++backpressure_counter_); }
 
-  void Resume() override { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); }
+  void Resume() { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); }
 
  protected:
   std::string ToStringExtra(int indent = 0) const override {
@@ -520,7 +516,6 @@ class TeeNode : public compute::MapNode, public compute::BackpressureControl {
   std::unique_ptr<internal::DatasetWriter> dataset_writer_;
   FileSystemDatasetWriteOptions write_options_;
   util::SerializedAsyncTaskGroup task_group_;
-  compute::BackpressureControl* backpressure_control_;
   int32_t backpressure_counter_ = 0;
 };

From b28aaeb7cc90b88dbd84c62800b87e4f957f7f0d Mon Sep 17 00:00:00 2001
From: Yaron Gvili
Date: Thu, 5 May 2022 09:59:11 -0400
Subject: [PATCH 7/7] lint

---
 cpp/src/arrow/dataset/file_base.cc | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc
index aba3fe00653..822fc714623 100644
--- a/cpp/src/arrow/dataset/file_base.cc
+++ b/cpp/src/arrow/dataset/file_base.cc
@@ -469,21 +469,20 @@ class TeeNode : public compute::MapNode {
 
   Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
                         compute::Expression guarantee) {
-    return WriteBatch(
-        batch, guarantee, write_options_,
-        [this](std::shared_ptr<RecordBatch> next_batch,
-               const Partitioning::PartitionPathFormat& destination) {
-          return task_group_.AddTask([this, next_batch, destination] {
-            util::tracing::Span span;
-            Future<> has_room = dataset_writer_->WriteRecordBatch(
-                next_batch, destination.directory, destination.prefix);
-            if (!has_room.is_finished()) {
-              this->Pause();
-              return has_room.Then([this] { this->Resume(); });
-            }
-            return has_room;
-          });
-        });
+    return WriteBatch(batch, guarantee, write_options_,
+                      [this](std::shared_ptr<RecordBatch> next_batch,
+                             const Partitioning::PartitionPathFormat& destination) {
+                        return task_group_.AddTask([this, next_batch, destination] {
+                          util::tracing::Span span;
+                          Future<> has_room = dataset_writer_->WriteRecordBatch(
+                              next_batch, destination.directory, destination.prefix);
+                          if (!has_room.is_finished()) {
+                            this->Pause();
+                            return has_room.Then([this] { this->Resume(); });
+                          }
+                          return has_room;
+                        });
+                      });
   }
 
   void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override {
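---

For context, a minimal sketch of how the "tee" factory registered in patch 1 might be driven from an ExecPlan. Only one detail below is taken from the patch itself: "tee" consumes the same WriteNodeOptions as the existing "write" node (TeeNode::Make casts its options to WriteNodeOptions). Everything else is an assumption: RunTeePlan and its parameters are illustrative names supplied by the caller, and the scan -> project -> tee -> sink shape simply mirrors how dataset scan/write plans were typically assembled in this era of the codebase.

#include <memory>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/dataset/api.h"
#include "arrow/dataset/plan.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/thread_pool.h"

namespace cp = arrow::compute;
namespace ds = arrow::dataset;

// Sketch: scan `dataset`, tee every batch into write_options.base_dir, and
// keep forwarding batches to a sink. `dataset` and `write_options` are
// assumed to be configured by the caller.
arrow::Status RunTeePlan(std::shared_ptr<ds::Dataset> dataset,
                         ds::FileSystemDatasetWriteOptions write_options) {
  // Registers the dataset-related factories, including the new "tee".
  ds::internal::Initialize();

  cp::ExecContext exec_context(arrow::default_memory_pool(),
                               arrow::internal::GetCpuThreadPool());
  ARROW_ASSIGN_OR_RAISE(auto plan, cp::ExecPlan::Make(&exec_context));

  auto scan_options = std::make_shared<ds::ScanOptions>();
  scan_options->projection = cp::project({}, {});  // empty projection for the scan node

  // The scan node emits augmented columns (e.g. __fragment_index), so project
  // back down to the dataset schema before teeing batches to disk.
  std::vector<cp::Expression> exprs;
  std::vector<std::string> names;
  for (const auto& field : dataset->schema()->fields()) {
    exprs.push_back(cp::field_ref(field->name()));
    names.push_back(field->name());
  }

  arrow::AsyncGenerator<arrow::util::optional<cp::ExecBatch>> sink_gen;
  ARROW_RETURN_NOT_OK(
      cp::Declaration::Sequence(
          {
              {"scan", ds::ScanNodeOptions{dataset, scan_options}},
              {"project", cp::ProjectNodeOptions{std::move(exprs), std::move(names)}},
              // Unlike "write", "tee" also forwards batches to its output.
              {"tee", ds::WriteNodeOptions{write_options}},
              {"sink", cp::SinkNodeOptions{&sink_gen}},
          })
          .AddToPlan(plan.get()));

  // Drain the sink so the plan can finish; the teed copy lands on disk.
  auto sink_reader = cp::MakeGeneratorReader(dataset->schema(), std::move(sink_gen),
                                             exec_context.memory_pool());
  ARROW_RETURN_NOT_OK(plan->StartProducing());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table,
                        arrow::Table::FromRecordBatchReader(sink_reader.get()));
  (void)table;  // the pass-through copy of the scanned data
  return plan->finished().status();
}

The design point this exercises is the one the patch is making: because TeeNode is a MapNode rather than a sink, a single scan can feed both a persisted dataset and further downstream computation, with backpressure (Pause/Resume) propagating upstream whenever the DatasetWriter falls behind.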