From dee89592603475d765f22da9584ffd166df9abb8 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 14 Apr 2022 14:23:05 -0400 Subject: [PATCH 01/10] ARROW-15517: [R] Use WriteNode in write_dataset() --- r/R/arrowExports.R | 8 ++-- r/R/dataset-write.R | 57 +++++++++++++++++++++------ r/R/dplyr.R | 7 +++- r/R/query-engine.R | 4 ++ r/src/arrowExports.cpp | 57 ++++++++++++++------------- r/src/compute-exec.cpp | 36 +++++++++++++++++ r/src/dataset.cpp | 24 ----------- r/tests/testthat/test-dataset-write.R | 52 +++++++++++++++--------- 8 files changed, 158 insertions(+), 87 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 7bf77f1e66c..d0ac9a83345 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -420,6 +420,10 @@ ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) } +ExecPlan_Write <- function(plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { + invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) +} + ExecNode_Filter <- function(input, filter) { .Call(`_arrow_ExecNode_Filter`, input, filter) } @@ -748,10 +752,6 @@ dataset___Scanner__schema <- function(sc) { .Call(`_arrow_dataset___Scanner__schema`, sc) } -dataset___Dataset__Write <- function(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { - invisible(.Call(`_arrow_dataset___Dataset__Write`, file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) -} - dataset___Scanner__TakeRows <- function(scanner, indices) { .Call(`_arrow_dataset___Scanner__TakeRows`, scanner, indices) } diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index d7c73908e70..9f289f0b179 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -136,18 +136,49 @@ write_dataset <- function(dataset, if (inherits(dataset, "arrow_dplyr_query")) { # partitioning vars need to be in the `select` schema dataset <- ensure_group_vars(dataset) - } else if (inherits(dataset, "grouped_df")) { - force(partitioning) - # Drop the grouping metadata before writing; we've already consumed it - # now to construct `partitioning` and don't want it in the metadata$r - dataset <- dplyr::ungroup(dataset) + } else { + if (inherits(dataset, "grouped_df")) { + force(partitioning) + # Drop the grouping metadata before writing; we've already consumed it + # now to construct `partitioning` and don't want it in the metadata$r + dataset <- dplyr::ungroup(dataset) + } + dataset <- tryCatch( + as_adq(dataset), + error = function(e) { + supported <- c( + "Dataset", "RecordBatch", "Table", "arrow_dplyr_query", "data.frame" + ) + stop( + "'dataset' must be a ", + oxford_paste(supported, "or", quote = FALSE), + ", not ", + deparse(class(dataset)), + call. 
= FALSE + ) + } + ) + } + + plan <- ExecPlan$create() + final_node <- plan$Build(dataset) + if (!is.null(final_node$sort %||% final_node$head %||% final_node$tail)) { + # Because sorting and topK are only handled in the SinkNode (or in R!), + # they wouldn't get picked up in the WriteNode. So let's Run this ExecPlan + # to capture those, and then create a new plan for writing + # TODO(ARROW-15681): do sorting in WriteNode in C++ + dataset <- as_adq(plan$Run(final_node)) + plan <- ExecPlan$create() + final_node <- plan$Build(dataset) } - scanner <- Scanner$create(dataset) if (!inherits(partitioning, "Partitioning")) { - partition_schema <- scanner$schema[partitioning] + partition_schema <- final_node$schema[partitioning] if (isTRUE(hive_style)) { - partitioning <- HivePartitioning$create(partition_schema, null_fallback = list(...)$null_fallback) + partitioning <- HivePartitioning$create( + partition_schema, + null_fallback = list(...)$null_fallback + ) } else { partitioning <- DirectoryPartitioning$create(partition_schema) } @@ -158,8 +189,10 @@ write_dataset <- function(dataset, } path_and_fs <- get_path_and_filesystem(path) - options <- FileWriteOptions$create(format, table = scanner, ...) + options <- FileWriteOptions$create(format, table = final_node$schema, ...) + # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R + # and encapsulate this logic better existing_data_behavior_opts <- c("delete_matching", "overwrite", "error") existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L @@ -168,9 +201,9 @@ write_dataset <- function(dataset, validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer") validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") - dataset___Dataset__Write( - options, path_and_fs$fs, path_and_fs$path, - partitioning, basename_template, scanner, + plan$Write( + final_node, options, path_and_fs$fs, path_and_fs$path, + partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group diff --git a/r/R/dplyr.R b/r/R/dplyr.R index e6d78890785..9d1d1dfdc24 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -24,7 +24,12 @@ arrow_dplyr_query <- function(.data) { # RecordBatch, or Dataset) and the state of the user's dplyr query--things # like selected columns, filters, and group vars. # An arrow_dplyr_query can contain another arrow_dplyr_query in .data - gv <- dplyr::group_vars(.data) %||% character() + gv <- tryCatch( + # If dplyr is not available, or if the input doesn't have a group_vars + # method, assume no group vars + dplyr::group_vars(.data) %||% character(), + error = function(e) character() + ) if (inherits(.data, "data.frame")) { .data <- Table$create(.data) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 6c1b14036f1..559ab006e03 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -244,6 +244,10 @@ ExecPlan <- R6Class("ExecPlan", } out }, + Write = function(node, ...) { + # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... + ExecPlan_Write(self, node, ...) 
+ }, Stop = function() ExecPlan_StopProducing(self) ) ) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 81dcc0dddcb..0d010338cc7 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -914,6 +914,34 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil } #endif +// compute-exec.cpp +#if defined(ARROW_R_WITH_DATASET) +void ExecPlan_Write(const std::shared_ptr& plan, const std::shared_ptr& final_node, const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group); +extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + arrow::r::Input&>::type final_node(final_node_sexp); + arrow::r::Input&>::type file_write_options(file_write_options_sexp); + arrow::r::Input&>::type filesystem(filesystem_sexp); + arrow::r::Input::type base_dir(base_dir_sexp); + arrow::r::Input&>::type partitioning(partitioning_sexp); + arrow::r::Input::type basename_template(basename_template_sexp); + arrow::r::Input::type existing_data_behavior(existing_data_behavior_sexp); + arrow::r::Input::type max_partitions(max_partitions_sexp); + arrow::r::Input::type max_open_files(max_open_files_sexp); + arrow::r::Input::type max_rows_per_file(max_rows_per_file_sexp); + arrow::r::Input::type min_rows_per_group(min_rows_per_group_sexp); + arrow::r::Input::type max_rows_per_group(max_rows_per_group_sexp); + ExecPlan_Write(plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); + return R_NilValue; +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ + Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); +} +#endif + // compute-exec.cpp std::shared_ptr ExecNode_Filter(const std::shared_ptr& input, const std::shared_ptr& filter); extern "C" SEXP _arrow_ExecNode_Filter(SEXP input_sexp, SEXP filter_sexp){ @@ -2041,33 +2069,6 @@ extern "C" SEXP _arrow_dataset___Scanner__schema(SEXP sc_sexp){ } #endif -// dataset.cpp -#if defined(ARROW_R_WITH_DATASET) -void dataset___Dataset__Write(const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, const std::shared_ptr& scanner, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group); -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type file_write_options(file_write_options_sexp); - arrow::r::Input&>::type filesystem(filesystem_sexp); - arrow::r::Input::type base_dir(base_dir_sexp); - arrow::r::Input&>::type partitioning(partitioning_sexp); - arrow::r::Input::type basename_template(basename_template_sexp); - arrow::r::Input&>::type scanner(scanner_sexp); - arrow::r::Input::type existing_data_behavior(existing_data_behavior_sexp); - arrow::r::Input::type max_partitions(max_partitions_sexp); - arrow::r::Input::type max_open_files(max_open_files_sexp); - arrow::r::Input::type max_rows_per_file(max_rows_per_file_sexp); - arrow::r::Input::type min_rows_per_group(min_rows_per_group_sexp); - arrow::r::Input::type max_rows_per_group(max_rows_per_group_sexp); - dataset___Dataset__Write(file_write_options, filesystem, base_dir, partitioning, basename_template, scanner, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); - return R_NilValue; -END_CPP11 -} -#else -extern "C" SEXP _arrow_dataset___Dataset__Write(SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP scanner_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ - Rf_error("Cannot call dataset___Dataset__Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); -} -#endif - // dataset.cpp #if defined(ARROW_R_WITH_DATASET) std::shared_ptr dataset___Scanner__TakeRows(const std::shared_ptr& scanner, const std::shared_ptr& indices); @@ -5197,6 +5198,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, + { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 13}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, { "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3}, { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5}, @@ -5279,7 +5281,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_dataset___Scanner__ToRecordBatchReader", (DL_FUNC) &_arrow_dataset___Scanner__ToRecordBatchReader, 1}, { "_arrow_dataset___Scanner__head", (DL_FUNC) &_arrow_dataset___Scanner__head, 2}, { "_arrow_dataset___Scanner__schema", (DL_FUNC) &_arrow_dataset___Scanner__schema, 1}, - { "_arrow_dataset___Dataset__Write", (DL_FUNC) &_arrow_dataset___Dataset__Write, 12}, { "_arrow_dataset___Scanner__TakeRows", (DL_FUNC) &_arrow_dataset___Scanner__TakeRows, 2}, { "_arrow_dataset___Scanner__CountRows", (DL_FUNC) &_arrow_dataset___Scanner__CountRows, 1}, { "_arrow_Int8__initialize", (DL_FUNC) &_arrow_Int8__initialize, 0}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index e7d8df55bb2..19d1f15719b 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -121,6 +121,7 @@ std::shared_ptr ExecNode_output_schema( #if defined(ARROW_R_WITH_DATASET) +#include #include #include @@ -157,6 +158,41 @@ std::shared_ptr ExecNode_Scan( arrow::dataset::ScanNodeOptions{dataset, options}); } +// [[dataset::export]] +void ExecPlan_Write(const std::shared_ptr& plan, + const std::shared_ptr& final_node, + const std::shared_ptr& file_write_options, + const std::shared_ptr& filesystem, + std::string base_dir, + const std::shared_ptr& partitioning, + std::string basename_template, + arrow::dataset::ExistingDataBehavior existing_data_behavior, + int max_partitions, uint32_t max_open_files, + uint64_t max_rows_per_file, uint64_t min_rows_per_group, + uint64_t max_rows_per_group) { + // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R + // and encapsulate this logic better + ds::FileSystemDatasetWriteOptions opts; + opts.file_write_options = file_write_options; + opts.existing_data_behavior = existing_data_behavior; + opts.filesystem = filesystem; + opts.base_dir = base_dir; + opts.partitioning = partitioning; + opts.basename_template = basename_template; + opts.max_partitions = max_partitions; + opts.max_open_files = max_open_files; + opts.max_rows_per_file = max_rows_per_file; + opts.min_rows_per_group = min_rows_per_group; + opts.max_rows_per_group = max_rows_per_group; + + MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, + ds::WriteNodeOptions{std::move(opts)}); + + StopIfNotOk(plan->Validate()); + StopIfNotOk(plan->StartProducing()); + StopIfNotOk(plan->finished().status()); +} + #endif // [[arrow::export]] diff --git a/r/src/dataset.cpp b/r/src/dataset.cpp index 4881830560a..4ff30d9d941 100644 --- a/r/src/dataset.cpp +++ b/r/src/dataset.cpp @@ -511,30 +511,6 @@ std::shared_ptr dataset___Scanner__schema( return sc->options()->projected_schema; } -// [[dataset::export]] -void dataset___Dataset__Write( - const std::shared_ptr& file_write_options, - const 
std::shared_ptr& filesystem, std::string base_dir, - const std::shared_ptr& partitioning, std::string basename_template, - const std::shared_ptr& scanner, - arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, - uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, - uint64_t max_rows_per_group) { - ds::FileSystemDatasetWriteOptions opts; - opts.file_write_options = file_write_options; - opts.existing_data_behavior = existing_data_behavior; - opts.filesystem = filesystem; - opts.base_dir = base_dir; - opts.partitioning = partitioning; - opts.basename_template = basename_template; - opts.max_partitions = max_partitions; - opts.max_open_files = max_open_files; - opts.max_rows_per_file = max_rows_per_file; - opts.min_rows_per_group = min_rows_per_group; - opts.max_rows_per_group = max_rows_per_group; - StopIfNotOk(ds::FileSystemDataset::Write(opts, scanner)); -} - // [[dataset::export]] std::shared_ptr dataset___Scanner__TakeRows( const std::shared_ptr& scanner, diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index aafb4bf292e..9a0c3e2252e 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -321,6 +321,7 @@ test_that("Dataset writing: from RecordBatch", { dst_dir <- tempfile() stacked <- record_batch(rbind(df1, df2)) stacked %>% + mutate(twice = int * 2) %>% group_by(int) %>% write_dataset(dst_dir, format = "feather") expect_true(dir.exists(dst_dir)) @@ -438,7 +439,7 @@ test_that("Writing a dataset: CSV format options", { test_that("Dataset writing: unsupported features/input validation", { skip_if_not_available("parquet") - expect_error(write_dataset(4), 'dataset must be a "Dataset"') + expect_error(write_dataset(4), "'dataset' must be a Dataset, ") ds <- open_dataset(hive_dir) expect_error( @@ -520,7 +521,6 @@ test_that("max_rows_per_group is adjusted if at odds with max_rows_per_file", { expect_silent( write_dataset(df, dst_dir, max_rows_per_file = 5) ) - }) @@ -571,17 +571,27 @@ test_that("Dataset write max open files", { partitioning <- "c2" num_of_unique_c2_groups <- 5 - record_batch_1 <- record_batch(c1 = c(1, 2, 3, 4, 0, 10), - c2 = c("a", "b", "c", "d", "e", "a")) - record_batch_2 <- record_batch(c1 = c(5, 6, 7, 8, 0, 1), - c2 = c("a", "b", "c", "d", "e", "c")) - record_batch_3 <- record_batch(c1 = c(9, 10, 11, 12, 0, 1), - c2 = c("a", "b", "c", "d", "e", "d")) - record_batch_4 <- record_batch(c1 = c(13, 14, 15, 16, 0, 1), - c2 = c("a", "b", "c", "d", "e", "b")) + record_batch_1 <- record_batch( + c1 = c(1, 2, 3, 4, 0, 10), + c2 = c("a", "b", "c", "d", "e", "a") + ) + record_batch_2 <- record_batch( + c1 = c(5, 6, 7, 8, 0, 1), + c2 = c("a", "b", "c", "d", "e", "c") + ) + record_batch_3 <- record_batch( + c1 = c(9, 10, 11, 12, 0, 1), + c2 = c("a", "b", "c", "d", "e", "d") + ) + record_batch_4 <- record_batch( + c1 = c(13, 14, 15, 16, 0, 1), + c2 = c("a", "b", "c", "d", "e", "b") + ) - table <- Table$create(d1 = record_batch_1, d2 = record_batch_2, - d3 = record_batch_3, d4 = record_batch_4) + table <- Table$create( + d1 = record_batch_1, d2 = record_batch_2, + d3 = record_batch_3, d4 = record_batch_4 + ) write_dataset(table, path = dst_dir, format = file_format, partitioning = partitioning) @@ -643,12 +653,18 @@ test_that("Dataset write max rows per files", { test_that("Dataset min_rows_per_group", { skip_if_not_available("parquet") - rb1 <- record_batch(c1 = c(1, 2, 3, 4), - c2 = c("a", "b", "e", "a")) - rb2 <- record_batch(c1 = c(5, 6, 7, 
8, 9), - c2 = c("a", "b", "c", "d", "h")) - rb3 <- record_batch(c1 = c(10, 11), - c2 = c("a", "b")) + rb1 <- record_batch( + c1 = c(1, 2, 3, 4), + c2 = c("a", "b", "e", "a") + ) + rb2 <- record_batch( + c1 = c(5, 6, 7, 8, 9), + c2 = c("a", "b", "c", "d", "h") + ) + rb3 <- record_batch( + c1 = c(10, 11), + c2 = c("a", "b") + ) dataset <- Table$create(d1 = rb1, d2 = rb2, d3 = rb3) From b8938c34cb75f7dc9b56f7e4f593b86b586e5056 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Thu, 14 Apr 2022 14:30:58 -0400 Subject: [PATCH 02/10] Initialize dataset node factory --- r/src/arrowExports.cpp | 4 ++-- r/src/compute-exec.cpp | 8 +++++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 0d010338cc7..0091e33f7a3 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -898,11 +898,11 @@ END_CPP11 } // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) -std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names); +std::shared_ptr ExecNode_Scan(const std::shared_ptr& plan, const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names); extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP filter_sexp, SEXP materialized_field_names_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); - arrow::r::Input&>::type dataset(dataset_sexp); + arrow::r::Input&>::type dataset(dataset_sexp); arrow::r::Input&>::type filter(filter_sexp); arrow::r::Input>::type materialized_field_names(materialized_field_names_sexp); return cpp11::as_sexp(ExecNode_Scan(plan, dataset, filter, materialized_field_names)); diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 19d1f15719b..e2cafd98ae2 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -128,13 +128,13 @@ std::shared_ptr ExecNode_output_schema( // [[dataset::export]] std::shared_ptr ExecNode_Scan( const std::shared_ptr& plan, - const std::shared_ptr& dataset, + const std::shared_ptr& dataset, const std::shared_ptr& filter, std::vector materialized_field_names) { arrow::dataset::internal::Initialize(); // TODO: pass in FragmentScanOptions - auto options = std::make_shared(); + auto options = std::make_shared(); options->use_threads = arrow::r::GetBoolOption("arrow.use_threads", true); @@ -155,7 +155,7 @@ std::shared_ptr ExecNode_Scan( .Bind(*dataset->schema())); return MakeExecNodeOrStop("scan", plan.get(), {}, - arrow::dataset::ScanNodeOptions{dataset, options}); + ds::ScanNodeOptions{dataset, options}); } // [[dataset::export]] @@ -170,6 +170,8 @@ void ExecPlan_Write(const std::shared_ptr& plan, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group) { + arrow::dataset::internal::Initialize(); + // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R // and encapsulate this logic better ds::FileSystemDatasetWriteOptions opts; From 4bdf78b83e7a198ef5e8633c62485e9e0fe07f1a Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 15 Apr 2022 07:42:30 -0400 Subject: [PATCH 03/10] Fix duckdb test (not sure why failure shows up here) --- r/tests/testthat/test-duckdb.R | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/r/tests/testthat/test-duckdb.R b/r/tests/testthat/test-duckdb.R index 61f6983b385..63948176642 100644 --- a/r/tests/testthat/test-duckdb.R +++ b/r/tests/testthat/test-duckdb.R @@ -48,9 
+48,9 @@ test_that("to_duckdb", { # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(!fct) %>% arrange(int), - example_data %>% - select(!fct) %>% - arrange(int) + example_data %>% + select(!fct) %>% + arrange(int) ) expect_identical( @@ -159,8 +159,7 @@ test_that("to_arrow roundtrip, with dataset", { to_arrow() %>% filter(int > 5 & part > 1) %>% collect() %>% - arrange(part, int) %>% - as.data.frame(), + arrange(part, int), ds %>% select(-fct) %>% filter(int > 5 & part > 1) %>% From ede42321f483cb68a84bf10de74d92337880c8d1 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 15 Apr 2022 08:11:45 -0400 Subject: [PATCH 04/10] Attempt to pass metadata in (does not fix test) --- r/R/arrowExports.R | 4 ++-- r/R/dataset-write.R | 2 +- r/src/arrowExports.cpp | 11 ++++++----- r/src/compute-exec.cpp | 29 +++++++++++++++++------------ r/tests/testthat/test-metadata.R | 19 ++++++++++++------- 5 files changed, 38 insertions(+), 27 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index d0ac9a83345..6b969336c93 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -420,8 +420,8 @@ ExecNode_Scan <- function(plan, dataset, filter, materialized_field_names) { .Call(`_arrow_ExecNode_Scan`, plan, dataset, filter, materialized_field_names) } -ExecPlan_Write <- function(plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { - invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) +ExecPlan_Write <- function(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group) { + invisible(.Call(`_arrow_ExecPlan_Write`, plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group)) } ExecNode_Filter <- function(input, filter) { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 9f289f0b179..61eea5e85fa 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -202,7 +202,7 @@ write_dataset <- function(dataset, validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") plan$Write( - final_node, options, path_and_fs$fs, path_and_fs$path, + final_node, prepare_key_value_metadata(dataset$metadata), options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 0091e33f7a3..fb9f3b94d18 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -916,11 +916,12 @@ extern "C" SEXP _arrow_ExecNode_Scan(SEXP plan_sexp, SEXP dataset_sexp, SEXP fil // compute-exec.cpp #if defined(ARROW_R_WITH_DATASET) -void ExecPlan_Write(const std::shared_ptr& plan, const std::shared_ptr& final_node, const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior 
existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group); -extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ +void ExecPlan_Write(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::strings metadata, const std::shared_ptr& file_write_options, const std::shared_ptr& filesystem, std::string base_dir, const std::shared_ptr& partitioning, std::string basename_template, arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, uint64_t max_rows_per_group); +extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); arrow::r::Input&>::type final_node(final_node_sexp); + arrow::r::Input::type metadata(metadata_sexp); arrow::r::Input&>::type file_write_options(file_write_options_sexp); arrow::r::Input&>::type filesystem(filesystem_sexp); arrow::r::Input::type base_dir(base_dir_sexp); @@ -932,12 +933,12 @@ BEGIN_CPP11 arrow::r::Input::type max_rows_per_file(max_rows_per_file_sexp); arrow::r::Input::type min_rows_per_group(min_rows_per_group_sexp); arrow::r::Input::type max_rows_per_group(max_rows_per_group_sexp); - ExecPlan_Write(plan, final_node, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); + ExecPlan_Write(plan, final_node, metadata, file_write_options, filesystem, base_dir, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group); return R_NilValue; END_CPP11 } #else -extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ +extern "C" SEXP _arrow_ExecPlan_Write(SEXP plan_sexp, SEXP final_node_sexp, SEXP metadata_sexp, SEXP file_write_options_sexp, SEXP filesystem_sexp, SEXP base_dir_sexp, SEXP partitioning_sexp, SEXP basename_template_sexp, SEXP existing_data_behavior_sexp, SEXP max_partitions_sexp, SEXP max_open_files_sexp, SEXP max_rows_per_file_sexp, SEXP min_rows_per_group_sexp, SEXP max_rows_per_group_sexp){ Rf_error("Cannot call ExecPlan_Write(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. 
"); } #endif @@ -5198,7 +5199,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, { "_arrow_ExecNode_Scan", (DL_FUNC) &_arrow_ExecNode_Scan, 4}, - { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 13}, + { "_arrow_ExecPlan_Write", (DL_FUNC) &_arrow_ExecPlan_Write, 14}, { "_arrow_ExecNode_Filter", (DL_FUNC) &_arrow_ExecNode_Filter, 2}, { "_arrow_ExecNode_Project", (DL_FUNC) &_arrow_ExecNode_Project, 3}, { "_arrow_ExecNode_Aggregate", (DL_FUNC) &_arrow_ExecNode_Aggregate, 5}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index e2cafd98ae2..4c3cc922574 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -159,17 +159,15 @@ std::shared_ptr ExecNode_Scan( } // [[dataset::export]] -void ExecPlan_Write(const std::shared_ptr& plan, - const std::shared_ptr& final_node, - const std::shared_ptr& file_write_options, - const std::shared_ptr& filesystem, - std::string base_dir, - const std::shared_ptr& partitioning, - std::string basename_template, - arrow::dataset::ExistingDataBehavior existing_data_behavior, - int max_partitions, uint32_t max_open_files, - uint64_t max_rows_per_file, uint64_t min_rows_per_group, - uint64_t max_rows_per_group) { +void ExecPlan_Write( + const std::shared_ptr& plan, + const std::shared_ptr& final_node, cpp11::strings metadata, + const std::shared_ptr& file_write_options, + const std::shared_ptr& filesystem, std::string base_dir, + const std::shared_ptr& partitioning, std::string basename_template, + arrow::dataset::ExistingDataBehavior existing_data_behavior, int max_partitions, + uint32_t max_open_files, uint64_t max_rows_per_file, uint64_t min_rows_per_group, + uint64_t max_rows_per_group) { arrow::dataset::internal::Initialize(); // TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R @@ -187,8 +185,15 @@ void ExecPlan_Write(const std::shared_ptr& plan, opts.min_rows_per_group = min_rows_per_group; opts.max_rows_per_group = max_rows_per_group; + // TODO: factor this out to a strings_to_KVM() helper + auto values = cpp11::as_cpp>(metadata); + auto names = cpp11::as_cpp>(metadata.attr("names")); + + auto kv = + std::make_shared(std::move(names), std::move(values)); + MakeExecNodeOrStop("write", final_node->plan(), {final_node.get()}, - ds::WriteNodeOptions{std::move(opts)}); + ds::WriteNodeOptions{std::move(opts), std::move(kv)}); StopIfNotOk(plan->Validate()); StopIfNotOk(plan->StartProducing()); diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R index 3217b58d6c2..fb0ba953db0 100644 --- a/r/tests/testthat/test-metadata.R +++ b/r/tests/testthat/test-metadata.R @@ -226,11 +226,13 @@ test_that("Row-level metadata (does not by default) roundtrip", { # But we can re-enable this / read data that has already been written with # row-level metadata withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { tab <- Table$create(df) expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar") expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux") - }) + } + ) }) @@ -256,7 +258,8 @@ test_that("Row-level metadata (does not) roundtrip in datasets", { dst_dir <- make_temp_dir() withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { expect_warning( write_dataset(df, dst_dir, partitioning = 
"part"), "Row-level metadata is not compatible with datasets and will be discarded" @@ -286,7 +289,8 @@ test_that("Row-level metadata (does not) roundtrip in datasets", { df_from_ds <- ds %>% select(int) %>% collect(), NA ) - }) + } + ) }) test_that("When we encounter SF cols, we warn", { @@ -305,11 +309,13 @@ test_that("When we encounter SF cols, we warn", { # But we can re-enable this / read data that has already been written with # row-level metadata without a warning withr::with_options( - list("arrow.preserve_row_level_metadata" = TRUE), { + list("arrow.preserve_row_level_metadata" = TRUE), + { expect_warning(tab <- Table$create(df), NA) expect_identical(attr(as.data.frame(tab)$x[[1]], "foo"), "bar") expect_identical(attr(as.data.frame(tab)$x[[2]], "baz"), "qux") - }) + } + ) }) test_that("dplyr with metadata", { @@ -369,7 +375,6 @@ test_that("grouped_df metadata is recorded (efficiently)", { }) test_that("grouped_df non-arrow metadata is preserved", { - simple_tbl <- tibble(a = 1:2, b = 3:4) attr(simple_tbl, "other_metadata") <- "look I'm still here!" grouped <- group_by(simple_tbl, a) From 84de6feff2eaa1d45463ca7f696e6500751102a5 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 15 Apr 2022 08:14:11 -0400 Subject: [PATCH 05/10] Grab the source_data for the metadata --- r/R/dataset-write.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 61eea5e85fa..c6c370b27df 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -202,7 +202,7 @@ write_dataset <- function(dataset, validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") plan$Write( - final_node, prepare_key_value_metadata(dataset$metadata), options, path_and_fs$fs, path_and_fs$path, + final_node, prepare_key_value_metadata(source_data(dataset$metadata)), options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, From 5f5844996625a02323c1a2092458124a7b8bf6ea Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 15 Apr 2022 14:12:40 -0400 Subject: [PATCH 06/10] Fix the write_dataset metadata handling and add a test --- r/R/dataset-write.R | 3 ++- r/R/dplyr.R | 4 +++- r/tests/testthat/test-metadata.R | 17 +++++++++++++++++ 3 files changed, 22 insertions(+), 2 deletions(-) diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index c6c370b27df..159651ce10d 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -202,7 +202,8 @@ write_dataset <- function(dataset, validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") plan$Write( - final_node, prepare_key_value_metadata(source_data(dataset$metadata)), options, path_and_fs$fs, path_and_fs$path, + final_node, prepare_key_value_metadata(source_data(dataset)$metadata), + options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template, existing_data_behavior, max_partitions, max_open_files, max_rows_per_file, diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 9d1d1dfdc24..c9650fb0653 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -252,7 +252,9 @@ abandon_ship <- function(call, .data, msg) { query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader")) source_data <- function(x) { - if (is_collapsed(x)) { + if (!inherits(x, "arrow_dplyr_query")) { + x + } else if (is_collapsed(x)) { source_data(x$.data) } else { x$.data diff --git a/r/tests/testthat/test-metadata.R 
b/r/tests/testthat/test-metadata.R index fb0ba953db0..4db20d04df3 100644 --- a/r/tests/testthat/test-metadata.R +++ b/r/tests/testthat/test-metadata.R @@ -293,6 +293,23 @@ test_that("Row-level metadata (does not) roundtrip in datasets", { ) }) +test_that("Dataset writing does handle other metadata", { + skip_if_not_available("dataset") + skip_if_not_available("parquet") + + dst_dir <- make_temp_dir() + write_dataset(example_with_metadata, dst_dir, partitioning = "b") + + ds <- open_dataset(dst_dir) + expect_equal( + ds %>% + # partitioning on b puts it last, so move it back + select(a, b, c, d) %>% + collect(), + example_with_metadata + ) +}) + test_that("When we encounter SF cols, we warn", { df <- data.frame(x = I(list(structure(1, foo = "bar"), structure(2, baz = "qux")))) class(df$x) <- c("sfc_MULTIPOLYGON", "sfc", "list") From 8481b2d922f9eb11595355d45cdd561f1326dd5b Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Sun, 17 Apr 2022 08:23:20 -0400 Subject: [PATCH 07/10] Revert "Fix duckdb test (not sure why failure shows up here)" This reverts commit afb6ca3453057f55fa8ba23433fce06949badf99. --- r/tests/testthat/test-duckdb.R | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/r/tests/testthat/test-duckdb.R b/r/tests/testthat/test-duckdb.R index 63948176642..61f6983b385 100644 --- a/r/tests/testthat/test-duckdb.R +++ b/r/tests/testthat/test-duckdb.R @@ -48,9 +48,9 @@ test_that("to_duckdb", { # factors don't roundtrip https://github.com/duckdb/duckdb/issues/1879 select(!fct) %>% arrange(int), - example_data %>% - select(!fct) %>% - arrange(int) + example_data %>% + select(!fct) %>% + arrange(int) ) expect_identical( @@ -159,7 +159,8 @@ test_that("to_arrow roundtrip, with dataset", { to_arrow() %>% filter(int > 5 & part > 1) %>% collect() %>% - arrange(part, int), + arrange(part, int) %>% + as.data.frame(), ds %>% select(-fct) %>% filter(int > 5 & part > 1) %>% From 473241855347d99fd6dc627dff7b3465122f2523 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 18 Apr 2022 16:42:07 -0400 Subject: [PATCH 08/10] Factor out r metadata trimming from do_exec_plan; needs tests --- r/R/dataset-write.R | 31 ++++++++++++++++++++----------- r/R/metadata.R | 22 +++++++++++++++++++++- r/R/query-engine.R | 25 +++++++------------------ 3 files changed, 48 insertions(+), 30 deletions(-) diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 159651ce10d..9a25d5fbccc 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -184,25 +184,34 @@ write_dataset <- function(dataset, } } - if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) { - max_rows_per_group <- max_rows_per_file - } - path_and_fs <- get_path_and_filesystem(path) - options <- FileWriteOptions$create(format, table = final_node$schema, ...) + output_schema <- final_node$schema + options <- FileWriteOptions$create(format, table = output_schema, ...) 
# TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R # and encapsulate this logic better existing_data_behavior_opts <- c("delete_matching", "overwrite", "error") existing_data_behavior <- match(match.arg(existing_data_behavior), existing_data_behavior_opts) - 1L - validate_positive_int_value(max_partitions, "max_partitions must be a positive, non-missing integer") - validate_positive_int_value(max_open_files, "max_open_files must be a positive, non-missing integer") - validate_positive_int_value(min_rows_per_group, "min_rows_per_group must be a positive, non-missing integer") - validate_positive_int_value(max_rows_per_group, "max_rows_per_group must be a positive, non-missing integer") + if (!missing(max_rows_per_file) && missing(max_rows_per_group) && max_rows_per_group > max_rows_per_file) { + max_rows_per_group <- max_rows_per_file + } + + validate_positive_int_value(max_partitions) + validate_positive_int_value(max_open_files) + validate_positive_int_value(min_rows_per_group) + validate_positive_int_value(max_rows_per_group) + new_r_meta <- get_r_metadata_from_old_schema( + output_schema, + source_data(dataset)$schema, + drop_attributes = has_aggregation(dataset) + ) + if (!is.null(new_r_meta)) { + output_schema$r_metadata <- new_r_meta + } plan$Write( - final_node, prepare_key_value_metadata(source_data(dataset)$metadata), + final_node, prepare_key_value_metadata(output_schema$metadata), options, path_and_fs$fs, path_and_fs$path, partitioning, basename_template, existing_data_behavior, max_partitions, @@ -213,6 +222,6 @@ write_dataset <- function(dataset, validate_positive_int_value <- function(value, msg) { if (!is_integerish(value, n = 1) || is.na(value) || value < 0) { - abort(msg) + abort(paste(substitute(value), "must be a positive, non-missing integer")) } } diff --git a/r/R/metadata.R b/r/R/metadata.R index d88297dd925..f0411eb54a8 100644 --- a/r/R/metadata.R +++ b/r/R/metadata.R @@ -133,7 +133,6 @@ remove_attributes <- function(x) { } arrow_attributes <- function(x, only_top_level = FALSE) { - att <- attributes(x) removed_attributes <- remove_attributes(x) @@ -208,3 +207,24 @@ arrow_attributes <- function(x, only_top_level = FALSE) { NULL } } + +get_r_metadata_from_old_schema <- function(new_schema, + old_schema, + drop_attributes = FALSE) { + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? + r_meta <- old_schema$r_metadata + if (!is.null(r_meta)) { + # Filter r_metadata$columns on columns with name _and_ type match + common_names <- intersect(names(r_meta$columns), names(new_schema)) + keep <- common_names[ + map_lgl(common_names, ~ old_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + if (drop_attributes) { + # dplyr drops top-level attributes if you do summarize + r_meta$attributes <- NULL + } + } + r_meta +} diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 559ab006e03..c794bc9de64 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -33,26 +33,15 @@ do_exec_plan <- function(.data) { if (ncol(tab)) { # Apply any column metadata from the original schema, where appropriate - original_schema <- source_data(.data)$schema - # TODO: do we care about other (non-R) metadata preservation? - # How would we know if it were meaningful? 
- r_meta <- original_schema$r_metadata - if (!is.null(r_meta)) { - # Filter r_metadata$columns on columns with name _and_ type match - new_schema <- tab$schema - common_names <- intersect(names(r_meta$columns), names(tab)) - keep <- common_names[ - map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) - ] - r_meta$columns <- r_meta$columns[keep] - if (has_aggregation(.data)) { - # dplyr drops top-level attributes if you do summarize - r_meta$attributes <- NULL - } - tab$r_metadata <- r_meta + new_r_metadata <- get_r_metadata_from_old_schema( + tab$schema, + source_data(.data)$schema, + drop_attributes = has_aggregation(.data) + ) + if (!is.null(new_r_metadata)) { + tab$r_metadata <- new_r_metadata } } - tab } From 42f8b4a8454a1a185ce226d082595d604673aa40 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 19 Apr 2022 10:03:45 -0400 Subject: [PATCH 09/10] Add head test --- r/tests/testthat/test-dataset-write.R | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/r/tests/testthat/test-dataset-write.R b/r/tests/testthat/test-dataset-write.R index 9a0c3e2252e..5b657148a57 100644 --- a/r/tests/testthat/test-dataset-write.R +++ b/r/tests/testthat/test-dataset-write.R @@ -244,6 +244,24 @@ test_that("Dataset writing: dplyr methods", { new_ds %>% select(c(names(df1), "twice")) %>% collect(), df1 %>% filter(int == 4) %>% mutate(twice = int * 2) ) + + # head + dst_dir4 <- tempfile() + ds %>% + mutate(twice = int * 2) %>% + arrange(int) %>% + head(3) %>% + write_dataset(dst_dir4, format = "feather") + new_ds <- open_dataset(dst_dir4, format = "feather") + + expect_equal( + new_ds %>% + select(c(names(df1), "twice")) %>% + collect(), + df1 %>% + mutate(twice = int * 2) %>% + head(3) + ) }) test_that("Dataset writing: non-hive", { From 87efe9f9f4a65b2103d2ae7d0572dacb629a7098 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Tue, 19 Apr 2022 10:17:59 -0400 Subject: [PATCH 10/10] Clean up table argument to ParquetWriterProperties --- r/R/dataset-format.R | 4 ++-- r/R/dataset-write.R | 6 +++++- r/R/parquet.R | 38 ++++++++++++++++++-------------------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/r/R/dataset-format.R b/r/R/dataset-format.R index f00efd0350c..acc1a41b02c 100644 --- a/r/R/dataset-format.R +++ b/r/R/dataset-format.R @@ -390,7 +390,7 @@ ParquetFragmentScanOptions$create <- function(use_buffered_stream = FALSE, FileWriteOptions <- R6Class("FileWriteOptions", inherit = ArrowObject, public = list( - update = function(table, ...) { + update = function(column_names, ...) { check_additional_args <- function(format, passed_args) { if (format == "parquet") { supported_args <- names(formals(write_parquet)) @@ -437,7 +437,7 @@ FileWriteOptions <- R6Class("FileWriteOptions", if (self$type == "parquet") { dataset___ParquetFileWriteOptions__update( self, - ParquetWriterProperties$create(table, ...), + ParquetWriterProperties$create(column_names, ...), ParquetArrowWriterProperties$create(...) ) } else if (self$type == "ipc") { diff --git a/r/R/dataset-write.R b/r/R/dataset-write.R index 9a25d5fbccc..09b3ebdbe69 100644 --- a/r/R/dataset-write.R +++ b/r/R/dataset-write.R @@ -186,7 +186,11 @@ write_dataset <- function(dataset, path_and_fs <- get_path_and_filesystem(path) output_schema <- final_node$schema - options <- FileWriteOptions$create(format, table = output_schema, ...) + options <- FileWriteOptions$create( + format, + column_names = names(output_schema), + ... 
+ ) # TODO(ARROW-16200): expose FileSystemDatasetWriteOptions in R # and encapsulate this logic better diff --git a/r/R/parquet.R b/r/R/parquet.R index 3a07c224ed6..c6c00ed3a48 100644 --- a/r/R/parquet.R +++ b/r/R/parquet.R @@ -186,7 +186,7 @@ write_parquet <- function(x, x$schema, sink, properties = properties %||% ParquetWriterProperties$create( - x, + names(x), version = version, compression = compression, compression_level = compression_level, @@ -307,33 +307,33 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", set_version = function(version) { parquet___WriterProperties___Builder__version(self, make_valid_version(version)) }, - set_compression = function(table, compression) { + set_compression = function(column_names, compression) { compression <- compression_from_name(compression) assert_that(is.integer(compression)) private$.set( - table, compression, + column_names, compression, parquet___ArrowWriterProperties___Builder__set_compressions ) }, - set_compression_level = function(table, compression_level) { + set_compression_level = function(column_names, compression_level) { # cast to integer but keep names compression_level <- set_names(as.integer(compression_level), names(compression_level)) private$.set( - table, compression_level, + column_names, compression_level, parquet___ArrowWriterProperties___Builder__set_compression_levels ) }, - set_dictionary = function(table, use_dictionary) { + set_dictionary = function(column_names, use_dictionary) { assert_that(is.logical(use_dictionary)) private$.set( - table, use_dictionary, + column_names, use_dictionary, parquet___ArrowWriterProperties___Builder__set_use_dictionary ) }, - set_write_statistics = function(table, write_statistics) { + set_write_statistics = function(column_names, write_statistics) { assert_that(is.logical(write_statistics)) private$.set( - table, write_statistics, + column_names, write_statistics, parquet___ArrowWriterProperties___Builder__set_write_statistics ) }, @@ -342,9 +342,8 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", } ), private = list( - .set = function(table, value, FUN) { + .set = function(column_names, value, FUN) { msg <- paste0("unsupported ", substitute(value), "= specification") - column_names <- names(table) given_names <- names(value) if (is.null(given_names)) { if (length(value) %in% c(1L, length(column_names))) { @@ -364,7 +363,7 @@ ParquetWriterPropertiesBuilder <- R6Class("ParquetWriterPropertiesBuilder", ) ) -ParquetWriterProperties$create <- function(table, +ParquetWriterProperties$create <- function(column_names, version = NULL, compression = default_parquet_compression(), compression_level = NULL, @@ -377,16 +376,16 @@ ParquetWriterProperties$create <- function(table, builder$set_version(version) } if (!is.null(compression)) { - builder$set_compression(table, compression = compression) + builder$set_compression(column_names, compression = compression) } if (!is.null(compression_level)) { - builder$set_compression_level(table, compression_level = compression_level) + builder$set_compression_level(column_names, compression_level = compression_level) } if (!is.null(use_dictionary)) { - builder$set_dictionary(table, use_dictionary) + builder$set_dictionary(column_names, use_dictionary) } if (!is.null(write_statistics)) { - builder$set_write_statistics(table, write_statistics) + builder$set_write_statistics(column_names, write_statistics) } if (!is.null(data_page_size)) { builder$set_data_page_size(data_page_size) @@ -600,10 +599,9 
@@ ParquetArrowReaderProperties$create <- function(use_threads = option_use_threads parquet___arrow___ArrowReaderProperties__Make(isTRUE(use_threads)) } -calculate_chunk_size <- function(rows, columns, - target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8), - max_chunks = getOption("arrow.parquet_max_chunks", 200) - ) { +calculate_chunk_size <- function(rows, columns, + target_cells_per_group = getOption("arrow.parquet_cells_per_group", 2.5e8), + max_chunks = getOption("arrow.parquet_max_chunks", 200)) { # Ensure is a float to prevent integer overflow issues num_cells <- as.numeric(rows) * as.numeric(columns)
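
Usage note (not part of the patches above): a minimal sketch of what this series enables, assuming the arrow and dplyr packages are attached and using mtcars as a stand-in data frame. With write_dataset() now building a WriteNode on the ExecPlan (mirroring the mutate()/group_by() test added in patch 01/10), a dplyr query can be written directly to a partitioned dataset; grouping variables supply the partitioning, as before.

  library(arrow)
  library(dplyr)

  dst <- tempfile()

  mtcars %>%
    mutate(kpl = mpg * 0.425) %>%   # derived column is written alongside the originals
    group_by(cyl) %>%               # grouping vars become the partitioning columns
    write_dataset(dst, format = "parquet")

  # Read the partitioned result back
  open_dataset(dst) %>% collect()

Queries that end in arrange(), head(), or tail() are still run to completion first and then re-planned for the write (see the sort/head/tail check in dataset-write.R above), pending ARROW-15681.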