Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,20 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::function<Future<util::optional<ExecBatch>>()> generator;
};

/// \brief An extended Source node which accepts a table
class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
public:
TableSourceNodeOptions(std::shared_ptr<Table> table, int64_t batch_size)
: table(table), batch_size(batch_size) {}

// arrow table which acts as the data source
std::shared_ptr<Table> table;
// Size of batches to emit from this node
// If the table is larger the node will emit multiple batches from the
// the table to be processed in parallel.
int64_t batch_size;
};

/// \brief Make a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
Expand Down
39 changes: 39 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,45 @@ TEST(ExecPlanExecution, SourceSink) {
}
}

TEST(ExecPlanExecution, TableSourceSink) {
for (int batch_size : {1, 4}) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(auto table,
TableFromExecBatches(exp_batches.schema, exp_batches.batches));

ASSERT_OK(Declaration::Sequence(
{
{"table_source", TableSourceNodeOptions{table, batch_size}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_FINISHES_OK_AND_ASSIGN(auto res, StartAndCollect(plan.get(), sink_gen));
ASSERT_OK_AND_ASSIGN(auto out_table, TableFromExecBatches(exp_batches.schema, res));
AssertTablesEqual(table, out_table);
}
}

TEST(ExecPlanExecution, TableSourceSinkError) {
ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

auto exp_batches = MakeBasicBatches();
ASSERT_OK_AND_ASSIGN(auto table,
TableFromExecBatches(exp_batches.schema, exp_batches.batches));

auto null_table_options = TableSourceNodeOptions{NULLPTR, 1};
ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, null_table_options),
Raises(StatusCode::Invalid, HasSubstr("not null")));

auto negative_batch_size_options = TableSourceNodeOptions{table, -1};
ASSERT_THAT(MakeExecNode("table_source", plan.get(), {}, negative_batch_size_options),
Raises(StatusCode::Invalid, HasSubstr("batch_size > 0")));
}

TEST(ExecPlanExecution, SinkNodeBackpressure) {
constexpr uint32_t kPauseIfAbove = 4;
constexpr uint32_t kResumeIfBelow = 2;
Expand Down
71 changes: 71 additions & 0 deletions cpp/src/arrow/compute/exec/source_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "arrow/compute/exec_internal.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
#include "arrow/util/checked_cast.h"
Expand All @@ -34,10 +35,12 @@
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"
#include "arrow/util/unreachable.h"
#include "arrow/util/vector.h"

namespace arrow {

using internal::checked_cast;
using internal::MapVector;

namespace compute {
namespace {
Expand Down Expand Up @@ -174,12 +177,80 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};

struct TableSourceNode : public SourceNode {
TableSourceNode(ExecPlan* plan, std::shared_ptr<Table> table, int64_t batch_size)
: SourceNode(plan, table->schema(), TableGenerator(*table, batch_size)) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
const auto& table = table_options.table;
const int64_t batch_size = table_options.batch_size;

RETURN_NOT_OK(ValidateTableSourceNodeInput(table, batch_size));

return plan->EmplaceNode<TableSourceNode>(plan, table, batch_size);
}

const char* kind_name() const override { return "TableSourceNode"; }

static arrow::Status ValidateTableSourceNodeInput(const std::shared_ptr<Table> table,
const int64_t batch_size) {
if (table == nullptr) {
return Status::Invalid("TableSourceNode node requires table which is not null");
}

if (batch_size <= 0) {
return Status::Invalid(
"TableSourceNode node requires, batch_size > 0 , but got batch size ",
batch_size);
}

return Status::OK();
}

static arrow::AsyncGenerator<util::optional<ExecBatch>> TableGenerator(
const Table& table, const int64_t batch_size) {
auto batches = ConvertTableToExecBatches(table, batch_size);
auto opt_batches =
MapVector([](ExecBatch batch) { return util::make_optional(std::move(batch)); },
std::move(batches));
AsyncGenerator<util::optional<ExecBatch>> gen;
gen = MakeVectorGenerator(std::move(opt_batches));
return gen;
}

static std::vector<ExecBatch> ConvertTableToExecBatches(const Table& table,
const int64_t batch_size) {
std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);

// setting chunksize for the batch reader
reader->set_chunksize(batch_size);

std::shared_ptr<RecordBatch> batch;
std::vector<ExecBatch> exec_batches;
while (true) {
auto batch_res = reader->Next();
if (batch_res.ok()) {
batch = std::move(batch_res).MoveValueUnsafe();
}
if (batch == NULLPTR) {
break;
}
exec_batches.emplace_back(*batch);
}
return exec_batches;
}
};

} // namespace

namespace internal {

void RegisterSourceNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make));
}

} // namespace internal
Expand Down