From 4a2b61de9672b17fae8c906214c910c151e6bbe0 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Fri, 17 Sep 2021 16:33:17 -0400 Subject: [PATCH 1/2] Preserve original R schema metadata where appropriate --- r/R/query-engine.R | 21 +++++++++++++- r/tests/testthat/test-metadata.R | 48 ++++++++++++++++++++++++++++++-- 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index a96378671af..2b41ac36b94 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -20,11 +20,30 @@ do_exec_plan <- function(.data) { final_node <- plan$Build(.data) tab <- plan$Run(final_node) + # If arrange() created $temp_columns, make sure to omit them from the result + # We can't currently handle this in the ExecPlan itself because sorting + # happens at the end (SinkNode), so nothing comes after it. if (length(final_node$sort$temp_columns) > 0) { - # If arrange() created $temp_columns, make sure to omit them from the result tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] } + # Apply any column metadata from the original schema, where appropriate + original_schema <- source_data(.data)$schema + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? 
+ r_meta <- original_schema$metadata$r + if (!is.null(r_meta)) { + r_meta <- .unserialize_arrow_r_metadata(r_meta) + # Filter r_meta$columns on columns with name _and_ type match + new_schema <- tab$schema + common_names <- intersect(names(r_meta$columns), names(tab)) + keep <- common_names[ + map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + tab$metadata$r <- .serialize_arrow_r_metadata(r_meta) + } + tab } diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R index 6ae5b54fbf3..f5c8ca8c285 100644 --- a/r/tests/testthat/test-metadata.R +++ b/r/tests/testthat/test-metadata.R @@ -239,8 +239,7 @@ test_that("metadata of list elements (ARROW-10386)", { ds <- open_dataset(dst_dir) expect_warning( df_from_ds <- collect(ds), - NA # TODO: ARROW-13852 - # "Row-level metadata is not compatible with this operation and has been ignored" + "Row-level metadata is not compatible with this operation and has been ignored" ) expect_equal(arrange(df_from_ds, int), arrange(df, int), check.attributes = FALSE) @@ -250,3 +249,48 @@ test_that("metadata of list elements (ARROW-10386)", { NA ) }) + +test_that("dplyr with metadata", { + skip_if_not_available("dataset") + + expect_dplyr_equal( + input %>% + collect(), + example_with_metadata + ) + expect_dplyr_equal( + input %>% + select(a) %>% + collect(), + example_with_metadata + ) + expect_dplyr_equal( + input %>% + mutate(z = b * 4) %>% + select(z, a) %>% + collect(), + example_with_metadata + ) + expect_dplyr_equal( + input %>% + mutate(z = nchar(a)) %>% + select(z, a) %>% + collect(), + example_with_metadata + ) + expect_dplyr_equal( + input %>% + group_by(a) %>% + summarize(n()) %>% + collect(), + example_with_metadata + ) + # Same name in output but different data + expect_dplyr_equal( + input %>% + mutate(a = nchar(a)) %>% + select(a) %>% + collect(), + example_with_metadata + ) +}) From 6f013f6da4ef3f6370ea989eaac3a2afda00e7c0 Mon Sep 17 
00:00:00 2001 From: Neal Richardson Date: Mon, 20 Sep 2021 15:37:27 -0400 Subject: [PATCH 2/2] Fix a couple of test failures and update metadata tests to testthat 3e --- r/R/dplyr.R | 5 +++++ r/R/query-engine.R | 36 +++++++++++++++++++------------- r/tests/testthat/test-metadata.R | 26 +++++++++++++++-------- 3 files changed, 43 insertions(+), 24 deletions(-) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 199120887b9..c783f3ef60d 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -235,3 +235,8 @@ source_data <- function(x) { } is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query") + +has_aggregation <- function(x) { + # TODO: update with joins (check right side data too) + !is.null(x$aggregations) || (is_collapsed(x) && has_aggregation(x$.data)) +} diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 2b41ac36b94..fff2b2cf6c5 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -27,21 +27,27 @@ do_exec_plan <- function(.data) { tab <- tab[, setdiff(names(tab), final_node$sort$temp_columns), drop = FALSE] } - # Apply any column metadata from the original schema, where appropriate - original_schema <- source_data(.data)$schema - # TODO: do we care about other (non-R) metadata preservation? - # How would we know if it were meaningful? - r_meta <- original_schema$metadata$r - if (!is.null(r_meta)) { - r_meta <- .unserialize_arrow_r_metadata(r_meta) - # Filter r_meta$columns on columns with name _and_ type match - new_schema <- tab$schema - common_names <- intersect(names(r_meta$columns), names(tab)) - keep <- common_names[ - map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) - ] - r_meta$columns <- r_meta$columns[keep] - tab$metadata$r <- .serialize_arrow_r_metadata(r_meta) + if (ncol(tab)) { + # Apply any column metadata from the original schema, where appropriate + original_schema <- source_data(.data)$schema + # TODO: do we care about other (non-R) metadata preservation? + # How would we know if it were meaningful? 
+ r_meta <- original_schema$metadata$r + if (!is.null(r_meta)) { + r_meta <- .unserialize_arrow_r_metadata(r_meta) + # Filter r_meta$columns on columns with name _and_ type match + new_schema <- tab$schema + common_names <- intersect(names(r_meta$columns), names(tab)) + keep <- common_names[ + map_lgl(common_names, ~ original_schema[[.]] == new_schema[[.]]) + ] + r_meta$columns <- r_meta$columns[keep] + if (has_aggregation(.data)) { + # dplyr drops top-level attributes if you do summarize + r_meta$attributes <- NULL + } + tab$metadata$r <- .serialize_arrow_r_metadata(r_meta) + } } tab diff --git a/r/tests/testthat/test-metadata.R b/r/tests/testthat/test-metadata.R index f5c8ca8c285..19fd4c68cd7 100644 --- a/r/tests/testthat/test-metadata.R +++ b/r/tests/testthat/test-metadata.R @@ -15,11 +15,11 @@ # specific language governing permissions and limitations # under the License. -context("Schema metadata and R attributes") +# local_edition(3) test_that("Schema metadata", { s <- schema(b = double()) - expect_equivalent(s$metadata, list()) + expect_equal(s$metadata, empty_named_list()) expect_false(s$HasMetadata) s$metadata <- list(test = TRUE) expect_identical(s$metadata, list(test = "TRUE")) @@ -31,7 +31,7 @@ test_that("Schema metadata", { expect_identical(s$metadata, list(test = "TRUE")) expect_true(s$HasMetadata) s$metadata <- NULL - expect_equivalent(s$metadata, list()) + expect_equal(s$metadata, empty_named_list()) expect_false(s$HasMetadata) expect_error( s$metadata <- 4, @@ -41,7 +41,7 @@ test_that("Schema metadata", { test_that("Table metadata", { tab <- Table$create(x = 1:2, y = c("a", "b")) - expect_equivalent(tab$metadata, list()) + expect_equal(tab$metadata, empty_named_list()) tab$metadata <- list(test = TRUE) expect_identical(tab$metadata, list(test = "TRUE")) tab$metadata$foo <- 42 @@ -49,7 +49,7 @@ test_that("Table metadata", { tab$metadata$foo <- NULL expect_identical(tab$metadata, list(test = "TRUE")) tab$metadata <- NULL - 
expect_equivalent(tab$metadata, list()) + expect_equal(tab$metadata, empty_named_list()) }) test_that("Table R metadata", { @@ -126,7 +126,7 @@ test_that("Metadata serialization compression", { test_that("RecordBatch metadata", { rb <- RecordBatch$create(x = 1:2, y = c("a", "b")) - expect_equivalent(rb$metadata, list()) + expect_equal(rb$metadata, empty_named_list()) rb$metadata <- list(test = TRUE) expect_identical(rb$metadata, list(test = "TRUE")) rb$metadata$foo <- 42 @@ -134,7 +134,7 @@ test_that("RecordBatch metadata", { rb$metadata$foo <- NULL expect_identical(rb$metadata, list(test = "TRUE")) rb$metadata <- NULL - expect_equivalent(rb$metadata, list()) + expect_equal(rb$metadata, empty_named_list()) }) test_that("RecordBatch R metadata", { @@ -212,6 +212,7 @@ test_that("metadata of list elements (ARROW-10386)", { skip_if_not_available("dataset") skip_if_not_available("parquet") + local_edition(3) library(dplyr) df <- tibble::tibble( @@ -241,7 +242,11 @@ test_that("metadata of list elements (ARROW-10386)", { df_from_ds <- collect(ds), "Row-level metadata is not compatible with this operation and has been ignored" ) - expect_equal(arrange(df_from_ds, int), arrange(df, int), check.attributes = FALSE) + expect_equal( + arrange(df_from_ds, int), + arrange(df, int), + ignore_attr = TRUE + ) # however there is *no* warning if we don't select the metadata column expect_warning( @@ -278,6 +283,8 @@ test_that("dplyr with metadata", { collect(), example_with_metadata ) + # dplyr drops top-level attributes if you do summarize, though attributes + # of grouping columns appear to come through expect_dplyr_equal( input %>% group_by(a) %>% @@ -285,7 +292,8 @@ test_that("dplyr with metadata", { collect(), example_with_metadata ) - # Same name in output but different data + # Same name in output but different data, so the column metadata shouldn't + # carry through expect_dplyr_equal( input %>% mutate(a = nchar(a)) %>%