From 11f7352f01bbb08d4bc4d8999dfc2481edb6d1aa Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 19 Jan 2022 10:31:40 +0530 Subject: [PATCH 1/8] initial commit on adding bindings for write options --- r/R/arrowExports.R | 4 ++-- r/R/dataset-write.R | 8 +++++++- r/src/arrowExports.cpp | 14 +++++++++----- r/src/dataset.cpp | 8 +++++++- 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index d6cf785a650..a8487578587 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -696,8 +696,8 @@ dataset___Scanner__schema <- function(sc) { .Call(`_arrow_dataset___Scanner__schema`, sc) } -dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions) { - invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions)) +dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { + invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) } dataset___Scanner__TakeRows <- function(scanner, indices) { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 6185f045eb3..25a54e7b93a 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -111,6 +111,10 @@ write_dataset <- function(dataset, hive_style = TRUE, existing_data_behavior = c("overwrite", "error", "delete_matching"), max_partitions = 1024L, + max_open_files = 900L, + max_rows_per_file = 0L, + min_rows_per_group = 0L, + max_rows_per_group = bitwShiftL(1, 20), ...) 
{
  format <- match.arg(format)
  if (inherits(dataset, "arrow_dplyr_query")) {
@@ -146,6 +150,8 @@ write_dataset <- function(dataset,
   dataset___Dataset__Write(
     options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template,
     scanner,
-    existing_data_behavior, max_partitions
+    existing_data_behavior, max_partitions,
+    max_open_files, max_rows_per_file,
+    min_rows_per_group, max_rows_per_group
   )
 }
diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp
index 3e4196421c9..5014ac5d0a0 100644
--- a/r/src/arrowExports.cpp
+++ b/r/src/arrowExports.cpp
@@ -2757,8 +2757,8 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){
 // dataset.cpp
 #if defined(ARROW_R_WITH_DATASET)
-void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions);
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp){
+void dataset___Dataset__Write(const std::shared_ptr<ds::FileWriteOptions>& file_write_options, const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir, const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template, const std::shared_ptr<ds::Scanner>& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group);
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::FileWriteOptions>&>::type file_write_options(file_write_options_sexp);
 	arrow::r::Input<const std::shared_ptr<fs::FileSystem>&>::type filesystem(filesystem_sexp);
@@ -2768,12 +2768,16 @@ BEGIN_CPP11
 	arrow::r::Input<const std::shared_ptr<ds::Scanner>&>::type scanner(scanner_sexp);
 	arrow::r::Input<arrow::dataset::ExistingDataBehavior>::type existing_data_behavior(existing_data_behavior_sexp);
 	arrow::r::Input<int>::type max_partitions(max_partitions_sexp);
-	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions);
+	arrow::r::Input<uint32_t>::type max_open_files(max_open_files_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_file(max_rows_per_file_sexp);
+	arrow::r::Input<uint64_t>::type min_rows_per_group(min_rows_per_group_sexp);
+	arrow::r::Input<uint64_t>::type max_rows_per_group(max_rows_per_group_sexp);
+	dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group);
 	return R_NilValue;
 END_CPP11
 }
 #else
-extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp){
+extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP
existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){
 	Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. ");
 }
 #endif
@@ -7543,7 +7547,7 @@ static const R_CallMethodDef CallEntries[] = {
 		{ "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1},
 		{ "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2},
 		{ "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1},
-		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 8},
+		{ "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12},
 		{ "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2},
 		{ "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1},
 		{ "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0},
diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp
index b58224741a1..51e444986f8 100644
--- a/r/src/dataset.cpp
+++ b/r/src/dataset.cpp
@@ -517,7 +517,9 @@ void dataset___Dataset__Write(
     const std::shared_ptr<fs::FileSystem>& filesystem, std::string base_dir,
     const std::shared_ptr<ds::Partitioning>& partitioning, std::string basename_template,
     const std::shared_ptr<ds::Scanner>& scanner,
-    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions) {
+    arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions,
+    uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group,
+    uint64_t max_rows_per_group) {
   ds::FileSystemDatasetWriteOptions opts;
   opts.file_write_options = file_write_options;
   opts.existing_data_behavior = existing_data_behavior;
@@ -526,6 +528,10 @@ void dataset___Dataset__Write(
   opts.partitioning = partitioning;
   opts.basename_template = basename_template;
   opts.max_partitions = max_partitions;
+  opts.max_open_files = max_open_files;
+  opts.max_rows_per_file = max_rows_per_file;
+  opts.min_rows_per_group = min_rows_per_group;
+  opts.max_rows_per_group = max_rows_per_group;
   StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner));
 }

From 407605b9bf20afa4f80c3b1a299761921b616993 Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Wed, 19 Jan 2022 11:33:44 +0530
Subject: [PATCH 2/8] adding max open files test case

---
 r/tests/testthat/test-dataset-write.R | 38 +++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index ef6c719afe6..cdfe7186e1b 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -505,3 +505,41 @@ test_that("Max partitions fails with non-integer values and less than required p
     "max_partitions must be a positive, non-missing integer"
   )
 })
+
+test_that("Dataset write max open files", {
+  skip_if_not_available("parquet")
+  # test default partitioning
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+  partitioning <- c("c2")
+  num_of_unique_c2_groups = 5
+
+  get_num_of_files = function(dir, format) {
+    files <- list.files(dir, pattern = paste('.', format, sep=""),
+                        all.files = FALSE, recursive = TRUE, full.names = TRUE)
+    length(files)
+  }
+
+  record_batch_1 <- record_batch(c1=c(1, 2, 3, 4, 0, 10),
+                                 c2=c('a', 'b', 'c', 'd', 'e', 'a'))
+  record_batch_2 <- record_batch(c1=c(5, 6, 7, 8, 0, 1),
+                                 c2=c('a', 'b',
'c', 'd', 'e', 'c'))
+  record_batch_3 <- record_batch(c1=c(9, 10, 11, 12, 0, 1),
+                                 c2=c('a', 'b', 'c', 'd', 'e', 'd'))
+  record_batch_4 <- record_batch(c1=c(13, 14, 15, 16, 0, 1),
+                                 c2=c('a', 'b', 'c', 'd', 'e', 'b'))
+
+  table = Table$create(d1=record_batch_1, d2=record_batch_2,
+                       d3=record_batch_3, d4=record_batch_4)
+
+  write_dataset(table, path=dst_dir, format=file_format, partitioning=partitioning)
+
+  # subtract 1 from the directory count, since list.dirs() also returns the
+  # search path itself
+  expect_equal(length(list.dirs(dst_dir)) - 1, num_of_unique_c2_groups)
+
+  max_open_files = 3
+  dst_dir <- make_temp_dir()
+  write_dataset(table, path=dst_dir, format=file_format, partitioning=partitioning, max_open_files=max_open_files)
+
+  expect_gt(get_num_of_files(dst_dir, file_format), max_open_files)
+})

From 899c381c37d55e9e67d0e0612052885773723a7b Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Wed, 19 Jan 2022 12:20:31 +0530
Subject: [PATCH 3/8] adding max rows per file test case

---
 r/tests/testthat/test-dataset-write.R | 44 +++++++++++++++++++++++----
 1 file changed, 38 insertions(+), 6 deletions(-)

diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index cdfe7186e1b..ef58f005be5 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -506,6 +506,12 @@ test_that("Max partitions fails with non-integer values and less than required p
   )
 })
 
+get_num_of_files = function(dir, format) {
+  files <- list.files(dir, pattern = paste('.', format, sep=""),
+                      all.files = FALSE, recursive = TRUE, full.names = TRUE)
+  length(files)
+}
+
 test_that("Dataset write max open files", {
   skip_if_not_available("parquet")
   # test default partitioning
@@ -514,12 +520,6 @@ test_that("Dataset write max open files", {
   partitioning <- c("c2")
   num_of_unique_c2_groups = 5
 
-  get_num_of_files = function(dir, format) {
-    files <- list.files(dir, pattern = paste('.', format, sep=""),
-                        all.files = FALSE, recursive = TRUE, full.names = TRUE)
-    length(files)
-  }
-
   record_batch_1 <- record_batch(c1=c(1, 2, 3, 4, 0, 10),
                                  c2=c('a', 'b', 'c', 'd', 'e', 'a'))
   record_batch_2 <- record_batch(c1=c(5, 6, 7, 8, 0, 1),
@@ -543,3 +543,35 @@ test_that("Dataset write max open files", {
 
   expect_gt(get_num_of_files(dst_dir, file_format), max_open_files)
 })
+
+
+test_that("Dataset write max rows per files", {
+  skip_if_not_available("parquet")
+  num_of_records = 35
+  df <- tibble::tibble(
+    int = 1:num_of_records,
+    dbl = as.numeric(1:num_of_records),
+    lgl = rep(c(TRUE, FALSE, NA, TRUE, FALSE), 7),
+    chr = rep(letters[1:7], 5),
+  )
+  table = Table$create(df)
+  max_rows_per_file = 10
+  max_rows_per_group = 10
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+
+  write_dataset(table, path=dst_dir, format=file_format, max_rows_per_file=max_rows_per_file,
+                max_rows_per_group=max_rows_per_group)
+
+  expected_partitions = num_of_records %/% max_rows_per_file + 1
+  written_files = list.files(dst_dir)
+  result_partitions = length(written_files)
+
+  expect_equal(expected_partitions, result_partitions)
+
+  for (file in written_files) {
+    file_path = paste(dst_dir, file, sep="/")
+    ds = read_parquet(file_path)
+    expect_lte(length(ds), max_rows_per_file)
+  }
+})
\ No newline at end of file

From fe94738590b1aac39e6cc405cdba1dbed9b7a628 Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Wed, 19 Jan 2022 15:26:55 +0530
Subject: [PATCH 4/8] adding test cases for max rows per file, max rows per group

---
 r/tests/testthat/test-dataset-write.R | 36
+++++++++++++++++++++++++--
 1 file changed, 34 insertions(+), 2 deletions(-)

diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index ef58f005be5..c60d5a0b9d7 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -568,10 +568,42 @@ test_that("Dataset write max rows per files", {
   result_partitions = length(written_files)
 
   expect_equal(expected_partitions, result_partitions)
-
+  total_records = 0
   for (file in written_files) {
     file_path = paste(dst_dir, file, sep="/")
     ds = read_parquet(file_path)
-    expect_lte(length(ds), max_rows_per_file)
+    cur_records = nrow(ds)
+    expect_lte(cur_records, max_rows_per_file)
+    total_records = total_records + cur_records
   }
+  expect_equal(total_records, num_of_records)
+})
+
+test_that("Dataset write max rows per files", {
+  skip_if_not_available("parquet")
+  num_of_records = 30
+  max_rows_per_group = 18
+  df <- tibble::tibble(
+    int = 1:num_of_records,
+    dbl = as.numeric(1:num_of_records),
+  )
+  table = Table$create(df)
+  dst_dir <- make_temp_dir()
+  file_format <- "parquet"
+
+  write_dataset(table, path=dst_dir, format=file_format, max_rows_per_group=max_rows_per_group)
+
+  written_files = list.files(dst_dir)
+  record_combination = list()
+
+  # writes only to a single file with multiple groups
+  file_path = paste(dst_dir, written_files[[1]], sep="/")
+  ds = open_dataset(file_path)
+  row_group_sizes <- ds %>%
+    select() %>%
+    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
+    unlist() %>% # Returns list because .data.frame is FALSE
+    sort()
+
+  expect_equal(row_group_sizes, c(12, 18))
 })
\ No newline at end of file

From 80d1868aa2922256fcf13822ddb5a6c0b4e6bfb6 Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Tue, 25 Jan 2022 11:29:36 +0530
Subject: [PATCH 5/8] adding min rows per group test case

---
 r/R/dataset-write.R                   | 10 ++++++
 r/tests/testthat/test-dataset-write.R | 44 +++++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 2 deletions(-)

diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R
index 25a54e7b93a..f15b8ecd544 100644
--- a/r/R/dataset-write.R
+++ b/r/R/dataset-write.R
@@ -50,6 +50,16 @@
 #' partitions which data is not written to.
 #' @param max_partitions maximum number of partitions any batch may be
 #' written into. Default is 1024L.
+#' @param max_open_files maximum number of files that can be left open
+#' during a write operation. Default is 900L.
+#' @param max_rows_per_file maximum number of rows that are placed in
+#' any single file. Default is 0L, meaning no limit.
+#' @param min_rows_per_group minimum number of rows to accumulate before
+#' writing a row group to disk. Default is 0L.
+#' @param max_rows_per_group maximum number of rows allowed in a single
+#' row group; when a batch exceeds it, the batch is split and the excess
+#' is written to the next group. This value must be set such that it is
+#' greater than min_rows_per_group. Default is bitwShiftL(1, 20) (1048576).
 #' @param ... additional format-specific arguments. For available Parquet
#' options, see [write_parquet()]. The available Feather options are
#' - `use_legacy_format` logical: write data formatted so that Arrow libraries
diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R
index c60d5a0b9d7..a93a57c0687 100644
--- a/r/tests/testthat/test-dataset-write.R
+++ b/r/tests/testthat/test-dataset-write.R
@@ -579,7 +579,47 @@ test_that("Dataset write max rows per files", {
   expect_equal(total_records, num_of_records)
 })
 
-test_that("Dataset write max rows per files", {
+test_that("Dataset min_rows_per_group", {
+  skip_if_not_available("parquet")
+  rb1 <- record_batch(c1=c(1, 2, 3, 4),
+                      c2=c('a', 'b', 'e', 'a'))
+  rb2 <- record_batch(c1=c(5, 6, 7, 8, 9),
+                      c2=c('a', 'b', 'c', 'd', 'h'))
+  rb3 <- record_batch(c1=c(9, 10, 11, 12, 0, 1, 100, 200, 300),
+                      c2=c('a', 'b', 'c', 'd', 'e', 'd', 'g', 'h', 'i'))
+  rb4 <- record_batch(c1=c(13, 14, 15, 16, 0, 1),
+                      c2=c('a', 'b', 'c', 'd', 'e', 'b'))
+
+  dataset = Table$create(d1=rb1, d2=rb2, d3=rb3, d4=rb4)
+
+  dst_dir <- make_temp_dir()
+  min_rows_per_group = 4
+  max_rows_per_group = 5
+
+  write_dataset(dataset,
+                min_rows_per_group = min_rows_per_group,
+                max_rows_per_group = max_rows_per_group,
+                path = dst_dir)
+
+  ds = open_dataset(dst_dir)
+
+  row_group_sizes <- ds %>%
+    select() %>%
+    map_batches(~ .$num_rows, .data.frame = FALSE) %>%
+    unlist()
+  index = 1
+  for (row_group_size in row_group_sizes) {
+    if (index < length(row_group_sizes)) {
+      # every row group except the last must respect the minimum
+      expect_gte(row_group_size, min_rows_per_group)
+      expect_lte(row_group_size, max_rows_per_group)
+    } else {
+      expect_lte(row_group_size, max_rows_per_group)
+    }
+    index = index + 1
+  }
+})
+
+test_that("Dataset write max rows per group", {
   skip_if_not_available("parquet")
   num_of_records = 30
   max_rows_per_group = 18
@@ -606,4 +646,4 @@
     sort()
 
   expect_equal(row_group_sizes, c(12, 18))
-})
\ No newline at end of file
+})

From e7d4e62825ec17106308138326b95ee48b3fc22b Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Tue, 25 Jan 2022 15:13:39 +0530
Subject: [PATCH 6/8] adding initial interface test

---
 cpp/src/arrow/CMakeLists.txt                  |   1 +
 cpp/src/arrow/compute/exec/options.h          |  21 ++
 .../arrow/compute/exec/table_source_node.cc   | 182 ++++++++++++++++++
 3 files changed, 204 insertions(+)
 create mode 100644 cpp/src/arrow/compute/exec/table_source_node.cc

diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt
index efbdc952dfb..ccab33895c6 100644
--- a/cpp/src/arrow/CMakeLists.txt
+++ b/cpp/src/arrow/CMakeLists.txt
@@ -394,6 +394,7 @@ if(ARROW_COMPUTE)
       compute/exec/sink_node.cc
       compute/exec/source_node.cc
       compute/exec/task_util.cc
+      compute/exec/table_source_node.cc
       compute/exec/union_node.cc
       compute/exec/util.cc
       compute/function.cc
diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h
index 2723c4454c0..99852bdfdbb 100644
--- a/cpp/src/arrow/compute/exec/options.h
+++ b/cpp/src/arrow/compute/exec/options.h
@@ -270,5 +270,26 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
   SelectKOptions select_k_options;
 };
 
+
+/// \brief Adapt an Table as a source node
+///
+/// plan->exec_context()->executor() will be used to parallelize pushing to
+/// outputs, if provided.
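+///
+/// Editor's note, kept as a doc-comment sketch: a rough picture of the call
+/// pattern this WIP options class is aiming at. The factory name
+/// "table_source" comes from this patch's registration below; the raw-pointer
+/// constructor and the plan-building call are assumptions that patch 7 of
+/// this series reworks:
+///
+///   arrow::Table* table = ...;  // caller keeps the table alive
+///   compute::TableSourceNodeOptions options(table);
+///   // then, when building a plan:
+///   //   MakeExecNode("table_source", plan.get(), {}, options)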
+class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { + public: + explicit TableSourceNodeOptions(arrow::Table *table) : _table(table) {} + + arrow::Table* _table; +}; + +/// \brief Adapt an Table as a source node +/// +/// plan->exec_context()->executor() will be used to parallelize pushing to +/// outputs, if provided. +class ARROW_EXPORT DummyNodeOptions : public ExecNodeOptions { + public: + explicit DummyNodeOptions(); +}; + } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/table_source_node.cc b/cpp/src/arrow/compute/exec/table_source_node.cc new file mode 100644 index 00000000000..76ee8b3ed32 --- /dev/null +++ b/cpp/src/arrow/compute/exec/table_source_node.cc @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "arrow/compute/exec.h" +#include "arrow/compute/exec/exec_plan.h" +#include "arrow/compute/exec/expression.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/util.h" +#include "arrow/compute/exec_internal.h" +#include "arrow/datum.h" +#include "arrow/result.h" +#include "arrow/util/async_generator.h" +#include "arrow/util/async_util.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/future.h" +#include "arrow/util/logging.h" +#include "arrow/util/optional.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/unreachable.h" + +namespace arrow { + +using internal::checked_cast; + +namespace compute { +namespace { + +struct TableSourceNode : ExecNode { + TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, + std::shared_ptr table) : ExecNode(plan, {}, {}, + std::move(output_schema), /*num_outputs=*/1), table_(std::move(table)) {} + + static Result Make(ExecPlan* plan, std::vector inputs, + const ExecNodeOptions& options) { + RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); + const auto& source_options = checked_cast(options); + return plan->EmplaceNode(plan, source_options.table.schema(), + source_options.table); + } + + const char* kind_name() const override { return "TableSourceNode"; } + + [[noreturn]] static void NoInputs() { + Unreachable("no inputs; this should never be called"); + } + [[noreturn]] void InputReceived(ExecNode*, ExecBatch) override { NoInputs(); } + [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); } + [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } + + Status StartProducing() override { + { + // If another exec node encountered an error during its StartProducing call + // it might have already called StopProducing on all of its inputs (including this + // node). 
+ // + std::unique_lock lock(mutex_); + if (stop_requested_) { + return Status::OK(); + } + } + + CallbackOptions options; + auto executor = plan()->exec_context()->executor(); + if (executor) { + // These options will transfer execution to the desired Executor if necessary. + // This can happen for in-memory scans where batches didn't require + // any CPU work to decode. Otherwise, parsing etc should have already + // been placed us on the desired Executor and no queues will be pushed to. + options.executor = executor; + options.should_schedule = ShouldSchedule::IfDifferentExecutor; + } + finished_ = + Loop([this, executor, options] { + std::unique_lock lock(mutex_); + int total_batches = batch_count_++; + if (stop_requested_) { + return Future>::MakeFinished(Break(total_batches)); + } + lock.unlock(); + + return generator_().Then( + [=](const util::optional& maybe_batch) -> ControlFlow { + std::unique_lock lock(mutex_); + if (IsIterationEnd(maybe_batch) || stop_requested_) { + stop_requested_ = true; + return Break(total_batches); + } + lock.unlock(); + ExecBatch batch = std::move(*maybe_batch); + + if (executor) { + auto status = + task_group_.AddTask([this, executor, batch]() -> Result> { + return executor->Submit([=]() { + outputs_[0]->InputReceived(this, std::move(batch)); + return Status::OK(); + }); + }); + if (!status.ok()) { + outputs_[0]->ErrorReceived(this, std::move(status)); + return Break(total_batches); + } + } else { + outputs_[0]->InputReceived(this, std::move(batch)); + } + return Continue(); + }, + [=](const Status& error) -> ControlFlow { + // NB: ErrorReceived is independent of InputFinished, but + // ErrorReceived will usually prompt StopProducing which will + // prompt InputFinished. ErrorReceived may still be called from a + // node which was requested to stop (indeed, the request to stop + // may prompt an error). 
+ std::unique_lock lock(mutex_); + stop_requested_ = true; + lock.unlock(); + outputs_[0]->ErrorReceived(this, error); + return Break(total_batches); + }, + options); + }).Then([&](int total_batches) { + outputs_[0]->InputFinished(this, total_batches); + return task_group_.End(); + }); + + return Status::OK(); + } + + void PauseProducing(ExecNode* output) override {} + + void ResumeProducing(ExecNode* output) override {} + + void StopProducing(ExecNode* output) override { + DCHECK_EQ(output, outputs_[0]); + StopProducing(); + } + + void StopProducing() override { + std::unique_lock lock(mutex_); + stop_requested_ = true; + } + + Future<> finished() override { return finished_; } + + private: + std::mutex mutex_; + bool stop_requested_{false}; + int batch_count_{0}; + Future<> finished_ = Future<>::MakeFinished(); + + std::shared_ptr table_; + util::AsyncTaskGroup task_group_; + AsyncGenerator> generator_; +}; + +} // namespace + +namespace internal { + +void RegisterSourceNode(ExecFactoryRegistry* registry) { + DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); +} + +} // namespace internal +} // namespace compute +} // namespace arrow From 533adf1d1cc92145581b10f0cf1149e2d8848807 Mon Sep 17 00:00:00 2001 From: Vibhatha Abeykoon Date: Wed, 26 Jan 2022 13:48:15 +0530 Subject: [PATCH 7/8] initial functional test on table source node --- cpp/src/arrow/compute/exec/exec_plan.cc | 2 + cpp/src/arrow/compute/exec/options.h | 32 ++++------- cpp/src/arrow/compute/exec/plan_test.cc | 26 +++++++++ cpp/src/arrow/compute/exec/source_node.cc | 66 +++++++++++++++++++++++ 4 files changed, 105 insertions(+), 21 deletions(-) diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 527cbf4512d..b1521dc6425 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -509,6 +509,7 @@ void RegisterUnionNode(ExecFactoryRegistry*); void RegisterAggregateNode(ExecFactoryRegistry*); void RegisterSinkNode(ExecFactoryRegistry*); void RegisterHashJoinNode(ExecFactoryRegistry*); +void RegisterTableSourceNode(ExecFactoryRegistry*); } // namespace internal @@ -523,6 +524,7 @@ ExecFactoryRegistry* default_exec_factory_registry() { internal::RegisterAggregateNode(this); internal::RegisterSinkNode(this); internal::RegisterHashJoinNode(this); + internal::RegisterTableSourceNode(this); } Result GetFactory(const std::string& factory_name) override { diff --git a/cpp/src/arrow/compute/exec/options.h b/cpp/src/arrow/compute/exec/options.h index 99852bdfdbb..33fd0f05af3 100644 --- a/cpp/src/arrow/compute/exec/options.h +++ b/cpp/src/arrow/compute/exec/options.h @@ -52,6 +52,17 @@ class ARROW_EXPORT SourceNodeOptions : public ExecNodeOptions { std::function>()> generator; }; +/// \brief Adapt an Table as a source node +/// +/// plan->exec_context()->executor() will be used to parallelize pushing to +/// outputs, if provided. +class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions { + public: + TableSourceNodeOptions(std::shared_ptr
<Table> table) : table(table) {}
+
+  std::shared_ptr<Table> table;
+};
+
 /// \brief Make a node which excludes some rows from batches passed through it
 ///
 /// filter_expression will be evaluated against each batch which is pushed to
@@ -270,26 +281,5 @@ class ARROW_EXPORT SelectKSinkNodeOptions : public SinkNodeOptions {
   SelectKOptions select_k_options;
 };
 
-
-/// \brief Adapt an Table as a source node
-///
-/// plan->exec_context()->executor() will be used to parallelize pushing to
-/// outputs, if provided.
-class ARROW_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
- public:
-  explicit TableSourceNodeOptions(arrow::Table *table) : _table(table) {}
-
-  arrow::Table* _table;
-};
-
-/// \brief Adapt an Table as a source node
-///
-/// plan->exec_context()->executor() will be used to parallelize pushing to
-/// outputs, if provided.
-class ARROW_EXPORT DummyNodeOptions : public ExecNodeOptions {
- public:
-  explicit DummyNodeOptions();
-};
-
 } // namespace compute
 } // namespace arrow
diff --git a/cpp/src/arrow/compute/exec/plan_test.cc b/cpp/src/arrow/compute/exec/plan_test.cc
index 258238dbb81..4e0003810de 100644
--- a/cpp/src/arrow/compute/exec/plan_test.cc
+++ b/cpp/src/arrow/compute/exec/plan_test.cc
@@ -238,6 +238,32 @@ TEST(ExecPlanExecution, SourceSink) {
   }
 }
 
+TEST(ExecPlanExecution, TableSourceSink) {
+  for (bool slow : {false, true}) {
+    SCOPED_TRACE(slow ? "slowed" : "unslowed");
+
+    for (bool parallel : {false, true}) {
+      SCOPED_TRACE(parallel ? "parallel" : "single threaded");
+
+      ASSERT_OK_AND_ASSIGN(auto plan, ExecPlan::Make());
+      AsyncGenerator<util::optional<ExecBatch>> sink_gen;
+
+      auto exec_batches = MakeBasicBatches();
+      ASSERT_OK_AND_ASSIGN(
+          auto table, TableFromExecBatches(exec_batches.schema, exec_batches.batches));
+
+      ASSERT_OK(Declaration::Sequence(
+                    {
+                        {"table", TableSourceNodeOptions{table}},
+                        {"sink", SinkNodeOptions{&sink_gen}},
+                    })
+                    .AddToPlan(plan.get()));
+
+      ASSERT_THAT(StartAndCollect(plan.get(), sink_gen),
+                  Finishes(ResultWith(UnorderedElementsAreArray(exec_batches.batches))));
+    }
+  }
+}
+
 TEST(ExecPlanExecution, SinkNodeBackpressure) {
   constexpr uint32_t kPauseIfAbove = 4;
   constexpr uint32_t kResumeIfBelow = 2;
diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc
index 46bba5609d4..4c16f10f219 100644
--- a/cpp/src/arrow/compute/exec/source_node.cc
+++ b/cpp/src/arrow/compute/exec/source_node.cc
@@ -25,6 +25,7 @@
 #include "arrow/compute/exec_internal.h"
 #include "arrow/datum.h"
 #include "arrow/result.h"
+#include "arrow/table.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/async_util.h"
 #include "arrow/util/checked_cast.h"
@@ -33,10 +34,12 @@
 #include "arrow/util/optional.h"
 #include "arrow/util/thread_pool.h"
 #include "arrow/util/unreachable.h"
+#include "arrow/util/vector.h"
 
 namespace arrow {
 
 using internal::checked_cast;
+using internal::MapVector;
 
 namespace compute {
 namespace {
@@ -169,6 +172,65 @@ struct SourceNode : ExecNode {
   AsyncGenerator<util::optional<ExecBatch>> generator_;
 };
 
+struct TableSourceNode : public SourceNode {
+  TableSourceNode(ExecPlan* plan, std::shared_ptr<Schema> output_schema,
+                  std::shared_ptr<Table> table)
+      : SourceNode(plan, output_schema,
+                   generator(ConvertTableToExecBatches(*table.get()).ValueOrDie())) {}
+
+  static Result<ExecNode*> Make(ExecPlan* plan, std::vector<ExecNode*> inputs,
+                                const ExecNodeOptions& options) {
+    RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode"));
+    const auto& table_options = checked_cast<const TableSourceNodeOptions&>(options);
+    return plan->EmplaceNode<TableSourceNode>(plan, table_options.table->schema(),
+                                              table_options.table);
+  }
+  const char* kind_name() const override { return "TableSourceNode"; }
+
+  [[noreturn]] void InputReceived(ExecNode* input, ExecBatch batch) override {
+    SourceNode::InputReceived(input, batch);
+  }
+  [[noreturn]] void ErrorReceived(ExecNode* input, Status status) override {
+    SourceNode::ErrorReceived(input, status);
+  }
+  [[noreturn]] void InputFinished(ExecNode* input, int total_batches) override {
+    SourceNode::InputFinished(input, total_batches);
+  }
+
+  Status StartProducing() override { return SourceNode::StartProducing(); }
+
+  void PauseProducing(ExecNode* output) override { SourceNode::PauseProducing(output); }
+
+  void StopProducing() override { SourceNode::StopProducing(); }
+
+  Future<> finished() override { return SourceNode::finished(); }
+
+  arrow::AsyncGenerator<util::optional<ExecBatch>> generator(
+      std::vector<ExecBatch> batches) {
+    auto opt_batches = MapVector(
+        [](ExecBatch batch) { return util::make_optional(std::move(batch)); }, batches);
+    AsyncGenerator<util::optional<ExecBatch>> gen;
+    gen = MakeVectorGenerator(std::move(opt_batches));
+    return gen;
+  }
+
+  arrow::Result<std::vector<ExecBatch>> ConvertTableToExecBatches(const Table& table) {
+    std::shared_ptr<TableBatchReader> reader = std::make_shared<TableBatchReader>(table);
+    std::shared_ptr<RecordBatch> batch;
+    std::vector<ExecBatch> exec_batches;
+    while (true) {
+      ARROW_ASSIGN_OR_RAISE(batch, reader->Next());
+      if (batch == NULLPTR) {
+        break;
+      }
+      ExecBatch exec_batch{*batch};
+      exec_batches.push_back(exec_batch);
+    }
+    return exec_batches;
+  }
+};
+
 } // namespace
 
 namespace internal {
@@ -177,6 +239,10 @@ void RegisterSourceNode(ExecFactoryRegistry* registry) {
   DCHECK_OK(registry->AddFactory("source", SourceNode::Make));
 }
 
+void RegisterTableSourceNode(ExecFactoryRegistry* registry) {
+  DCHECK_OK(registry->AddFactory("table", TableSourceNode::Make));
+}
+
 } // namespace internal
 } // namespace compute
 } // namespace arrow

From 65fde9546e2d26535764f503b838b2a4c64f3176 Mon Sep 17 00:00:00 2001
From: Vibhatha Abeykoon
Date: Wed, 26 Jan 2022 13:49:06 +0530
Subject: [PATCH 8/8] removing separate table source node file

---
 .../arrow/compute/exec/table_source_node.cc | 182 ------------------
 1 file changed, 182 deletions(-)
 delete mode 100644 cpp/src/arrow/compute/exec/table_source_node.cc

diff --git a/cpp/src/arrow/compute/exec/table_source_node.cc b/cpp/src/arrow/compute/exec/table_source_node.cc
deleted file mode 100644
index 76ee8b3ed32..00000000000
--- a/cpp/src/arrow/compute/exec/table_source_node.cc
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include - -#include "arrow/compute/exec.h" -#include "arrow/compute/exec/exec_plan.h" -#include "arrow/compute/exec/expression.h" -#include "arrow/compute/exec/options.h" -#include "arrow/compute/exec/util.h" -#include "arrow/compute/exec_internal.h" -#include "arrow/datum.h" -#include "arrow/result.h" -#include "arrow/util/async_generator.h" -#include "arrow/util/async_util.h" -#include "arrow/util/checked_cast.h" -#include "arrow/util/future.h" -#include "arrow/util/logging.h" -#include "arrow/util/optional.h" -#include "arrow/util/thread_pool.h" -#include "arrow/util/unreachable.h" - -namespace arrow { - -using internal::checked_cast; - -namespace compute { -namespace { - -struct TableSourceNode : ExecNode { - TableSourceNode(ExecPlan* plan, std::shared_ptr output_schema, - std::shared_ptr table) : ExecNode(plan, {}, {}, - std::move(output_schema), /*num_outputs=*/1), table_(std::move(table)) {} - - static Result Make(ExecPlan* plan, std::vector inputs, - const ExecNodeOptions& options) { - RETURN_NOT_OK(ValidateExecNodeInputs(plan, inputs, 0, "TableSourceNode")); - const auto& source_options = checked_cast(options); - return plan->EmplaceNode(plan, source_options.table.schema(), - source_options.table); - } - - const char* kind_name() const override { return "TableSourceNode"; } - - [[noreturn]] static void NoInputs() { - Unreachable("no inputs; this should never be called"); - } - [[noreturn]] void InputReceived(ExecNode*, ExecBatch) override { NoInputs(); } - [[noreturn]] void ErrorReceived(ExecNode*, Status) override { NoInputs(); } - [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } - - Status StartProducing() override { - { - // If another exec node encountered an error during its StartProducing call - // it might have already called StopProducing on all of its inputs (including this - // node). - // - std::unique_lock lock(mutex_); - if (stop_requested_) { - return Status::OK(); - } - } - - CallbackOptions options; - auto executor = plan()->exec_context()->executor(); - if (executor) { - // These options will transfer execution to the desired Executor if necessary. - // This can happen for in-memory scans where batches didn't require - // any CPU work to decode. Otherwise, parsing etc should have already - // been placed us on the desired Executor and no queues will be pushed to. 
- options.executor = executor; - options.should_schedule = ShouldSchedule::IfDifferentExecutor; - } - finished_ = - Loop([this, executor, options] { - std::unique_lock lock(mutex_); - int total_batches = batch_count_++; - if (stop_requested_) { - return Future>::MakeFinished(Break(total_batches)); - } - lock.unlock(); - - return generator_().Then( - [=](const util::optional& maybe_batch) -> ControlFlow { - std::unique_lock lock(mutex_); - if (IsIterationEnd(maybe_batch) || stop_requested_) { - stop_requested_ = true; - return Break(total_batches); - } - lock.unlock(); - ExecBatch batch = std::move(*maybe_batch); - - if (executor) { - auto status = - task_group_.AddTask([this, executor, batch]() -> Result> { - return executor->Submit([=]() { - outputs_[0]->InputReceived(this, std::move(batch)); - return Status::OK(); - }); - }); - if (!status.ok()) { - outputs_[0]->ErrorReceived(this, std::move(status)); - return Break(total_batches); - } - } else { - outputs_[0]->InputReceived(this, std::move(batch)); - } - return Continue(); - }, - [=](const Status& error) -> ControlFlow { - // NB: ErrorReceived is independent of InputFinished, but - // ErrorReceived will usually prompt StopProducing which will - // prompt InputFinished. ErrorReceived may still be called from a - // node which was requested to stop (indeed, the request to stop - // may prompt an error). - std::unique_lock lock(mutex_); - stop_requested_ = true; - lock.unlock(); - outputs_[0]->ErrorReceived(this, error); - return Break(total_batches); - }, - options); - }).Then([&](int total_batches) { - outputs_[0]->InputFinished(this, total_batches); - return task_group_.End(); - }); - - return Status::OK(); - } - - void PauseProducing(ExecNode* output) override {} - - void ResumeProducing(ExecNode* output) override {} - - void StopProducing(ExecNode* output) override { - DCHECK_EQ(output, outputs_[0]); - StopProducing(); - } - - void StopProducing() override { - std::unique_lock lock(mutex_); - stop_requested_ = true; - } - - Future<> finished() override { return finished_; } - - private: - std::mutex mutex_; - bool stop_requested_{false}; - int batch_count_{0}; - Future<> finished_ = Future<>::MakeFinished(); - - std::shared_ptr
table_; - util::AsyncTaskGroup task_group_; - AsyncGenerator> generator_; -}; - -} // namespace - -namespace internal { - -void RegisterSourceNode(ExecFactoryRegistry* registry) { - DCHECK_OK(registry->AddFactory("table_source", TableSourceNode::Make)); -} - -} // namespace internal -} // namespace compute -} // namespace arrow
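---

Usage note (editor's sketch, not part of the patches): with the R bindings in
patches 1-5 applied, the new knobs are ordinary arguments to write_dataset().
A minimal example under assumed data (a data frame `df` with a partition
column `part`; the numbers are illustrative, not recommendations):

    library(arrow)

    write_dataset(
      df,
      path = "my_dataset",
      format = "parquet",
      partitioning = "part",
      max_open_files = 512,        # close least-recently-used writers past this
      max_rows_per_file = 1e6,     # start a new file after this many rows; 0L = unlimited
      min_rows_per_group = 10000,  # accumulate this many rows before writing a row group
      max_rows_per_group = 65536   # split batches larger than this across groups
    )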
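A worked check of the max_rows_per_file test's expectation: 35 rows with
max_rows_per_file = 10 are written as files of 10, 10, 10 and 5 rows, so
num_of_records %/% max_rows_per_file + 1 = 35 %/% 10 + 1 = 4 files. The
formula only holds because 35 is not a multiple of 10 (30 rows would give
30 %/% 10 + 1 = 4, yet only 3 files are written); a ceiling covers both
cases:

    expected_files <- ceiling(num_of_records / max_rows_per_file)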
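Likewise for the row-group tests: 30 accumulated rows with
max_rows_per_group = 18 yield one group of 18 and one of 12, hence the sorted
c(12, 18) expectation. The tests' recipe for reading back row-group sizes
generalizes to any written dataset (a sketch, assuming dplyr is attached as
in the test suite):

    library(dplyr)

    row_group_sizes <- open_dataset(dst_dir) %>%
      select() %>%                                  # no columns, only row counts
      map_batches(~ .$num_rows, .data.frame = FALSE) %>%
      unlist() %>%
      sort()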