Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cpp/examples/arrow/engine_substrait_consumption.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ class IgnoringConsumer : public cp::SinkNodeConsumer {
public:
explicit IgnoringConsumer(size_t tag) : tag_{tag} {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
return arrow::Status::OK();
}

arrow::Status Consume(cp::ExecBatch batch) override {
// Consume a batch of data
// (just print its row count to stdout)
Expand Down
6 changes: 5 additions & 1 deletion cpp/examples/arrow/execution_plan_documentation_examples.cc
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,10 @@ arrow::Status SourceConsumingSinkExample(cp::ExecContext& exec_context) {
CustomSinkNodeConsumer(std::atomic<uint32_t>* batches_seen, arrow::Future<> finish)
: batches_seen(batches_seen), finish(std::move(finish)) {}

arrow::Status Init(const std::shared_ptr<arrow::Schema>& schema) override {
return arrow::Status::OK();
}

arrow::Status Consume(cp::ExecBatch batch) override {
(*batches_seen)++;
return arrow::Status::OK();
Expand Down Expand Up @@ -794,7 +798,7 @@ arrow::Status ScanFilterWriteExample(cp::ExecContext& exec_context,
write_options.partitioning = partitioning;
write_options.basename_template = "part{i}.parquet";

arrow::dataset::WriteNodeOptions write_node_options{write_options, dataset->schema()};
arrow::dataset::WriteNodeOptions write_node_options{write_options};

ARROW_RETURN_NOT_OK(cp::MakeExecNode("write", plan.get(), {scan}, write_node_options));

Expand Down
12 changes: 8 additions & 4 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ class ARROW_EXPORT SinkNodeOptions : public ExecNodeOptions {
class ARROW_EXPORT SinkNodeConsumer {
public:
virtual ~SinkNodeConsumer() = default;
/// \brief Prepare any consumer state
///
/// This will be run once the schema is finalized as the plan is starting and
/// before any calls to Consume. A common use is to save off the schema so that
/// batches can be interpreted.
virtual Status Init(const std::shared_ptr<Schema>& schema) = 0;
/// \brief Consume a batch of data
virtual Status Consume(ExecBatch batch) = 0;
/// \brief Signal to the consumer that the last batch has been delivered
Expand Down Expand Up @@ -308,12 +314,10 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
/// a table pointer.
class ARROW_EXPORT TableSinkNodeOptions : public ExecNodeOptions {
public:
TableSinkNodeOptions(std::shared_ptr<Table>* output_table,
std::shared_ptr<Schema> output_schema)
: output_table(output_table), output_schema(std::move(output_schema)) {}
explicit TableSinkNodeOptions(std::shared_ptr<Table>* output_table)
: output_table(output_table) {}

std::shared_ptr<Table>* output_table;
std::shared_ptr<Schema> output_schema;
};

} // namespace compute
Expand Down
52 changes: 24 additions & 28 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,10 @@ TEST(ExecPlanExecution, SourceConsumingSink) {
TestConsumer(std::atomic<uint32_t>* batches_seen, Future<> finish)
: batches_seen(batches_seen), finish(std::move(finish)) {}

Status Init(const std::shared_ptr<Schema>& schema) override {
return Status::OK();
}

Status Consume(ExecBatch batch) override {
(*batches_seen)++;
return Status::OK();
Expand Down Expand Up @@ -539,7 +543,7 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {

auto basic_data = MakeBasicBatches();

TableSinkNodeOptions options{&out, basic_data.schema};
TableSinkNodeOptions options{&out};

ASSERT_OK_AND_ASSIGN(
auto source, MakeExecNode("source", plan.get(), {},
Expand All @@ -560,16 +564,26 @@ TEST(ExecPlanExecution, SourceTableConsumingSink) {
}

TEST(ExecPlanExecution, ConsumingSinkError) {
struct InitErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema) override {
return Status::Invalid("XYZ");
}
Status Consume(ExecBatch batch) override { return Status::OK(); }
Future<> Finish() override { return Future<>::MakeFinished(); }
};
struct ConsumeErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
Status Consume(ExecBatch batch) override { return Status::Invalid("XYZ"); }
Future<> Finish() override { return Future<>::MakeFinished(); }
};
struct FinishErrorConsumer : public SinkNodeConsumer {
Status Init(const std::shared_ptr<Schema>& schema) override { return Status::OK(); }
Status Consume(ExecBatch batch) override { return Status::OK(); }
Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
};
std::vector<std::shared_ptr<SinkNodeConsumer>> consumers{
std::make_shared<ConsumeErrorConsumer>(), std::make_shared<FinishErrorConsumer>()};
std::make_shared<InitErrorConsumer>(), std::make_shared<ConsumeErrorConsumer>(),
std::make_shared<FinishErrorConsumer>()};

for (auto& consumer : consumers) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
Expand All @@ -585,35 +599,17 @@ TEST(ExecPlanExecution, ConsumingSinkError) {
SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
ConsumingSinkNodeOptions(consumer)));
ASSERT_OK(plan->StartProducing());
ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
// If we fail at init we see it during StartProducing. Other
// failures are not seen until we start running.
if (std::dynamic_pointer_cast<InitErrorConsumer>(consumer)) {
ASSERT_RAISES(Invalid, plan->StartProducing());
} else {
ASSERT_OK(plan->StartProducing());
ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}
}
}

TEST(ExecPlanExecution, ConsumingSinkErrorFinish) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
struct FinishErrorConsumer : public SinkNodeConsumer {
Status Consume(ExecBatch batch) override { return Status::OK(); }
Future<> Finish() override { return Future<>::MakeFinished(Status::Invalid("XYZ")); }
};
std::shared_ptr<FinishErrorConsumer> consumer = std::make_shared<FinishErrorConsumer>();

auto basic_data = MakeBasicBatches();
ASSERT_OK(
Declaration::Sequence(
{{"source", SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))},
{"consuming_sink", ConsumingSinkNodeOptions(consumer)}})
.AddToPlan(plan.get()));
ASSERT_OK_AND_ASSIGN(
auto source,
MakeExecNode("source", plan.get(), {},
SourceNodeOptions(basic_data.schema, basic_data.gen(false, false))));
ASSERT_OK(MakeExecNode("consuming_sink", plan.get(), {source},
ConsumingSinkNodeOptions(consumer)));
ASSERT_OK(plan->StartProducing());
ASSERT_FINISHES_AND_RAISES(Invalid, plan->finished());
}

TEST(ExecPlanExecution, StressSourceSink) {
for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");
Expand Down
20 changes: 13 additions & 7 deletions cpp/src/arrow/compute/exec/sink_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ class ConsumingSinkNode : public ExecNode {
{{"node.label", label()},
{"node.detail", ToString()},
{"node.kind", kind_name()}});
DCHECK_GT(inputs_.size(), 0);
RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema()));
finished_ = Future<>::Make();
END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this);
return Status::OK();
Expand Down Expand Up @@ -268,13 +270,17 @@ class ConsumingSinkNode : public ExecNode {

struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {
public:
TableSinkNodeConsumer(std::shared_ptr<Table>* out,
std::shared_ptr<Schema> output_schema, MemoryPool* pool)
: out_(out), output_schema_(std::move(output_schema)), pool_(pool) {}
TableSinkNodeConsumer(std::shared_ptr<Table>* out, MemoryPool* pool)
: out_(out), pool_(pool) {}

Status Init(const std::shared_ptr<Schema>& schema) override {
schema_ = schema;
return Status::OK();
}

Status Consume(ExecBatch batch) override {
std::lock_guard<std::mutex> guard(consume_mutex_);
ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(output_schema_, pool_));
ARROW_ASSIGN_OR_RAISE(auto rb, batch.ToRecordBatch(schema_, pool_));
batches_.push_back(rb);
return Status::OK();
}
Expand All @@ -286,8 +292,8 @@ struct TableSinkNodeConsumer : public arrow::compute::SinkNodeConsumer {

private:
std::shared_ptr<Table>* out_;
std::shared_ptr<Schema> output_schema_;
MemoryPool* pool_;
std::shared_ptr<Schema> schema_;
std::vector<std::shared_ptr<RecordBatch>> batches_;
std::mutex consume_mutex_;
};
Expand All @@ -298,8 +304,8 @@ static Result<ExecNode*> MakeTableConsumingSinkNode(
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 1, "TableConsumingSinkNode"));
const auto& sink_options = checked_cast<const TableSinkNodeOptions&>(options);
MemoryPool* pool = plan->exec_context()->memory_pool();
auto tb_consumer = std::make_shared<TableSinkNodeConsumer>(
sink_options.output_table, sink_options.output_schema, pool);
auto tb_consumer =
std::make_shared<TableSinkNodeConsumer>(sink_options.output_table, pool);
auto consuming_sink_node_options = ConsumingSinkNodeOptions{tb_consumer};
return MakeExecNode("consuming_sink", plan, inputs, consuming_sink_node_options);
}
Expand Down
32 changes: 23 additions & 9 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -271,22 +271,31 @@ namespace {

class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
public:
DatasetWritingSinkNodeConsumer(std::shared_ptr<Schema> schema,
DatasetWritingSinkNodeConsumer(std::shared_ptr<const KeyValueMetadata> custom_metadata,
std::unique_ptr<internal::DatasetWriter> dataset_writer,
FileSystemDatasetWriteOptions write_options,
std::shared_ptr<util::AsyncToggle> backpressure_toggle)
: schema_(std::move(schema)),
: custom_metadata_(std::move(custom_metadata)),
dataset_writer_(std::move(dataset_writer)),
write_options_(std::move(write_options)),
backpressure_toggle_(std::move(backpressure_toggle)) {}

Status Consume(compute::ExecBatch batch) {
Status Init(const std::shared_ptr<Schema>& schema) override {
if (custom_metadata_) {
schema_ = schema->WithMetadata(custom_metadata_);
} else {
schema_ = schema;
}
return Status::OK();
}

Status Consume(compute::ExecBatch batch) override {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<RecordBatch> record_batch,
batch.ToRecordBatch(schema_));
return WriteNextBatch(std::move(record_batch), batch.guarantee);
}

Future<> Finish() {
Future<> Finish() override {
RETURN_NOT_OK(task_group_.AddTask([this] { return dataset_writer_->Finish(); }));
return task_group_.End();
}
Expand Down Expand Up @@ -327,11 +336,12 @@ class DatasetWritingSinkNodeConsumer : public compute::SinkNodeConsumer {
return Status::OK();
}

std::shared_ptr<Schema> schema_;
std::shared_ptr<const KeyValueMetadata> custom_metadata_;
std::unique_ptr<internal::DatasetWriter> dataset_writer_;
FileSystemDatasetWriteOptions write_options_;
std::shared_ptr<util::AsyncToggle> backpressure_toggle_;
util::SerializedAsyncTaskGroup task_group_;
std::shared_ptr<Schema> schema_ = nullptr;
};

} // namespace
Expand All @@ -354,6 +364,10 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
std::shared_ptr<util::AsyncToggle> backpressure_toggle =
std::make_shared<util::AsyncToggle>();

// The projected_schema is currently used by pyarrow to preserve the custom metadata
// when reading from a single input file.
const auto& custom_metadata = scanner->options()->projected_schema->metadata();

RETURN_NOT_OK(
compute::Declaration::Sequence(
{
Expand All @@ -362,8 +376,7 @@ Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_optio
{"project",
compute::ProjectNodeOptions{std::move(exprs), std::move(names)}},
{"write",
WriteNodeOptions{write_options, scanner->options()->projected_schema,
backpressure_toggle}},
WriteNodeOptions{write_options, custom_metadata, backpressure_toggle}},
})
.AddToPlan(plan.get()));

