Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/exec/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
#include "arrow/util/visibility.h"

namespace arrow {

namespace internal {

class Executor;

} // namespace internal

namespace compute {

using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
Expand Down Expand Up @@ -77,6 +85,49 @@ class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
int64_t max_batch_size;
};

/// \brief An extended Source node which accepts a schema
///
/// ItMaker is a maker of an iterator of tabular data.
template <typename ItMaker>
class ARROW_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
public:
SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
arrow::internal::Executor* io_executor = NULLPTR)
: schema(schema), it_maker(std::move(it_maker)), io_executor(io_executor) {}

/// \brief The schema of the record batches from the iterator
std::shared_ptr<Schema> schema;

/// \brief A maker of an iterator which acts as the data source
ItMaker it_maker;

/// \brief The executor to use for scanning the iterator
///
/// Defaults to the default I/O executor.
arrow::internal::Executor* io_executor;
};

using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
/// \brief An extended Source node which accepts a schema and array-vectors
class ARROW_EXPORT ArrayVectorSourceNodeOptions
: public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
/// \brief An extended Source node which accepts a schema and exec-batches
class ARROW_EXPORT ExecBatchSourceNodeOptions
: public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
/// \brief An extended Source node which accepts a schema and record-batches
class ARROW_EXPORT RecordBatchSourceNodeOptions
: public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief Make a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
Expand Down
79 changes: 79 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,85 @@ TEST(ExecPlanExecution, TableSourceSinkError) {
Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
}

template <typename ElementType, typename OptionsType>
void TestSourceSinkError(
std::string source_factory_name,
std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
to_elements) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
std::shared_ptr<Schema> no_schema;

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
auto element_it_maker = [&elements]() {
return MakeVectorIterator<ElementType>(elements);
};

auto null_executor_options = OptionsType{exp_batches.schema, element_it_maker};
ASSERT_OK(MakeExecNode(source_factory_name, plan.get(), {}, null_executor_options));

auto null_schema_options = OptionsType{no_schema, element_it_maker};
ASSERT_THAT(MakeExecNode(source_factory_name, plan.get(), {}, null_schema_options),
Raises(StatusCode::Invalid, HasSubstr("not null")));
}

template <typename ElementType, typename OptionsType>
void TestSourceSink(
std::string source_factory_name,
std::function<Result<std::vector<ElementType>>(const BatchesWithSchema&)>
to_elements) {
ASSERT_OK_AND_ASSIGN(auto io_executor, arrow::internal::ThreadPool::Make(1));
ExecContext exec_context(default_memory_pool(), io_executor.get());
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make(&exec_context));
AsyncGenerator<std::optional<ExecBatch>> sink_gen;

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(auto elements, to_elements(exp_batches));
auto element_it_maker = [&elements]() {
return MakeVectorIterator<ElementType>(elements);
};

