diff --git a/cpp/src/arrow/compute/exec/exec_plan.h b/cpp/src/arrow/compute/exec/exec_plan.h index dc2a4083abf..c20dc0d048c 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.h +++ b/cpp/src/arrow/compute/exec/exec_plan.h @@ -292,7 +292,7 @@ class ARROW_EXPORT ExecNode { /// takes a batch in and returns a batch. This simple parallel runner also needs an /// executor (use simple synchronous runner if there is no executor) -class MapNode : public ExecNode { +class ARROW_EXPORT MapNode : public ExecNode { public: MapNode(ExecPlan* plan, std::vector inputs, std::shared_ptr output_schema, bool async_mode); diff --git a/cpp/src/arrow/compute/exec/util.h b/cpp/src/arrow/compute/exec/util.h index b0e423c8580..3ec7016e6a5 100644 --- a/cpp/src/arrow/compute/exec/util.h +++ b/cpp/src/arrow/compute/exec/util.h @@ -236,7 +236,7 @@ ARROW_EXPORT Result> TableFromExecBatches( const std::shared_ptr& schema, const std::vector& exec_batches); -class AtomicCounter { +class ARROW_EXPORT AtomicCounter { public: AtomicCounter() = default; diff --git a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index 57472b09c61..822fc714623 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -42,6 +42,7 @@ #include "arrow/util/map.h" #include "arrow/util/string.h" #include "arrow/util/task_group.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/variant.h" namespace arrow { @@ -269,6 +270,36 @@ Future<> FileWriter::Finish() { namespace { +Status WriteBatch(std::shared_ptr batch, compute::Expression guarantee, + FileSystemDatasetWriteOptions write_options, + std::function, + const Partitioning::PartitionPathFormat&)> + write) { + ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch)); + batch.reset(); // drop to hopefully conserve memory + + if (write_options.max_partitions <= 0) { + return Status::Invalid("max_partitions must be positive (was ", + write_options.max_partitions, ")"); + } + + if (groups.batches.size() > static_cast(write_options.max_partitions)) { + return Status::Invalid("Fragment would be written into ", groups.batches.size(), + " partitions. This exceeds the maximum of ", + write_options.max_partitions); + } + + for (std::size_t index = 0; index < groups.batches.size(); index++) { + auto partition_expression = and_(groups.expressions[index], guarantee); + auto next_batch = groups.batches[index]; + Partitioning::PartitionPathFormat destination; + ARROW_ASSIGN_OR_RAISE(destination, + write_options.partitioning->Format(partition_expression)); + RETURN_NOT_OK(write(next_batch, destination)); + } + return Status::OK(); +} + class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { public: DatasetWritingSinkNodeConsumer(std::shared_ptr custom_metadata, @@ -303,40 +334,24 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer { private: Status WriteNextBatch(std::shared_ptr batch, compute::Expression guarantee) { - ARROW_ASSIGN_OR_RAISE(auto groups, write_options_.partitioning->Partition(batch)); - batch.reset(); // drop to hopefully conserve memory - - if (write_options_.max_partitions <= 0) { - return Status::Invalid("max_partitions must be positive (was ", - write_options_.max_partitions, ")"); - } - - if (groups.batches.size() > static_cast(write_options_.max_partitions)) { - return Status::Invalid("Fragment would be written into ", groups.batches.size(), - " partitions. This exceeds the maximum of ", - write_options_.max_partitions); - } - - for (std::size_t index = 0; index < groups.batches.size(); index++) { - auto partition_expression = and_(groups.expressions[index], guarantee); - auto next_batch = groups.batches[index]; - Partitioning::PartitionPathFormat destination; - ARROW_ASSIGN_OR_RAISE(destination, - write_options_.partitioning->Format(partition_expression)); - RETURN_NOT_OK(task_group_.AddTask([this, next_batch, destination] { - Future<> has_room = dataset_writer_->WriteRecordBatch( - next_batch, destination.directory, destination.prefix); - if (!has_room.is_finished()) { - // We don't have to worry about sequencing backpressure here since task_group_ - // serves as our sequencer. If batches continue to arrive after we pause they - // will queue up in task_group_ until we free up and call Resume - backpressure_control_->Pause(); - return has_room.Then([this] { backpressure_control_->Resume(); }); - } - return has_room; - })); - } - return Status::OK(); + return WriteBatch( + batch, guarantee, write_options_, + [this](std::shared_ptr next_batch, + const Partitioning::PartitionPathFormat& destination) { + return task_group_.AddTask([this, next_batch, destination] { + Future<> has_room = dataset_writer_->WriteRecordBatch( + next_batch, destination.directory, destination.prefix); + if (!has_room.is_finished()) { + // We don't have to worry about sequencing backpressure here since + // task_group_ serves as our sequencer. If batches continue to arrive after + // we pause they will queue up in task_group_ until we free up and call + // Resume + backpressure_control_->Pause(); + return has_room.Then([this] { backpressure_control_->Resume(); }); + } + return has_room; + }); + }); } std::shared_ptr custom_metadata_; @@ -413,9 +428,102 @@ Result MakeWriteNode(compute::ExecPlan* plan, return node; } +namespace { + +class TeeNode : public compute::MapNode { + public: + TeeNode(compute::ExecPlan* plan, std::vector inputs, + std::shared_ptr output_schema, + std::unique_ptr dataset_writer, + FileSystemDatasetWriteOptions write_options, bool async_mode) + : MapNode(plan, std::move(inputs), std::move(output_schema), async_mode), + dataset_writer_(std::move(dataset_writer)), + write_options_(std::move(write_options)) {} + + static Result Make(compute::ExecPlan* plan, + std::vector inputs, + const compute::ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TeeNode")); + + const WriteNodeOptions write_node_options = + checked_cast(options); + const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options; + const std::shared_ptr schema = inputs[0]->output_schema(); + + ARROW_ASSIGN_OR_RAISE(auto dataset_writer, + internal::DatasetWriter::Make(write_options)); + + return plan->EmplaceNode(plan, std::move(inputs), std::move(schema), + std::move(dataset_writer), std::move(write_options), + /*async_mode=*/true); + } + + const char* kind_name() const override { return "TeeNode"; } + + Result DoTee(const compute::ExecBatch& batch) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr record_batch, + batch.ToRecordBatch(output_schema())); + ARROW_RETURN_NOT_OK(WriteNextBatch(std::move(record_batch), batch.guarantee)); + return batch; + } + + Status WriteNextBatch(std::shared_ptr batch, + compute::Expression guarantee) { + return WriteBatch(batch, guarantee, write_options_, + [this](std::shared_ptr next_batch, + const Partitioning::PartitionPathFormat& destination) { + return task_group_.AddTask([this, next_batch, destination] { + util::tracing::Span span; + Future<> has_room = dataset_writer_->WriteRecordBatch( + next_batch, destination.directory, destination.prefix); + if (!has_room.is_finished()) { + this->Pause(); + return has_room.Then([this] { this->Resume(); }); + } + return has_room; + }); + }); + } + + void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override { + EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); + DCHECK_EQ(input, inputs_[0]); + auto func = [this](compute::ExecBatch batch) { + util::tracing::Span span; + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"tee", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); + auto result = DoTee(std::move(batch)); + MARK_SPAN(span, result.status()); + END_SPAN(span); + return result; + }; + this->SubmitTask(std::move(func), std::move(batch)); + } + + void Pause() { inputs_[0]->PauseProducing(this, ++backpressure_counter_); } + + void Resume() { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); } + + protected: + std::string ToStringExtra(int indent = 0) const override { + return "base_dir=" + write_options_.base_dir; + } + + private: + std::unique_ptr dataset_writer_; + FileSystemDatasetWriteOptions write_options_; + util::SerializedAsyncTaskGroup task_group_; + int32_t backpressure_counter_ = 0; +}; + +} // namespace + namespace internal { void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry* registry) { DCHECK_OK(registry->AddFactory("write", MakeWriteNode)); + DCHECK_OK(registry->AddFactory("tee", TeeNode::Make)); } } // namespace internal