1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
@@ -394,6 +394,7 @@ if(ARROW_COMPUTE)
compute/exec/sink_node.cc
compute/exec/source_node.cc
compute/exec/task_util.cc
compute/exec/table_source_node.cc
compute/exec/union_node.cc
compute/exec/util.cc
compute/function.cc
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
@@ -509,6 +509,7 @@ void RegisterUnionNode(ExecFactoryRegistry*);
void RegisterAggregateNode(ExecFactoryRegistry*);
void RegisterSinkNode(ExecFactoryRegistry*);
void RegisterHashJoinNode(ExecFactoryRegistry*);
void RegisterTableSourceNode(ExecFactoryRegistry*);

} // namespace internal

@@ -523,6 +524,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
internal::RegisterAggregateNode(this);
internal::RegisterSinkNode(this);
internal::RegisterHashJoinNode(this);
internal::RegisterTableSourceNode(this);
}

Result<Factory> GetFactory(const std::string& factory_name) override {
11 changes: 11 additions & 0 deletions cpp/src/arrow/compute/exec/options.h
@@ -52,6 +52,17 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
std::function<Future<util::optional<ExecBatch>>()> generator;
};

/// \brief Adapt a Table as a source node
///
/// plan->exec_context()->executor() will be used to parallelize pushing to
/// outputs, if provided.
class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
public:
  TableSourceNodeOptions(std::shared_ptr<Table> table) : table(std::move(table)) {}

std::shared_ptr<Table> table;
};

/// \brief Make a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
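For context, a hedged usage sketch of the options added above: the node is created by the factory name "table" registered later in this patch. MakeExecNode is the existing helper declared in exec_plan.h; `plan` and `table` are assumed to already exist.

// Sketch: construct the new source node through the factory registry.
// Assumes `plan` is an ExecPlan* and `table` a non-null
// std::shared_ptr<Table>; the node adopts the table's schema.
ARROW_ASSIGN_OR_RAISE(
    compute::ExecNode* source,
    compute::MakeExecNode("table", plan, /*inputs=*/{},
                          compute::TableSourceNodeOptions{table}));

A Declaration-based equivalent appears in the plan_test.cc change below.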
26 changes: 26 additions & 0 deletions cpp/src/arrow/compute/exec/plan_test.cc
@@ -238,6 +238,32 @@ TEST(ExecPlanExecution, SourceSink) {
}
}

TEST(ExecPlanExecution, TableSourceSink) {
for (bool slow : {false, true}) {
SCOPED_TRACE(slow ? "slowed" : "unslowed");

for (bool parallel : {false, true}) {
SCOPED_TRACE(parallel ? "parallel" : "single threaded");

ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
AsyncGenerator<util::optional<ExecBatch>> sink_gen;

      auto exec_batches = MakeBasicBatches();
      ASSERT_OK_AND_ASSIGN(auto table, TableFromExecBatches(exec_batches.schema,
                                                            exec_batches.batches));

ASSERT_OK(Declaration::Sequence(
{
{"table", TableNodeOptions{table}},
{"sink", SinkNodeOptions{&sink_gen}},
})
.AddToPlan(plan.get()));

ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
                  Finishes(ResultWith(UnorderedElementsAreArray(exec_batches.batches))));
}
}
}

TEST(ExecPlanExecution, SinkNodeBackpressure) {
constexpr uint32_t kPauseIfAbove = 4;
constexpr uint32_t kResumeIfBelow = 2;
66 changes: 66 additions & 0 deletions cpp/src/arrow/compute/exec/source_node.cc
@@ -25,6 +25,7 @@
#include "arrow/compute/exec_internal.h"
#include "arrow/datum.h"
#include "arrow/result.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
#include "arrow/util/checked_cast.h"
@@ -33,10 +34,12 @@
#include "arrow/util/optional.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/unreachable.h"
#include "arrow/util/vector.h"

namespace arrow {

using internal::checked_cast;
using internal::MapVector;

namespace compute {
namespace {
@@ -169,6 +172,65 @@ struct SourceNode : ExecNode {
AsyncGenerator<util::optional<ExecBatch>> generator_;
};

struct TableSourceNode : public SourceNode {
TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
std::shared_ptr<Table> table)
      : SourceNode(plan, output_schema,
                   generator(ConvertTableToExecBatches(*table).ValueOrDie())) {}

static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
const ExecNodeOptions& options) {
RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
table_options.table);
}
const char* kind_name() const override { return "TableSourceNode"; }

[[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
SourceNode::InputReceived(input, batch);
}
[[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
SourceNode::ErrorReceived(input, status);
}
[[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
SourceNode::InputFinished(input, total_batches);
}

Status StartProducing() override { return SourceNode::StartProducing(); }

void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }

void StopProducing() override { SourceNode::StopProducing(); }

Future<> finished() override { return SourceNode::finished(); }

  static arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
      std::vector<ExecBatch> batches) {
auto opt_batches = MapVector(
[](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
    return MakeVectorGenerator(std::move(opt_batches));
}

  static arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(
      const Table& table) {
    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
    std::shared_ptr<arrow::RecordBatch> batch;
    std::vector<ExecBatch> exec_batches;
while (true) {
ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
if (batch == NULLPTR) {
break;
}
ExecBatch exec_batch{*batch};
exec_batches.push_back(exec_batch);
}
return exec_batches;
}
};

} // namespace

namespace internal {
@@ -177,6 +239,10 @@ void RegisterSourceNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
}

void RegisterTableSourceNode(ExecFactoryRegistry* registry) {
DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make));
}

} // namespace internal
} // namespace compute
} // namespace arrow
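
A note on the vector generator used by the node above: batches are wrapped in util::optional so that an empty optional can mark end-of-stream, which is the convention SourceNode's read loop relies on. An illustrative sketch, not part of the patch (MakeVectorGenerator comes from arrow/util/async_generator.h):

// Sketch: a one-batch stream; the second pull signals completion.
std::vector<util::optional<compute::ExecBatch>> opt_batches = {
    util::make_optional(compute::ExecBatch{})};
auto gen = MakeVectorGenerator(std::move(opt_batches));
auto first = gen().result();  // Result holding the batch
auto end = gen().result();    // Result holding util::nullopt: stream finished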
4 changes: 2 additions & 2 deletions r/R/arrowExports.R


18 changes: 17 additions & 1 deletion r/R/dataset-write.R
@@ -50,6 +50,16 @@
#' partitions which data is not written to.
#' @param max_partitions maximum number of partitions any batch may be
#' written into. Default is 1024L.
#' @param max_open_files maximum number of files that can be left open
#' during a write operation. Default is 900L.
#' @param max_rows_per_file maximum number of rows that may be written to a
#' single file. Default is 0L, meaning no limit.
#' @param min_rows_per_group write row groups to disk only once this many
#' rows have accumulated. Default is 0L.
#' @param max_rows_per_group maximum number of rows allowed in a single row
#' group; when this limit is exceeded, the row group is split and the excess
#' rows are written to another row group. This value must be greater than
#' min_rows_per_group. Default is bitwShiftL(1, 20) (2^20 = 1,048,576 rows).
#' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -111,6 +121,10 @@ write_dataset <- function(dataset,
hive_style = TRUE,
existing_data_behavior = c("overwrite", "error", "delete_matching"),
max_partitions = 1024L,
max_open_files = 900L,
max_rows_per_file = 0L,
min_rows_per_group = 0L,
max_rows_per_group = bitwShiftL(1, 20),
...) {
format <- match.arg(format)
if (inherits(dataset, "arrow_dplyr_query")) {
@@ -146,6 +160,8 @@
dataset___Dataset__Write(
options, path_and_fs$fs, path_and_fs$path,
partitioning, basename_template, scanner,
existing_data_behavior, max_partitions
existing_data_behavior, max_partitions,
max_open_files, max_rows_per_file,
min_rows_per_group, max_rows_per_group
)
}
14 changes: 9 additions & 5 deletions r/src/arrowExports.cpp


8 changes: 7 additions & 1 deletion r/src/dataset.cpp
@@ -517,7 +517,9 @@ void dataset___Dataset__Write(
const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
const std::shared_ptr<ds::Scanner>& scanner,
arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions) {
arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
uint64_t max_rows_per_group) {
ds::FileSystemDatasetWriteOptions opts;
opts.file_write_options = file_write_options;
opts.existing_data_behavior = existing_data_behavior;
@@ -526,6 +528,10 @@
opts.partitioning = partitioning;
opts.basename_template = basename_template;
opts.max_partitions = max_partitions;
opts.max_open_files = max_open_files;
opts.max_rows_per_file = max_rows_per_file;
opts.min_rows_per_group = min_rows_per_group;
opts.max_rows_per_group = max_rows_per_group;
StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner));
}