ASSERT_OK(Declaration::Sequence({
{source_factory_name,
OptionsType{exp_batches.schema, element_it_maker}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
Finishes(ResultWith(UnorderedElementsAreArray(exp_batches.batches))));
}

TEST(ExecPlanExecution, ArrayVectorSourceSink) {
TestSourceSink<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>(
"array_vector_source", ToArrayVectors);
}

TEST(ExecPlanExecution, ArrayVectorSourceSinkError) {
TestSourceSinkError<std::shared_ptr<ArrayVector>, ArrayVectorSourceNodeOptions>(
"array_vector_source", ToArrayVectors);
}

TEST(ExecPlanExecution, ExecBatchSourceSink) {
TestSourceSink<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>(
"exec_batch_source", ToExecBatches);
}

TEST(ExecPlanExecution, ExecBatchSourceSinkError) {
TestSourceSinkError<std::shared_ptr<ExecBatch>, ExecBatchSourceNodeOptions>(
"exec_batch_source", ToExecBatches);
}

TEST(ExecPlanExecution, RecordBatchSourceSink) {
TestSourceSink<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>(
"record_batch_source", ToRecordBatches);
}

TEST(ExecPlanExecution, RecordBatchSourceSinkError) {
TestSourceSinkError<std::shared_ptr<RecordBatch>, RecordBatchSourceNodeOptions>(
"record_batch_source", ToRecordBatches);
}

TEST(ExecPlanExecution, SinkNodeBackpressure) {
std::optional<ExecBatch> batch =
ExecBatchFromJSON({int32(), boolean()},
Expand Down
136 changes: 136 additions & 0 deletions cpp/src/arrow/compute/exec/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
#include "arrow/datum.h"
#include "arrow/io/util_internal.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
Expand Down Expand Up @@ -293,13 +294,148 @@ struct TableSourceNode : public SourceNode {
}
};

template <typename This, typename Options>
struct SchemaSourceNode : public SourceNode {
SchemaSourceNode(ExecPlan* plan, std::shared_ptr<Schema> schema,
arrow::AsyncGenerator<std::optional<ExecBatch>> generator)
: SourceNode(plan, schema, generator) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, This::kKindName));
const auto& cast_options = checked_cast<const Options&>(options);
auto& it_maker = cast_options.it_maker;
auto& schema = cast_options.schema;
auto io_executor = cast_options.io_executor;

if (io_executor == NULLPTR) {
io_executor = plan->exec_context()->executor();
}
auto it = it_maker();

if (schema == NULLPTR) {
return Status::Invalid(This::kKindName, " requires schema which is not null");
}
if (io_executor == NULLPTR) {
io_executor = io::internal::GetIOThreadPool();
}

ARROW_ASSIGN_OR_RAISE(auto generator, This::MakeGenerator(it, io_executor, schema));
return plan->EmplaceNode<This>(plan, schema, generator);
}
};

struct RecordBatchSourceNode
: public SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions> {
using RecordBatchSchemaSourceNode =
SchemaSourceNode<RecordBatchSourceNode, RecordBatchSourceNodeOptions>;

using RecordBatchSchemaSourceNode::RecordBatchSchemaSourceNode;

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
return RecordBatchSchemaSourceNode::Make(plan, inputs, options);
}

const char* kind_name() const override { return kKindName; }

static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
Iterator<std::shared_ptr<RecordBatch>>& batch_it,
arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
auto to_exec_batch =
[schema](const std::shared_ptr<RecordBatch>& batch) -> std::optional<ExecBatch> {
if (batch == NULLPTR || *batch->schema() != *schema) {
return std::nullopt;
}
return std::optional<ExecBatch>(ExecBatch(*batch));
};
auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
}

static const char kKindName[];
};

const char RecordBatchSourceNode::kKindName[] = "RecordBatchSourceNode";

struct ExecBatchSourceNode
: public SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions> {
using ExecBatchSchemaSourceNode =
SchemaSourceNode<ExecBatchSourceNode, ExecBatchSourceNodeOptions>;

using ExecBatchSchemaSourceNode::ExecBatchSchemaSourceNode;

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
return ExecBatchSchemaSourceNode::Make(plan, inputs, options);
}

const char* kind_name() const override { return kKindName; }

static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
Iterator<std::shared_ptr<ExecBatch>>& batch_it,
arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
auto to_exec_batch =
[](const std::shared_ptr<ExecBatch>& batch) -> std::optional<ExecBatch> {
return batch == NULLPTR ? std::nullopt : std::optional<ExecBatch>(*batch);
};
auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(batch_it));
return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
}

static const char kKindName[];
};

const char ExecBatchSourceNode::kKindName[] = "ExecBatchSourceNode";

struct ArrayVectorSourceNode
: public SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions> {
using ArrayVectorSchemaSourceNode =
SchemaSourceNode<ArrayVectorSourceNode, ArrayVectorSourceNodeOptions>;

using ArrayVectorSchemaSourceNode::ArrayVectorSchemaSourceNode;

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
return ArrayVectorSchemaSourceNode::Make(plan, inputs, options);
}