Expand All @@ -381,8 +394,9 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,

const WriteNodeOptions write_node_options =
checked_cast<const WriteNodeOptions&>(options);
const std::shared_ptr<const KeyValueMetadata>& custom_metadata =
write_node_options.custom_metadata;
const FileSystemDatasetWriteOptions& write_options = write_node_options.write_options;
const std::shared_ptr<Schema>& schema = write_node_options.schema;
const std::shared_ptr<util::AsyncToggle>& backpressure_toggle =
write_node_options.backpressure_toggle;

Expand All @@ -391,7 +405,7 @@ Result<compute::ExecNode*> MakeWriteNode(compute::ExecPlan* plan,

std::shared_ptr<DatasetWritingSinkNodeConsumer> consumer =
std::make_shared<DatasetWritingSinkNodeConsumer>(
schema, std::move(dataset_writer), write_options, backpressure_toggle);
custom_metadata, std::move(dataset_writer), write_options, backpressure_toggle);

ARROW_ASSIGN_OR_RAISE(
auto node,
Expand Down
10 changes: 7 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -408,14 +408,18 @@ struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
class ARROW_DS_EXPORT WriteNodeOptions : public compute::ExecNodeOptions {
public:
explicit WriteNodeOptions(
FileSystemDatasetWriteOptions options, std::shared_ptr<Schema> schema,
FileSystemDatasetWriteOptions options,
std::shared_ptr<const KeyValueMetadata> custom_metadata = NULLPTR,
std::shared_ptr<util::AsyncToggle> backpressure_toggle = NULLPTR)
: write_options(std::move(options)),
schema(std::move(schema)),
custom_metadata(std::move(custom_metadata)),
backpressure_toggle(std::move(backpressure_toggle)) {}

/// \brief Options to control how to write the dataset
FileSystemDatasetWriteOptions write_options;
std::shared_ptr<Schema> schema;
/// \brief Optional metadata to attach to written batches
std::shared_ptr<const KeyValueMetadata> custom_metadata;
/// \brief Optional toggle that can be used to pause producers when the node is full
std::shared_ptr<util::AsyncToggle> backpressure_toggle;
};

Expand Down
5 changes: 3 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ ExecNode <- R6Class("ExecNode",
)
)

do_exec_plan_substrait <- function(substrait_plan, output_names) {
do_exec_plan_substrait <- function(substrait_plan) {
if (is.string(substrait_plan)) {
substrait_plan <- engine__internal__SubstraitFromJSON(substrait_plan)
} else if (is.raw(substrait_plan)) {
Expand All @@ -315,5 +315,5 @@ do_exec_plan_substrait <- function(substrait_plan, output_names) {
}

plan <- ExecPlan$create()
ExecPlan_run_substrait(plan, substrait_plan, output_names)
ExecPlan_run_substrait(plan, substrait_plan)
}
23 changes: 11 additions & 12 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading