diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index efbdc952dfb..ccab33895c6 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -394,6 +394,7 @@ if(ARROW_COMPUTE)
       compute/exec/sink_node.cc
       compute/exec/source_node.cc
       compute/exec/task_util.cc
+      compute/exec/table_source_node.cc
       compute/exec/union_node.cc
       compute/exec/util.cc
       compute/function.cc
diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc
index 527cbf4512d..b1521dc6425 100644
--- a/cpp/src/arrow/compute/exec/exec_plan.cc
+++ b/cpp/src/arrow/compute/exec/exec_plan.cc
@@ -509,6 +509,7 @@ void RegisterUnionNode(ExecFactoryRegistry*);
 void RegisterAggregateNode(ExecFactoryRegistry*);
 void RegisterSinkNode(ExecFactoryRegistry*);
 void RegisterHashJoinNode(ExecFactoryRegistry*);
+void RegisterTableSourceNode(ExecFactoryRegistry*);
 
 }  // namespace internal
 
@@ -523,6 +524,7 @@ ExecFactoryRegistry* default_exec_factory_registry() {
     internal::RegisterAggregateNode(this);
     internal::RegisterSinkNode(this);
     internal::RegisterHashJoinNode(this);
+    internal::RegisterTableSourceNode(this);
   }
 
   Result<Factory> GetFactory(const std::string& factory_name) override {
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 2723c4454c0..33fd0f05af3 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -52,6 +52,17 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions {
   std::function<Future<util::optional<ExecBatch>>()> generator;
 };
 
+/// \brief Adapt a Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
+ public:
+  TableSourceNodeOptions(std::shared_ptr<Table> table) : table(table) {}
+
+  std::shared_ptr<Table> table;
+};
+
 /// \brief Make a node which excludes some rows from batches passed through it
 ///
 /// filter_expression will be evaluated against each batch which is pushed to
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 258238dbb81..4e0003810de 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -238,6 +238,32 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, TableSourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto exec_batches = MakeBasicBatches();
+      auto table = TableFromExecBatches(exec_batches.schema, exec_batches.batches);
+
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"table", TableSourceNodeOptions{table}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(UnorderedElementsAreArray(exec_batches.batches))));
+    }
+  }
+}
+
 TEST(ExecPlanExecution, SinkNodeBackpressure) {
   constexpr uint32_t kPauseIfAbove = 4;
   constexpr uint32_t kResumeIfBelow = 2;
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 46bba5609d4..4c16f10f219 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -25,6 +25,7 @@
 #include "arrow/compute/exec_internal.h"
 #include "arrow/datum.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/async_util.h"
 #include "arrow/util/checked_cast.h"
@@ -33,10 +34,12 @@
 #include "arrow/util/optional.h"
 #include "arrow/util/thread_pool.h"
 #include "arrow/util/unreachable.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 
 using internal::checked_cast;
+using internal::MapVector;
 
 namespace compute {
 namespace {
@@ -169,6 +172,65 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table);
+  }
+
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  // Static: called from the constructor's initializer list before the base class is built.
+  static arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  static arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(
+      const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
+      if (batch == NULLPTR) {
+        break;
+      }
+      ExecBatch exec_batch{*batch};
+      exec_batches.push_back(exec_batch);
+    }
+    return exec_batches;
+  }
+};
+
 }  // namespace
 
 namespace internal {
@@ -177,6 +239,10 @@ void RegisterSourceNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
 }
 
+void RegisterTableSourceNode(ExecFactoryRegistry* registry) {
+  DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make));
+}
+
 }  // namespace internal
 }  // namespace compute
 }  // namespace arrow
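Not part of the patch, for reviewers: a minimal sketch of how the new factory is meant to be driven from C++ once it is registered under the name "table". The helper name RunTableSourcePlan and the plan setup around it are hypothetical; only the {"table", TableSourceNodeOptions{...}} declaration mirrors the code added above.

```cpp
// Sketch only: feed a Table into an ExecPlan via the new "table" source node
// and drain it through a "sink" node, as the TableSourceSink test does.
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/table.h"
#include "arrow/util/async_generator.h"

arrow::Status RunTableSourcePlan(std::shared_ptr<arrow::Table> table) {
  using namespace arrow::compute;
  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());
  arrow::AsyncGenerator<arrow::util::optional<ExecBatch>> sink_gen;

  // "table" pushes the table's record batches into the plan;
  // "sink" hands them back as an async generator the caller can drain.
  ARROW_RETURN_NOT_OK(Declaration::Sequence({
                          {"table", TableSourceNodeOptions{std::move(table)}},
                          {"sink", SinkNodeOptions{&sink_gen}},
                      }).AddToPlan(plan.get()));

  ARROW_RETURN_NOT_OK(plan->StartProducing());
  // Drain the sink, then wait for the plan to wind down.
  auto collected = arrow::CollectAsyncGenerator(std::move(sink_gen));
  ARROW_RETURN_NOT_OK(collected.status());
  return plan->finished().status();
}
```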
diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R
index d6cf785a650..a8487578587 100644
--- a/r/R/arrowExports.R
+++ b/r/R/arrowExports.R
@@ -696,8 +696,8 @@ dataset___Scanner__schema <- function(sc) {
   .Call(`_arrow_dataset___Scanner__schema`, sc)
 }
 
-dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions) {
-  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions))
+dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) {
+  invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group))
 }
 
 dataset___Scanner__TakeRows <- function(scanner, indices) {
diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 6185f045eb3..f15b8ecd544 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -50,6 +50,16 @@
 #' partitions which data is not written to.
 #' @param max_partitions maximum number of partitions any batch may be
 #' written into. Default is 1024L.
+#' @param max_open_files maximum number of files that can be left open
+#' during a write operation. Default is 900L.
+#' @param max_rows_per_file maximum number of rows that may be written to
+#' any single file. Default is 0L (no limit).
+#' @param min_rows_per_group minimum number of rows to accumulate before
+#' a row group is written to disk. Default is 0L.
+#' @param max_rows_per_group maximum number of rows allowed in a single
+#' row group; when a row group exceeds this value it is split and the
+#' excess rows are written to the next row group. This value must be
+#' greater than min_rows_per_group. Default is 1024 * 1024 (1048576L).
 #' @param ... additional format-specific arguments. For available Parquet
 #' options, see [write_parquet()]. The available Feather options are
 #' - `use_legacy_format` logical: write data formatted so that Arrow libraries
@@ -111,6 +121,10 @@ write_dataset <- function(dataset,
                           hive_style = TRUE,
                           existing_data_behavior = c("overwrite", "error", "delete_matching"),
                           max_partitions = 1024L,
+                          max_open_files = 900L,
+                          max_rows_per_file = 0L,
+                          min_rows_per_group = 0L,
+                          max_rows_per_group = bitwShiftL(1, 20),
                           ...) {
   format <- match.arg(format)
   if (inherits(dataset, "arrow_dplyr_query")) {
@@ -146,6 +160,8 @@ write_dataset <- function(dataset,
   dataset___Dataset__Write(
     options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template,
     scanner,
-    existing_data_behavior, max_partitions
+    existing_data_behavior, max_partitions,
+    max_open_files, max_rows_per_file,
+    min_rows_per_group, max_rows_per_group
   )
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 3e4196421c9..5014ac5d0a0 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2757,8 +2757,8 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp){
+void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
 	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
@@ -2768,12 +2768,16 @@ BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
 	arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
 	arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
-	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions);
+	arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
+	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
+	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
 	return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp){
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 	Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -7543,7 +7547,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1},
 		{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2},
 		{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1},
-		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 8},
+		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12},
 		{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2},
 		{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1},
 		{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index b58224741a1..51e444986f8 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -517,7 +517,9 @@ void dataset___Dataset__Write(
     const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
     const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
     const std::shared_ptr<ds::Scanner>& scanner,
-    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions) {
+    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
+    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+    uint64_t max_rows_per_group) {
   ds::FileSystemDatasetWriteOptions opts;
   opts.file_write_options = file_write_options;
   opts.existing_data_behavior = existing_data_behavior;
@@ -526,6 +528,10 @@ void dataset___Dataset__Write(
   opts.partitioning = partitioning;
   opts.basename_template = basename_template;
   opts.max_partitions = max_partitions;
+  opts.max_open_files = max_open_files;
+  opts.max_rows_per_file = max_rows_per_file;
+  opts.min_rows_per_group = min_rows_per_group;
+  opts.max_rows_per_group = max_rows_per_group;
 
   StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner));
 }
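Not part of the patch: a sketch of the underlying C++ write options that the new write_dataset() arguments are forwarded to. Only the four new field assignments mirror dataset.cpp above; the helper name WriteWithRowGroupLimits, the literal values, and the assumption that a scanner and write options are prepared elsewhere are illustrative.

```cpp
// Sketch: how the new R arguments map onto ds::FileSystemDatasetWriteOptions.
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/scanner.h"

namespace ds = arrow::dataset;

arrow::Status WriteWithRowGroupLimits(ds::FileSystemDatasetWriteOptions opts,
                                      const std::shared_ptr<ds::Scanner>& scanner) {
  opts.max_open_files = 900;          // cap on simultaneously open output files
  opts.max_rows_per_file = 0;         // 0 = no per-file row limit (the R default)
  opts.min_rows_per_group = 0;        // accumulate at least this many rows per row group
  opts.max_rows_per_group = 1 << 20;  // split row groups larger than this
  return ds::FileSystemDataset::Write(opts, scanner);
}
```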
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index ef6c719afe6..a93a57c0687 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -505,3 +505,145 @@ test_that("Max partitions fails with non-integer values and less than required p
     "max_partitions must be a positive, non-missing integer"
   )
 })
+
+get_num_of_files <- function(dir, format) {
+  files <- list.files(dir, pattern = paste0("\\.", format, "$"),
+                      all.files = FALSE, recursive = TRUE, full.names = TRUE)
+  length(files)
+}
+
+test_that("Dataset write max open files", {
+  skip_if_not_available("parquet")
+  # test default partitioning
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+  partitioning <- c("c2")
+  num_of_unique_c2_groups <- 5
+
+  record_batch_1 <- record_batch(c1 = c(1, 2, 3, 4, 0, 10),
+                                 c2 = c("a", "b", "c", "d", "e", "a"))
+  record_batch_2 <- record_batch(c1 = c(5, 6, 7, 8, 0, 1),
+                                 c2 = c("a", "b", "c", "d", "e", "c"))
+  record_batch_3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1),
+                                 c2 = c("a", "b", "c", "d", "e", "d"))
+  record_batch_4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1),
+                                 c2 = c("a", "b", "c", "d", "e", "b"))
+
+  table <- Table$create(d1 = record_batch_1, d2 = record_batch_2,
+                        d3 = record_batch_3, d4 = record_batch_4)
+
+  write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning)
+
+  # subtract 1 from the directory count because list.dirs() also returns the search path
+  expect_equal(length(list.dirs(dst_dir)) - 1, num_of_unique_c2_groups)
+
+  max_open_files <- 3
+  dst_dir <- make_temp_dir()
+  write_dataset(table, path = dst_dir, format = file_format,
+                partitioning = partitioning, max_open_files = max_open_files)
+
+  expect_gt(get_num_of_files(dst_dir, file_format), max_open_files)
+})
+
+test_that("Dataset write max rows per file", {
+  skip_if_not_available("parquet")
+  num_of_records <- 35
+  df <- tibble::tibble(
+    int = 1:num_of_records,
+    dbl = as.numeric(1:num_of_records),
+    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 7),
+    chr = rep(letters[1:7], 5)
+  )
+  table <- Table$create(df)
+  max_rows_per_file <- 10
+  max_rows_per_group <- 10
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+
+  write_dataset(table, path = dst_dir, format = file_format,
+                max_rows_per_file = max_rows_per_file,
+                max_rows_per_group = max_rows_per_group)
+
+  # 35 rows with at most 10 rows per file -> 4 files
+  expected_partitions <- num_of_records %/% max_rows_per_file + 1
+  written_files <- list.files(dst_dir)
+  result_partitions <- length(written_files)
+
+  expect_equal(expected_partitions, result_partitions)
+  total_records <- 0
+  for (file in written_files) {
+    file_path <- paste(dst_dir, file, sep = "/")
+    ds <- read_parquet(file_path)
+    cur_records <- nrow(ds)
+    expect_lte(cur_records, max_rows_per_file)
+    total_records <- total_records + cur_records
+  }
+  expect_equal(total_records, num_of_records)
+})
+
+test_that("Dataset min_rows_per_group", {
+  skip_if_not_available("parquet")
+  rb1 <- record_batch(c1 = c(1, 2, 3, 4),
+                      c2 = c("a", "b", "e", "a"))
+  rb2 <- record_batch(c1 = c(5, 6, 7, 8, 9),
+                      c2 = c("a", "b", "c", "d", "h"))
+  rb3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1, 100, 200, 300),
+                      c2 = c("a", "b", "c", "d", "e", "d", "g", "h", "i"))
+  rb4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1),
+                      c2 = c("a", "b", "c", "d", "e", "b"))
+
+  dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3, d4 = rb4)
+
+  dst_dir <- make_temp_dir()
+  min_rows_per_group <- 4
+  max_rows_per_group <- 5
+
+  write_dataset(dataset,
+                min_rows_per_group = min_rows_per_group,
+                max_rows_per_group = max_rows_per_group,
+                path = dst_dir)
+
+  ds <- open_dataset(dst_dir)
+
+  row_group_sizes <- ds %>%
+    select() %>%
+    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
+    unlist()
+
+  # every row group must respect max_rows_per_group; all but the last
+  # (possibly short) group must also respect min_rows_per_group
+  index <- 1
+  for (row_group_size in row_group_sizes) {
+    if (index < length(row_group_sizes)) {
+      expect_gte(row_group_size, min_rows_per_group)
+      expect_lte(row_group_size, max_rows_per_group)
+    } else {
+      expect_lte(row_group_size, max_rows_per_group)
+    }
+    index <- index + 1
+  }
+})
+
+test_that("Dataset write max rows per group", {
+  skip_if_not_available("parquet")
+  num_of_records <- 30
+  max_rows_per_group <- 18
+  df <- tibble::tibble(
+    int = 1:num_of_records,
+    dbl = as.numeric(1:num_of_records)
+  )
+  table <- Table$create(df)
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+
+  write_dataset(table, path = dst_dir, format = file_format,
+                max_rows_per_group = max_rows_per_group)
+
+  written_files <- list.files(dst_dir)
+
+  # writes only to a single file with multiple row groups
+  file_path <- paste(dst_dir, written_files[[1]], sep = "/")
+  ds <- open_dataset(file_path)
+  row_group_sizes <- ds %>%
+    select() %>%
+    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
+    unlist() %>% # a list is returned because .data.frame is FALSE
+    sort()
+
+  # 30 rows with max_rows_per_group = 18 are split into row groups of 18 and 12
+  expect_equal(row_group_sizes, c(12, 18))
+})