const char* kind_name() const override { return kKindName; }

static Result<arrow::AsyncGenerator<std::optional<ExecBatch>>> MakeGenerator(
Iterator<std::shared_ptr<ArrayVector>>& arrayvec_it,
arrow::internal::Executor* io_executor, const std::shared_ptr<Schema>& schema) {
auto to_exec_batch =
[](const std::shared_ptr<ArrayVector>& arrayvec) -> std::optional<ExecBatch> {
if (arrayvec == NULLPTR || arrayvec->size() == 0) {
return std::nullopt;
}
std::vector<Datum> datumvec;
for (const auto& array : *arrayvec) {
datumvec.push_back(Datum(array));
}
return std::optional<ExecBatch>(
ExecBatch(std::move(datumvec), (*arrayvec)[0]->length()));
};
auto exec_batch_it = MakeMapIterator(to_exec_batch, std::move(arrayvec_it));
return MakeBackgroundGenerator(std::move(exec_batch_it), io_executor);
}

static const char kKindName[];
};

const char ArrayVectorSourceNode::kKindName[] = "ArrayVectorSourceNode";

} // namespace

namespace internal {

void RegisterSourceNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
DCHECK_OK(registry->AddFactory("record_batch_source", RecordBatchSourceNode::Make));
DCHECK_OK(registry->AddFactory("exec_batch_source", ExecBatchSourceNode::Make));
DCHECK_OK(registry->AddFactory("array_vector_source", ArrayVectorSourceNode::Make));
}

} // namespace internal
Expand Down
32 changes: 32 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,38 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
return out_batches;
}

Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
const BatchesWithSchema& batches_with_schema) {
std::vector<std::shared_ptr<ArrayVector>> arrayvecs;
for (auto batch : batches_with_schema.batches) {
ARROW_ASSIGN_OR_RAISE(auto record_batch,
batch.ToRecordBatch(batches_with_schema.schema));
arrayvecs.push_back(std::make_shared<ArrayVector>(record_batch->columns()));
}
return arrayvecs;
}

Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
const BatchesWithSchema& batches_with_schema) {
std::vector<std::shared_ptr<ExecBatch>> exec_batches;
for (auto batch : batches_with_schema.batches) {
auto exec_batch = std::make_shared<ExecBatch>(batch);
exec_batches.push_back(exec_batch);
}
return exec_batches;
}

Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const BatchesWithSchema& batches_with_schema) {
std::vector<std::shared_ptr<RecordBatch>> record_batches;
for (auto batch : batches_with_schema.batches) {
ARROW_ASSIGN_OR_RAISE(auto record_batch,
batch.ToRecordBatch(batches_with_schema.schema));
record_batches.push_back(record_batch);
}
return record_batches;
}

Result<std::shared_ptr<Table>> SortTableOnAllFields(const std::shared_ptr<Table>& tab) {
std::vector<SortKey> sort_keys;
for (auto&& f : tab->schema()->fields()) {
Expand Down
24 changes: 24 additions & 0 deletions cpp/src/arrow/compute/exec/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,30 @@ BatchesWithSchema MakeBatchesFromString(const std::shared_ptr<Schema>& schema,
const std::vector<std::string_view>& json_strings,
int multiplicity = 1);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
const BatchesWithSchema& batches_with_schema);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
const BatchesWithSchema& batches);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const BatchesWithSchema& batches);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<ArrayVector>>> ToArrayVectors(
const BatchesWithSchema& batches_with_schema);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<ExecBatch>>> ToExecBatches(
const BatchesWithSchema& batches);

ARROW_TESTING_EXPORT
Result<std::vector<std::shared_ptr<RecordBatch>>> ToRecordBatches(
const BatchesWithSchema& batches);

ARROW_TESTING_EXPORT
Result<std::shared_ptr<Table>> SortTableOnAllFields(const std::shared_ptr<Table>& tab);

Expand Down