Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ class ARROW_EXPORT ExecNode {
/// takes a batch in and returns a batch. This simple parallel runner also needs an
/// executor (use simple synchronous runner if there is no executor)

class MapNode : public ExecNode {
class ARROW_EXPORT MapNode : public ExecNode {
public:
MapNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
std::shared_ptr<Schema> output_schema, bool async_mode);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ ARROW_EXPORT
Result<std::shared_ptr<Table>> TableFromExecBatches(
const std::shared_ptr<Schema>& schema, const std::vector<ExecBatch>& exec_batches);

class AtomicCounter {
class ARROW_EXPORT AtomicCounter {
public:
AtomicCounter() = default;

Expand Down
176 changes: 142 additions & 34 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "arrow/util/map.h"
#include "arrow/util/string.h"
#include "arrow/util/task_group.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/variant.h"

namespace arrow {
Expand Down Expand Up @@ -269,6 +270,36 @@ Future<> FileWriter::Finish() {

namespace {

Status WriteBatch(std::shared_ptr<RecordBatch> batch, compute::Expression guarantee,
FileSystemDatasetWriteOptions write_options,
std::function<Status(std::shared_ptr<RecordBatch>,
const Partitioning::PartitionPathFormat&)>
write) {
ARROW_ASSIGN_OR_RAISE(auto groups, write_options.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory

if (write_options.max_partitions <= 0) {
return Status::Invalid("max_partitions must be positive (was ",
write_options.max_partitions, ")");
}

if (groups.batches.size() > static_cast<size_t>(write_options.max_partitions)) {
return Status::Invalid("Fragment would be written into ", groups.batches.size(),
" partitions. This exceeds the maximum of ",
write_options.max_partitions);
}

for (std::size_t index = 0; index < groups.batches.size(); index++) {
auto partition_expression = and_(groups.expressions[index], guarantee);
auto next_batch = groups.batches[index];
Partitioning::PartitionPathFormat destination;
ARROW_ASSIGN_OR_RAISE(destination,
write_options.partitioning->Format(partition_expression));
RETURN_NOT_OK(write(next_batch, destination));
}
return Status::OK();
}

class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
public:
DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
Expand Down Expand Up @@ -303,40 +334,24 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
private:
Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
compute::Expression guarantee) {
ARROW_ASSIGN_OR_RAISE(auto groups, write_options_.partitioning->Partition(batch));
batch.reset(); // drop to hopefully conserve memory

if (write_options_.max_partitions <= 0) {
return Status::Invalid("max_partitions must be positive (was ",
write_options_.max_partitions, ")");
}

if (groups.batches.size() > static_cast<size_t>(write_options_.max_partitions)) {
return Status::Invalid("Fragment would be written into ", groups.batches.size(),
" partitions. This exceeds the maximum of ",
write_options_.max_partitions);
}

for (std::size_t index = 0; index < groups.batches.size(); index++) {
auto partition_expression = and_(groups.expressions[index], guarantee);
auto next_batch = groups.batches[index];
Partitioning::PartitionPathFormat destination;
ARROW_ASSIGN_OR_RAISE(destination,
write_options_.partitioning->Format(partition_expression));
RETURN_NOT_OK(task_group_.AddTask([this, next_batch, destination] {
Future<> has_room = dataset_writer_->WriteRecordBatch(
next_batch, destination.directory, destination.prefix);
if (!has_room.is_finished()) {
// We don't have to worry about sequencing backpressure here since task_group_
// serves as our sequencer. If batches continue to arrive after we pause they
// will queue up in task_group_ until we free up and call Resume
backpressure_control_->Pause();
return has_room.Then([this] { backpressure_control_->Resume(); });
}
return has_room;
}));
}
return Status::OK();
return WriteBatch(
batch, guarantee, write_options_,
[this](std::shared_ptr<RecordBatch> next_batch,
const Partitioning::PartitionPathFormat& destination) {
return task_group_.AddTask([this, next_batch, destination] {
Future<> has_room = dataset_writer_->WriteRecordBatch(
next_batch, destination.directory, destination.prefix);
if (!has_room.is_finished()) {
// We don't have to worry about sequencing backpressure here since
// task_group_ serves as our sequencer. If batches continue to arrive after
// we pause they will queue up in task_group_ until we free up and call
// Resume
backpressure_control_->Pause();
return has_room.Then([this] { backpressure_control_->Resume(); });
}
return has_room;
});
});
}

std::shared_ptr<const KeyValueMetadata> custom_metadata_;
Expand Down Expand Up @@ -413,9 +428,102 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,
return node;
}

namespace {

class TeeNode : public compute::MapNode {
public:
TeeNode(compute::ExecPlan* plan, std::vector<compute::ExecNode*> inputs,
std::shared_ptr<Schema> output_schema,
std::unique_ptr<internal::DatasetWriter> dataset_writer,
FileSystemDatasetWriteOptions write_options, bool async_mode)
: MapNode(plan, std::move(inputs), std::move(output_schema), async_mode),
dataset_writer_(std::move(dataset_writer)),
write_options_(std::move(write_options)) {}

static Result<compute::ExecNode*> Make(compute::ExecPlan* plan,
std::vector<compute::ExecNode*> inputs,
const compute::ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TeeNode"));

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
const std::shared_ptr<Schema> schema = inputs[0]->output_schema();

ARROW_ASSIGN_OR_RAISE(auto dataset_writer,
internal::DatasetWriter::Make(write_options));

return plan->EmplaceNode<TeeNode>(plan, std::move(inputs), std::move(schema),
std::move(dataset_writer), std::move(write_options),
/*async_mode=*/true);
}

const char* kind_name() const override { return "TeeNode"; }

Result<compute::ExecBatch> DoTee(const compute::ExecBatch& batch) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
batch.ToRecordBatch(output_schema()));
ARROW_RETURN_NOT_OK(WriteNextBatch(std::move(record_batch), batch.guarantee));
return batch;
}

Status WriteNextBatch(std::shared_ptr<RecordBatch> batch,
compute::Expression guarantee) {
return WriteBatch(batch, guarantee, write_options_,
[this](std::shared_ptr<RecordBatch> next_batch,
const Partitioning::PartitionPathFormat& destination) {
return task_group_.AddTask([this, next_batch, destination] {
util::tracing::Span span;
Future<> has_room = dataset_writer_->WriteRecordBatch(
next_batch, destination.directory, destination.prefix);
if (!has_room.is_finished()) {
this->Pause();
return has_room.Then([this] { this->Resume(); });
}
return has_room;
});
});
}

void InputReceived(compute::ExecNode* input, compute::ExecBatch batch) override {
EVENT(span_, "InputReceived", {{"batch.length", batch.length}});
DCHECK_EQ(input, inputs_[0]);
auto func = [this](compute::ExecBatch batch) {
util::tracing::Span span;
START_SPAN_WITH_PARENT(span, span_, "InputReceived",
{{"tee", ToStringExtra()},
{"node.label", label()},
{"batch.length", batch.length}});
auto result = DoTee(std::move(batch));
MARK_SPAN(span, result.status());
END_SPAN(span);
return result;
};
this->SubmitTask(std::move(func), std::move(batch));
}

void Pause() { inputs_[0]->PauseProducing(this, ++backpressure_counter_); }

void Resume() { inputs_[0]->ResumeProducing(this, ++backpressure_counter_); }

protected:
std::string ToStringExtra(int indent = 0) const override {
return "base_dir=" + write_options_.base_dir;
}

private:
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
FileSystemDatasetWriteOptions write_options_;
util::SerializedAsyncTaskGroup task_group_;
int32_t backpressure_counter_ = 0;
};

} // namespace

namespace internal {
void InitializeDatasetWriter(arrow::compute::ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("write", MakeWriteNode));
DCHECK_OK(registry->AddFactory("tee", TeeNode::Make));
}
} // namespace internal

Expand Down