From e5d9e71e376032cb0034615b9742b03c40ee1089 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Sun, 10 Jul 2022 15:31:00 -0400 Subject: [PATCH 1/7] Make query_on_dataset() consider all sources --- r/R/dplyr.R | 23 ++++++--- r/tests/testthat/test-dplyr-query.R | 73 +++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 8018cb5a60e..08d0b621abd 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -264,7 +264,9 @@ abandon_ship <- function(call, .data, msg) { eval.parent(call, 2) } -query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader")) +query_on_dataset <- function(x) { + any(map_lgl(all_sources(x), ~ inherits(., c("Dataset", "RecordBatchReader")))) +} source_data <- function(x) { if (!inherits(x, "arrow_dplyr_query")) { @@ -276,13 +278,22 @@ source_data <- function(x) { } } -is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query") - -has_aggregation <- function(x) { - # TODO: update with joins (check right side data too) - !is.null(x$aggregations) || (is_collapsed(x) && has_aggregation(x$.data)) +all_sources <- function(x) { + if (is.null(x)) { + x + } else if (!inherits(x, "arrow_dplyr_query")) { + list(x) + } else { + c( + all_sources(x$.data), + all_sources(x$join$right_data), + all_sources(x$union_all$right_data) + ) + } } +is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query") + has_head_tail <- function(x) { !is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && has_head_tail(x$.data)) } diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R index d55ed07cfc8..0a5515b70c4 100644 --- a/r/tests/testthat/test-dplyr-query.R +++ b/r/tests/testthat/test-dplyr-query.R @@ -293,3 +293,76 @@ test_that("No duplicate field names are allowed in an arrow_dplyr_query", { ) ) }) + +test_that("all_sources() finds all data sources in a query", { + tab <- Table$create(a = 1) + ds <- InMemoryDataset$create(tab) + expect_equal(all_sources(tab), list(tab)) + expect_equal( + tab %>% + filter(a > 0) %>% + summarize(a = sum(a)) %>% + arrange(desc(a)) %>% + all_sources(), + list(tab) + ) + expect_equal( + tab %>% + filter(a > 0) %>% + union_all(ds) %>% + all_sources(), + list(tab, ds) + ) + + expect_equal( + tab %>% + filter(a > 0) %>% + union_all(ds) %>% + left_join(tab) %>% + all_sources(), + list(tab, ds, tab) + ) + expect_equal( + tab %>% + filter(a > 0) %>% + union_all(left_join(ds, tab)) %>% + left_join(tab) %>% + all_sources(), + list(tab, ds, tab, tab) + ) +}) + +test_that("query_on_dataset() looks at all data sources in a query", { + tab <- Table$create(a = 1) + ds <- InMemoryDataset$create(tab) + expect_false(query_on_dataset(tab)) + expect_true(query_on_dataset(ds)) + expect_false( + tab %>% + filter(a > 0) %>% + summarize(a = sum(a)) %>% + arrange(desc(a)) %>% + query_on_dataset() + ) + expect_true( + tab %>% + filter(a > 0) %>% + union_all(ds) %>% + query_on_dataset() + ) + + expect_true( + tab %>% + filter(a > 0) %>% + union_all(left_join(ds, tab)) %>% + left_join(tab) %>% + query_on_dataset() + ) + expect_false( + tab %>% + filter(a > 0) %>% + union_all(left_join(tab, tab)) %>% + left_join(tab) %>% + query_on_dataset() + ) +}) From 5537e2f58ab5f1b5d52e2a255366aab06a0f2804 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Sun, 10 Jul 2022 14:47:56 -0400 Subject: [PATCH 2/7] dplyr::glimpse() and a few assorted cleanups --- r/DESCRIPTION | 3 + r/NAMESPACE | 2 + r/R/arrow-object.R | 6 +- r/R/arrow-package.R | 3 +- r/R/chunked-array.R | 4 +- r/R/dplyr-count.R | 2 +- r/R/dplyr-glimpse.R | 133 ++++++++++++++++++ r/R/extension.R | 22 +-- r/R/filesystem.R | 1 - r/R/query-engine.R | 5 +- r/tests/testthat/_snaps/dplyr-glimpse.md | 165 +++++++++++++++++++++++ r/tests/testthat/test-chunked-array.txt | 4 + r/tests/testthat/test-data-type.R | 19 +-- r/tests/testthat/test-dplyr-glimpse.R | 96 +++++++++++++ r/tests/testthat/test-extension.R | 2 +- r/tests/testthat/test-schema.R | 11 +- 16 files changed, 437 insertions(+), 41 deletions(-) create mode 100644 r/R/dplyr-glimpse.R create mode 100644 r/tests/testthat/_snaps/dplyr-glimpse.md create mode 100644 r/tests/testthat/test-dplyr-glimpse.R diff --git a/r/DESCRIPTION b/r/DESCRIPTION index 2cbbec054a7..a7408d27d65 100644 --- a/r/DESCRIPTION +++ b/r/DESCRIPTION @@ -44,6 +44,7 @@ RoxygenNote: 7.2.0 Config/testthat/edition: 3 VignetteBuilder: knitr Suggests: + cli, DBI, dbplyr, decor, @@ -53,6 +54,7 @@ Suggests: hms, knitr, lubridate, + pillar, pkgload, reticulate, rmarkdown, @@ -103,6 +105,7 @@ Collate: 'dplyr-funcs-type.R' 'expression.R' 'dplyr-funcs.R' + 'dplyr-glimpse.R' 'dplyr-group-by.R' 'dplyr-join.R' 'dplyr-mutate.R' diff --git a/r/NAMESPACE b/r/NAMESPACE index 5762df9eb0c..f680a9602b9 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -452,6 +452,8 @@ importFrom(tidyselect,starts_with) importFrom(tidyselect,vars_pull) importFrom(tidyselect,vars_rename) importFrom(tidyselect,vars_select) +importFrom(utils,capture.output) +importFrom(utils,getFromNamespace) importFrom(utils,head) importFrom(utils,install.packages) importFrom(utils,modifyList) diff --git a/r/R/arrow-object.R b/r/R/arrow-object.R index 0a82f858774..ac067d4aa5f 100644 --- a/r/R/arrow-object.R +++ b/r/R/arrow-object.R @@ -31,14 +31,16 @@ ArrowObject <- R6Class("ArrowObject", } assign(".:xp:.", xp, envir = self) }, - print = function(...) { + class_title = function() { if (!is.null(self$.class_title)) { # Allow subclasses to override just printing the class name first class_title <- self$.class_title() } else { class_title <- class(self)[[1]] } - cat(class_title, "\n", sep = "") + }, + print = function(...) { + cat(self$class_title(), "\n", sep = "") if (!is.null(self$ToString)) { cat(self$ToString(), "\n", sep = "") } diff --git a/r/R/arrow-package.R b/r/R/arrow-package.R index 7b59854f1e1..c6e4434d668 100644 --- a/r/R/arrow-package.R +++ b/r/R/arrow-package.R @@ -41,7 +41,7 @@ "group_vars", "group_by_drop_default", "ungroup", "mutate", "transmute", "arrange", "rename", "pull", "relocate", "compute", "collapse", "distinct", "left_join", "right_join", "inner_join", "full_join", - "semi_join", "anti_join", "count", "tally", "rename_with", "union", "union_all" + "semi_join", "anti_join", "count", "tally", "rename_with", "union", "union_all", "glimpse" ) ) for (cl in c("Dataset", "ArrowTabular", "RecordBatchReader", "arrow_dplyr_query")) { @@ -50,6 +50,7 @@ } } s3_register("dplyr::tbl_vars", "arrow_dplyr_query") + s3_register("pillar::type_sum", "DataType") for (cl in c( "Array", "RecordBatch", "ChunkedArray", "Table", "Schema", diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index 24ca7e6e58a..aaed05a2375 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -119,6 +119,7 @@ ChunkedArray <- R6Class("ChunkedArray", assert_that(!is.na(descending)) # TODO: after ARROW-12042 is closed, review whether this and the # Array$SortIndices definition can be consolidated + # (edit: ARROW-12042 was resolved in 7.0.0) call_function( "sort_indices", self, @@ -132,7 +133,8 @@ ChunkedArray <- R6Class("ChunkedArray", ChunkedArray__Validate(self) }, ToString = function() { - ChunkedArray__ToString(self) + typ <- paste0("<", self$type$ToString(), ">") + paste(typ, ChunkedArray__ToString(self), sep = "\n") }, Equals = function(other, ...) { inherits(other, "ChunkedArray") && ChunkedArray__Equals(self, other) diff --git a/r/R/dplyr-count.R b/r/R/dplyr-count.R index 747212bc7be..50badb84593 100644 --- a/r/R/dplyr-count.R +++ b/r/R/dplyr-count.R @@ -39,7 +39,7 @@ count.Dataset <- count.ArrowTabular <- count.RecordBatchReader <- count.arrow_dp #' @importFrom rlang sym := tally.arrow_dplyr_query <- function(x, wt = NULL, sort = FALSE, name = NULL) { - check_name <- utils::getFromNamespace("check_name", "dplyr") + check_name <- getFromNamespace("check_name", "dplyr") name <- check_name(name, dplyr::group_vars(x)) if (quo_is_null(enquo(wt))) { diff --git a/r/R/dplyr-glimpse.R b/r/R/dplyr-glimpse.R new file mode 100644 index 00000000000..b1c56053e5d --- /dev/null +++ b/r/R/dplyr-glimpse.R @@ -0,0 +1,133 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' @importFrom utils getFromNamespace +glimpse.ArrowTabular <- function(x, + width = getOption("pillar.width", getOption("width")), + ...) { + # We use cli:: and pillar:: throughout this function. We don't need to check + # to see if they're installed because dplyr depends on pillar, which depends + # on cli, and we're only in this function though S3 dispatch on dplyr::glimpse + if (!is.finite(width)) { + abort("`width` must be finite.") + } + + # Even though this is the ArrowTabular method, we use it for arrow_dplyr_query + # too, so let's make some adaptations that aren't covered by S3 methods + if (inherits(x, "arrow_dplyr_query")) { + # TODO(ARROW-16030): encapsulate this + schema <- implicit_schema(x) + class_title <- paste(source_data(x)$class_title(), "(query)") + } else { + schema <- x$schema + class_title <- x$class_title() + } + + cli::cat_line(class_title) + + dims <- dim(x) + # We need a couple of internal functions in pillar for formatting + pretty_int <- getFromNamespace("big_mark", "pillar") + cli::cat_line(sprintf( + "%s rows x %s columns", pretty_int(dims[1]), pretty_int(dims[2]) + )) + + if (dims[2] == 0) { + return(invisible(x)) + } + + var_types <- map_chr(schema$fields, ~ format(pillar::new_pillar_type(.$type))) + # note: pillar:::tick_if_needed() is in glimplse.tbl() + var_headings <- paste("$", center_pad(names(x), var_types)) + + nrows <- as.integer(width / 3) + df <- as.data.frame(head(x, nrows)) + formatted_data <- map_chr(df, function(.) { + tryCatch( + paste(pillar::format_glimpse(.), collapse = ", "), + # This could error e.g. if you have a VctrsExtensionType and the package + # that defines methods for the data is not loaded + error = function(e) conditionMessage(e) + ) + }) + + data_width <- width - pillar::get_extent(var_headings) + make_shorter <- getFromNamespace("str_trunc", "pillar") + truncated_data <- make_shorter(formatted_data, data_width) + + cli::cat_line(var_headings, " ", truncated_data) + if (inherits(x, "arrow_dplyr_query")) { + cli::cat_line("Call `print()` for query details") + } else if (any(grepl("<...>", var_types, fixed = TRUE)) || schema$HasMetadata) { + # TODO: use crayon to style? + # TODO(ARROW-16030): this could point to the schema method + cli::cat_line("Call `print()` for full schema details") + } + invisible(x) +} + +# Dataset has an efficient head() method via Scanner so this is fine +glimpse.Dataset <- glimpse.ArrowTabular + +glimpse.arrow_dplyr_query <- function(x, + width = getOption("pillar.width", getOption("width")), + ...) { + source <- source_data(x) + # TODO(ARROW-XXXXX): this should check for RBRs in other source nodes too + if (inherits(source, "RecordBatchReader")) { + message("Cannot glimpse() data from a RecordBatchReader because it can only be read one time. Call `compute()` to evaluate the query first.") + print(x) + } else if (query_on_dataset(x) && (is_collapsed(x) || has_aggregation(x) || length(x$arrange_vars))) { + # We allow queries that just select/filter/mutate because those stream, + # no arrange/summarize/join/etc. because those require full scans. + # TODO: tangentially related, test that head %>% head is handled correctly + message("This query requires a full table scan, so glimpse() may be expensive. Call `compute()` to evaluate the query first.") + print(x) + } else { + # Go for it + glimpse.ArrowTabular(x, width = width, ...) + } +} + +glimpse.RecordBatchReader <- function(x, + width = getOption("pillar.width", getOption("width")), + ...) { + # TODO(ARROW-YYYYY): to_arrow() on duckdb con should hold con not RBR so it can be run more than onces (like duckdb does on the other side) + message("Cannot glimpse() data from a RecordBatchReader because it can only be read one time; call `as_arrow_table()` to consume it first") + print(x) +} + +glimpse.ArrowDatum <- function(x, width, ...) { + cli::cat_line(gsub("[ \n]+", " ", x$ToString())) +} + +type_sum.DataType <- function(x) { + if (inherits(x, "VctrsExtensionType")) { + # ptype() holds a vctrs type object, which pillar knows how to format + paste0("ext<", pillar::type_sum(x$ptype()), ">") + } else { + # Trim long type names with <...> + sub("<.*>", "<...>", x$ToString()) + } +} + +center_pad <- function(left, right) { + left_sizes <- pillar::get_extent(left) + right_sizes <- pillar::get_extent(right) + total_width <- max(left_sizes + right_sizes) + 1L + paste0(left, strrep(" ", total_width - left_sizes - right_sizes), right) +} diff --git a/r/R/extension.R b/r/R/extension.R index e31f4934a7d..be492c845fa 100644 --- a/r/R/extension.R +++ b/r/R/extension.R @@ -193,7 +193,7 @@ ExtensionType <- R6Class("ExtensionType", sprintf( "<%s %s...>", class(self)[1], - paste(format(utils::head(metadata_raw, 20)), collapse = " ") + paste(format(head(metadata_raw, 20)), collapse = " ") ) } else { sprintf( @@ -420,31 +420,19 @@ unregister_extension_type <- function(extension_name) { arrow__UnregisterRExtensionType(extension_name) } +#' @importFrom utils capture.output VctrsExtensionType <- R6Class("VctrsExtensionType", inherit = ExtensionType, public = list( - ptype = function() { - private$.ptype - }, + ptype = function() private$.ptype, ToString = function() { - tf <- tempfile() - sink(tf) - on.exit({ - sink(NULL) - unlink(tf) - }) - print(self$ptype()) - paste0(readLines(tf), collapse = "\n") + paste0(capture.output(print(self$ptype())), collapse = "\n") }, deserialize_instance = function() { private$.ptype <- unserialize(self$extension_metadata()) }, ExtensionEquals = function(other) { - if (!inherits(other, "VctrsExtensionType")) { - return(FALSE) - } - - identical(self$ptype(), other$ptype()) + inherits(other, "VctrsExtensionType") && identical(self$ptype(), other$ptype()) }, as_vector = function(extension_array) { if (inherits(extension_array, "ChunkedArray")) { diff --git a/r/R/filesystem.R b/r/R/filesystem.R index 75997431a43..3cebbc30c85 100644 --- a/r/R/filesystem.R +++ b/r/R/filesystem.R @@ -451,7 +451,6 @@ s3_bucket <- function(bucket, ...) { #' @usage NULL #' @format NULL #' @rdname FileSystem -#' @importFrom utils modifyList #' @export GcsFileSystem <- R6Class("GcsFileSystem", inherit = FileSystem diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 513b861d414..9dac2651037 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -226,9 +226,10 @@ ExecPlan <- R6Class("ExecPlan", slice_size <- node$extras$head %||% node$extras$tail if (!is.null(slice_size)) { out <- head(out, slice_size) + # We already have everything we need for the head, so StopProducing + # TODO: close ARROW-14329 since this is working now + self$Stop() } - # Can we now tell `self$Stop()` to StopProducing? We already have - # everything we need for the head (but it seems to segfault: ARROW-14329) } else if (!is.null(node$extras$tail)) { # TODO(ARROW-16630): proper BottomK support # Reverse the row order to get back what we expect diff --git a/r/tests/testthat/_snaps/dplyr-glimpse.md b/r/tests/testthat/_snaps/dplyr-glimpse.md new file mode 100644 index 00000000000..ba2aeaf3166 --- /dev/null +++ b/r/tests/testthat/_snaps/dplyr-glimpse.md @@ -0,0 +1,165 @@ +# glimpse() Table/ChunkedArray + + Code + glimpse(tab) + Output + Table + 10 rows x 7 columns + $ int 1, 2, 3, NA, 5, 6, 7, 8, 9, 10 + $ dbl 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, NA, 10.1 + $ dbl2 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 + $ lgl TRUE, NA, TRUE, FALSE, TRUE, NA, NA, FALSE, FALSE, NA + $ false FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, ~ + $ chr "a", "b", "c", "d", "e", NA, "g", "h", "i", "j" + $ fct > a, b, c, d, NA, NA, g, h, i, j + Call `print()` for full schema details + +--- + + Code + glimpse(tab$chr) + Output + [ [ "a", "b", "c", "d", "e", null, "g", "h", "i", "j" ] ] + +# glimpse() RecordBatch/Array + + Code + glimpse(batch) + Output + RecordBatch + 10 rows x 7 columns + $ int 1, 2, 3, NA, 5, 6, 7, 8, 9, 10 + $ dbl 1.1, 2.1, 3.1, 4.1, 5.1, 6.1, 7.1, 8.1, NA, 10.1 + $ dbl2 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 + $ lgl TRUE, NA, TRUE, FALSE, TRUE, NA, NA, FALSE, FALSE, NA + $ false FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, FALSE, ~ + $ chr "a", "b", "c", "d", "e", NA, "g", "h", "i", "j" + $ fct > a, b, c, d, NA, NA, g, h, i, j + Call `print()` for full schema details + +--- + + Code + glimpse(batch$int) + Output + [ 1, 2, 3, null, 5, 6, 7, 8, 9, 10 ] + +# glimpse() with VctrsExtensionType + + Code + glimpse(haven) + Output + Table + 2 rows x 3 columns + $ num 5.1, 4.9 + $ cat_int > 3, 1 + $ cat_chr > Can't convert `x` to . + Call `print()` for full schema details + +--- + + Code + glimpse(haven[[3]]) + Output + <> [ [ "B", "B" ] ] + +# glimpse prints message about schema if there are complex types + + Code + glimpse(dictionary_but_no_metadata) + Output + Table + 5 rows x 2 columns + $ a 1, 2, 3, 4, 5 + $ b > 1, 2, 3, 4, 5 + Call `print()` for full schema details + +--- + + Code + glimpse(Table$create(a = 1)) + Output + Table + 1 rows x 1 columns + $ a 1 + +# glimpse() calls print() instead of showing data for RBR + + Code + example_data %>% as_record_batch_reader() %>% glimpse() + Message + Cannot glimpse() data from a RecordBatchReader because it can only be read one time; call `as_arrow_table()` to consume it first + Output + RecordBatchReader + int: int32 + dbl: double + dbl2: double + lgl: bool + false: bool + chr: string + fct: dictionary + +--- + + Code + example_data %>% as_record_batch_reader() %>% select(int) %>% glimpse() + Message + Cannot glimpse() data from a RecordBatchReader because it can only be read one time. Call `compute()` to evaluate the query first. + Output + RecordBatchReader (query) + int: int32 + + See $.data for the source Arrow object + +# glimpse() on Dataset + + Code + glimpse(ds) + Output + FileSystemDataset with 2 Parquet files + 20 rows x 7 columns + $ int 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 101, 102, 103, 104, ~ + $ dbl 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 51, 52, 53, 54, 55, ~ + $ lgl TRUE, FALSE, NA, TRUE, FALSE, TRUE, FALSE, NA, TRUE~ + $ chr "a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "~ + $ fct > A, B, C, D, E, F, G, H, I, J, J, I, H, G, F, E, D, ~ + $ ts 2015-04-30 03:12:39, 2015-05-01 03:12:39, 2015-05-0~ + $ group 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, ~ + Call `print()` for full schema details + +# glimpse() on Dataset query only shows data for streaming eval + + Code + ds %>% select(int, chr) %>% filter(int > 2) %>% mutate(twice = int * 2) %>% + glimpse() + Output + FileSystemDataset with 2 Parquet files (query) + 18 rows x 3 columns + $ int 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 3, 4, 5, 6, 7~ + $ chr "j", "i", "h", "g", "f", "e", "d", "c", "b", "a", "c", "d", "e"~ + $ twice 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 6, 8, 10, 12,~ + Call `print()` for query details + +--- + + Code + ds %>% summarize(max(int)) %>% glimpse() + Message + This query requires a full table scan, so glimpse() may be expensive. Call `compute()` to evaluate the query first. + Output + FileSystemDataset (query) + max(int): int32 + + See $.data for the source Arrow object + +# glimpse() on in-memory query shows data even if aggregating + + Code + example_data %>% arrow_table() %>% summarize(sum(int, na.rm = TRUE)) %>% + glimpse() + Output + Table (query) + ?? rows x 1 columns + $ sum(int, na.rm = TRUE) 51 + Call `print()` for query details + diff --git a/r/tests/testthat/test-chunked-array.txt b/r/tests/testthat/test-chunked-array.txt index c7101359d76..e2a691a99e9 100644 --- a/r/tests/testthat/test-chunked-array.txt +++ b/r/tests/testthat/test-chunked-array.txt @@ -1,5 +1,6 @@ > chunked_array(c(1, 2, 3), c(4, 5, 6)) ChunkedArray + [ [ 1, @@ -15,6 +16,7 @@ ChunkedArray > chunked_array(1:30, c(4, 5, 6)) ChunkedArray + [ [ 1, @@ -48,6 +50,7 @@ ChunkedArray > chunked_array(1:30) ChunkedArray + [ [ 1, @@ -76,6 +79,7 @@ ChunkedArray > chunked_array(factor(c("a", "b")), factor(c("c", "d"))) ChunkedArray +> [ -- dictionary: diff --git a/r/tests/testthat/test-data-type.R b/r/tests/testthat/test-data-type.R index 88333fb314b..16fcf8e0a38 100644 --- a/r/tests/testthat/test-data-type.R +++ b/r/tests/testthat/test-data-type.R @@ -593,15 +593,16 @@ test_that("DataType$code()", { expect_code_roundtrip(dictionary(index_type = int8(), value_type = large_utf8())) expect_code_roundtrip(dictionary(index_type = int8(), ordered = TRUE)) - skip("until rlang 1.0") - expect_snapshot({ - (expect_error( - DayTimeInterval__initialize()$code() - )) - (expect_error( - struct(a = DayTimeInterval__initialize())$code() - )) - }) + skip_if(packageVersion("rlang") < 1) + # Are these unsupported for a reason? + expect_error( + eval(DayTimeInterval__initialize()$code()), + "Unsupported type" + ) + expect_error( + eval(struct(a = DayTimeInterval__initialize())$code()), + "Unsupported type" + ) }) test_that("as_data_type() works for DataType", { diff --git a/r/tests/testthat/test-dplyr-glimpse.R b/r/tests/testthat/test-dplyr-glimpse.R new file mode 100644 index 00000000000..db9d786b4d1 --- /dev/null +++ b/r/tests/testthat/test-dplyr-glimpse.R @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +library(dplyr, warn.conflicts = FALSE) + +test_that("glimpse() Table/ChunkedArray", { + tab <- Table$create(example_data) + expect_snapshot(glimpse(tab)) + expect_snapshot(glimpse(tab$chr)) +}) + +test_that("glimpse() RecordBatch/Array", { + batch <- RecordBatch$create(example_data) + expect_snapshot(glimpse(batch)) + expect_snapshot(glimpse(batch$int)) +}) + +test_that("glimpse() with VctrsExtensionType", { + haven <- Table$create(haven_data) + expect_snapshot(glimpse(haven)) + expect_snapshot(glimpse(haven[[3]])) +}) + +test_that("glimpse prints message about schema if there are complex types", { + dictionary_but_no_metadata <- Table$create(a = 1:5, b = factor(1:5)) + expect_snapshot(glimpse(dictionary_but_no_metadata)) + # No message here + expect_snapshot(glimpse(Table$create(a = 1))) +}) + +test_that("glimpse() calls print() instead of showing data for RBR", { + expect_snapshot({ + example_data %>% + as_record_batch_reader() %>% + glimpse() + }) + expect_snapshot({ + example_data %>% + as_record_batch_reader() %>% + select(int) %>% + glimpse() + }) +}) + +skip_if_not_available("dataset") +big_df <- rbind( + cbind(df1, group = 1), + cbind(df2, group = 2) +) +ds_dir <- make_temp_dir() +write_dataset(big_df, ds_dir, partitioning = "group") + +ds <- open_dataset(ds_dir) + +test_that("glimpse() on Dataset", { + expect_snapshot(glimpse(ds)) +}) + +test_that("glimpse() on Dataset query only shows data for streaming eval", { + expect_snapshot({ + ds %>% + select(int, chr) %>% + filter(int > 2) %>% + mutate(twice = int * 2) %>% + glimpse() + }) + + expect_snapshot({ + ds %>% + summarize(max(int)) %>% + glimpse() + }) +}) + +test_that("glimpse() on in-memory query shows data even if aggregating", { + expect_snapshot({ + example_data %>% + arrow_table() %>% + summarize(sum(int, na.rm = TRUE)) %>% + glimpse() + }) +}) diff --git a/r/tests/testthat/test-extension.R b/r/tests/testthat/test-extension.R index a4a37c91273..638869dc8c3 100644 --- a/r/tests/testthat/test-extension.R +++ b/r/tests/testthat/test-extension.R @@ -183,7 +183,7 @@ test_that("vctrs extension type works", { expect_r6_class(type, "VctrsExtensionType") expect_identical(type$ptype(), vctrs::vec_ptype(custom_vctr)) expect_true(type$Equals(type)) - expect_match(type$ToString(), "arrow_custom_test") + expect_identical(type$ToString(), "") array_in <- vctrs_extension_array(custom_vctr) expect_true(array_in$type$Equals(type)) diff --git a/r/tests/testthat/test-schema.R b/r/tests/testthat/test-schema.R index c7046de3cb8..3a35569f7fa 100644 --- a/r/tests/testthat/test-schema.R +++ b/r/tests/testthat/test-schema.R @@ -43,12 +43,11 @@ test_that("Schema$code()", { schema(a = int32(), b = struct(c = double(), d = utf8()), e = list_of(binary())) ) - skip("until rlang 1.0") - expect_snapshot({ - (expect_error( - schema(x = int32(), y = DayTimeInterval__initialize())$code() - )) - }) + skip_if(packageVersion("rlang") < 1) + expect_error( + eval(schema(x = int32(), y = DayTimeInterval__initialize())$code()), + "Unsupported type" + ) }) test_that("Schema with non-nullable fields", { From 2382b882d1c8628f22c9c8de1bfdbf7c055851d4 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Sun, 10 Jul 2022 16:16:26 -0400 Subject: [PATCH 3/7] Better determine when it is safe to glimpse --- r/R/dplyr-glimpse.R | 8 +-- r/R/dplyr.R | 26 +++++++++ r/tests/testthat/_snaps/dplyr-glimpse.md | 13 ----- r/tests/testthat/test-dplyr-glimpse.R | 12 +++-- r/tests/testthat/test-dplyr-query.R | 67 ++++++++++++++++++++++++ 5 files changed, 104 insertions(+), 22 deletions(-) diff --git a/r/R/dplyr-glimpse.R b/r/R/dplyr-glimpse.R index b1c56053e5d..a29d457493b 100644 --- a/r/R/dplyr-glimpse.R +++ b/r/R/dplyr-glimpse.R @@ -86,14 +86,10 @@ glimpse.Dataset <- glimpse.ArrowTabular glimpse.arrow_dplyr_query <- function(x, width = getOption("pillar.width", getOption("width")), ...) { - source <- source_data(x) - # TODO(ARROW-XXXXX): this should check for RBRs in other source nodes too - if (inherits(source, "RecordBatchReader")) { + if (any(map_lgl(all_sources(x), ~ inherits(., "RecordBatchReader")))) { message("Cannot glimpse() data from a RecordBatchReader because it can only be read one time. Call `compute()` to evaluate the query first.") print(x) - } else if (query_on_dataset(x) && (is_collapsed(x) || has_aggregation(x) || length(x$arrange_vars))) { - # We allow queries that just select/filter/mutate because those stream, - # no arrange/summarize/join/etc. because those require full scans. + } else if (query_on_dataset(x) && !query_can_stream(x)) { # TODO: tangentially related, test that head %>% head is handled correctly message("This query requires a full table scan, so glimpse() may be expensive. Call `compute()` to evaluate the query first.") print(x) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 08d0b621abd..b048d98018a 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -292,6 +292,32 @@ all_sources <- function(x) { } } +query_can_stream <- function(x) { + # Queries that just select/filter/mutate can stream: + # you can take head() without evaluating over the whole dataset + if (inherits(x, "arrow_dplyr_query")) { + # Aggregations require all of the data + is.null(x$aggregations) && + # Sorting does too + length(x$arrange_vars) == 0 && + # Joins are ok as long as the right-side data is in memory + # (we have to hash the whole dataset to join it) + !query_on_dataset(x$join$right_data) && + # But need to check that this non-dataset join can stream + query_can_stream(x$join$right_data) && + # Also check that any unioned datasets also can stream + query_can_stream(x$union_all$right_data) && + # Recursively check any queries that have been collapsed + query_can_stream(x$.data) + } else { + # Not a query, so it must be a Table/Dataset (or NULL) + # Note that if you have a RecordBatchReader, you *can* stream, + # but the reader is consumed. If that's a problem, you should check + # for RBRs outside of this function. + TRUE + } +} + is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query") has_head_tail <- function(x) { diff --git a/r/tests/testthat/_snaps/dplyr-glimpse.md b/r/tests/testthat/_snaps/dplyr-glimpse.md index ba2aeaf3166..de8bdc13a37 100644 --- a/r/tests/testthat/_snaps/dplyr-glimpse.md +++ b/r/tests/testthat/_snaps/dplyr-glimpse.md @@ -129,19 +129,6 @@ # glimpse() on Dataset query only shows data for streaming eval - Code - ds %>% select(int, chr) %>% filter(int > 2) %>% mutate(twice = int * 2) %>% - glimpse() - Output - FileSystemDataset with 2 Parquet files (query) - 18 rows x 3 columns - $ int 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 3, 4, 5, 6, 7~ - $ chr "j", "i", "h", "g", "f", "e", "d", "c", "b", "a", "c", "d", "e"~ - $ twice 202, 204, 206, 208, 210, 212, 214, 216, 218, 220, 6, 8, 10, 12,~ - Call `print()` for query details - ---- - Code ds %>% summarize(max(int)) %>% glimpse() Message diff --git a/r/tests/testthat/test-dplyr-glimpse.R b/r/tests/testthat/test-dplyr-glimpse.R index db9d786b4d1..d39fef9e82c 100644 --- a/r/tests/testthat/test-dplyr-glimpse.R +++ b/r/tests/testthat/test-dplyr-glimpse.R @@ -71,14 +71,20 @@ test_that("glimpse() on Dataset", { }) test_that("glimpse() on Dataset query only shows data for streaming eval", { - expect_snapshot({ + # Because dataset scan row order is not deterministic, we can't snapshot + # the whole output. Instead check for an indication that glimpse method ran + # instead of the regular print() method that is the fallback + expect_output( ds %>% select(int, chr) %>% filter(int > 2) %>% mutate(twice = int * 2) %>% - glimpse() - }) + glimpse(), + "Call `print()` for query details", + fixed = TRUE + ) + # This doesn't show the data and falls back to print() expect_snapshot({ ds %>% summarize(max(int)) %>% diff --git a/r/tests/testthat/test-dplyr-query.R b/r/tests/testthat/test-dplyr-query.R index 0a5515b70c4..62633eb1d63 100644 --- a/r/tests/testthat/test-dplyr-query.R +++ b/r/tests/testthat/test-dplyr-query.R @@ -366,3 +366,70 @@ test_that("query_on_dataset() looks at all data sources in a query", { query_on_dataset() ) }) + +test_that("query_can_stream()", { + tab <- Table$create(a = 1) + ds <- InMemoryDataset$create(tab) + expect_true(query_can_stream(tab)) + expect_true(query_can_stream(ds)) + expect_true(query_can_stream(NULL)) + expect_true( + ds %>% + filter(a > 0) %>% + query_can_stream() + ) + expect_false( + tab %>% + filter(a > 0) %>% + arrange(desc(a)) %>% + query_can_stream() + ) + expect_false( + tab %>% + filter(a > 0) %>% + summarize(a = sum(a)) %>% + query_can_stream() + ) + expect_true( + tab %>% + filter(a > 0) %>% + union_all(ds) %>% + query_can_stream() + ) + expect_false( + tab %>% + filter(a > 0) %>% + union_all(summarize(ds, a = sum(a))) %>% + query_can_stream() + ) + + expect_true( + tab %>% + filter(a > 0) %>% + union_all(left_join(ds, tab)) %>% + left_join(tab) %>% + query_can_stream() + ) + expect_true( + tab %>% + filter(a > 0) %>% + union_all(left_join(tab, tab)) %>% + left_join(tab) %>% + query_can_stream() + ) + expect_false( + tab %>% + filter(a > 0) %>% + union_all(left_join(tab, tab)) %>% + left_join(ds) %>% + query_can_stream() + ) + expect_false( + tab %>% + filter(a > 0) %>% + arrange(a) %>% + union_all(left_join(tab, tab)) %>% + left_join(tab) %>% + query_can_stream() + ) +}) From c8971a870b0f6507bf4d716a477eee406729c678 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 11 Jul 2022 09:37:50 -0400 Subject: [PATCH 4/7] Cleanup TODOs and add more comments --- r/R/dplyr-glimpse.R | 71 +++++++++++++++++------- r/R/query-engine.R | 1 - r/tests/testthat/_snaps/dplyr-glimpse.md | 2 +- 3 files changed, 52 insertions(+), 22 deletions(-) diff --git a/r/R/dplyr-glimpse.R b/r/R/dplyr-glimpse.R index a29d457493b..ef609df4c7a 100644 --- a/r/R/dplyr-glimpse.R +++ b/r/R/dplyr-glimpse.R @@ -19,6 +19,8 @@ glimpse.ArrowTabular <- function(x, width = getOption("pillar.width", getOption("width")), ...) { + # This function is inspired by pillar:::glimpse.tbl(), with some adaptations + # We use cli:: and pillar:: throughout this function. We don't need to check # to see if they're installed because dplyr depends on pillar, which depends # on cli, and we're only in this function though S3 dispatch on dplyr::glimpse @@ -26,22 +28,21 @@ glimpse.ArrowTabular <- function(x, abort("`width` must be finite.") } + # We need a couple of internal functions in pillar for formatting + pretty_int <- getFromNamespace("big_mark", "pillar") + make_shorter <- getFromNamespace("str_trunc", "pillar") + tickify <- getFromNamespace("tick_if_needed", "pillar") + # Even though this is the ArrowTabular method, we use it for arrow_dplyr_query - # too, so let's make some adaptations that aren't covered by S3 methods + # so make some accommodations. (Others are handled by S3 method dispatch.) if (inherits(x, "arrow_dplyr_query")) { - # TODO(ARROW-16030): encapsulate this - schema <- implicit_schema(x) class_title <- paste(source_data(x)$class_title(), "(query)") } else { - schema <- x$schema class_title <- x$class_title() } - cli::cat_line(class_title) dims <- dim(x) - # We need a couple of internal functions in pillar for formatting - pretty_int <- getFromNamespace("big_mark", "pillar") cli::cat_line(sprintf( "%s rows x %s columns", pretty_int(dims[1]), pretty_int(dims[2]) )) @@ -50,12 +51,27 @@ glimpse.ArrowTabular <- function(x, return(invisible(x)) } + nrows <- as.integer(width / 3) + head_tab <- dplyr::compute(head(x, nrows)) + # Take the schema from this Table because if x is arrow_dplyr_query, some + # output types could be a best guess (in implicit_schema()). + schema <- head_tab$schema + + # Assemble the column names and types + # We use the Arrow type names here. See type_sum.DataType() below. var_types <- map_chr(schema$fields, ~ format(pillar::new_pillar_type(.$type))) - # note: pillar:::tick_if_needed() is in glimplse.tbl() - var_headings <- paste("$", center_pad(names(x), var_types)) + # glimpse.tbl() left-aligns the var names (pads with whitespace to the right) + # and appends the types next to them. Because those type names are + # aggressively truncated to all be roughly the same length, this means the + # data glimpse that follows is also mostly aligned. + # However, Arrow type names are longer and variable length, and we're only + # truncating the nested type information inside of <...>. So, to keep the + # data glimpses aligned, we "justify" align the name and type: add the padding + # whitespace between them so that the total width is equal. + var_headings <- paste("$", center_pad(tickify(names(x)), var_types)) - nrows <- as.integer(width / 3) - df <- as.data.frame(head(x, nrows)) + # Assemble the data glimpse + df <- as.data.frame(head_tab) formatted_data <- map_chr(df, function(.) { tryCatch( paste(pillar::format_glimpse(.), collapse = ", "), @@ -64,17 +80,19 @@ glimpse.ArrowTabular <- function(x, error = function(e) conditionMessage(e) ) }) - + # Here and elsewhere in the glimpse code, you have to use pillar::get_extent() + # instead of nchar() because get_extent knows how to deal with ANSI escapes + # etc.--it counts how much space on the terminal will be taken when printed. data_width <- width - pillar::get_extent(var_headings) - make_shorter <- getFromNamespace("str_trunc", "pillar") truncated_data <- make_shorter(formatted_data, data_width) + # Print the table body (var name, type, data glimpse) cli::cat_line(var_headings, " ", truncated_data) + + # TODO: use crayon to style these footers? if (inherits(x, "arrow_dplyr_query")) { cli::cat_line("Call `print()` for query details") } else if (any(grepl("<...>", var_types, fixed = TRUE)) || schema$HasMetadata) { - # TODO: use crayon to style? - # TODO(ARROW-16030): this could point to the schema method cli::cat_line("Call `print()` for full schema details") } invisible(x) @@ -87,11 +105,18 @@ glimpse.arrow_dplyr_query <- function(x, width = getOption("pillar.width", getOption("width")), ...) { if (any(map_lgl(all_sources(x), ~ inherits(., "RecordBatchReader")))) { - message("Cannot glimpse() data from a RecordBatchReader because it can only be read one time. Call `compute()` to evaluate the query first.") + msg <- paste( + "Cannot glimpse() data from a RecordBatchReader because it can only be", + "read one time. Call `compute()` to evaluate the query first." + ) + message(msg) print(x) } else if (query_on_dataset(x) && !query_can_stream(x)) { - # TODO: tangentially related, test that head %>% head is handled correctly - message("This query requires a full table scan, so glimpse() may be expensive. Call `compute()` to evaluate the query first.") + msg <- paste( + "This query requires a full table scan, so glimpse() may be", + "expensive. Call `compute()` to evaluate the query first." + ) + message(msg) print(x) } else { # Go for it @@ -102,13 +127,19 @@ glimpse.arrow_dplyr_query <- function(x, glimpse.RecordBatchReader <- function(x, width = getOption("pillar.width", getOption("width")), ...) { - # TODO(ARROW-YYYYY): to_arrow() on duckdb con should hold con not RBR so it can be run more than onces (like duckdb does on the other side) - message("Cannot glimpse() data from a RecordBatchReader because it can only be read one time; call `as_arrow_table()` to consume it first") + # TODO(ARROW-17038): to_arrow() on duckdb con should hold con not RBR so it + # can be run more than once (like duckdb does on the other side) + msg <- paste( + "Cannot glimpse() data from a RecordBatchReader because it can only be", + "read one time; call `as_arrow_table()` to consume it first" + ) + message(msg) print(x) } glimpse.ArrowDatum <- function(x, width, ...) { cli::cat_line(gsub("[ \n]+", " ", x$ToString())) + invisible(x) } type_sum.DataType <- function(x) { diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 9dac2651037..511bf3dbc27 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -227,7 +227,6 @@ ExecPlan <- R6Class("ExecPlan", if (!is.null(slice_size)) { out <- head(out, slice_size) # We already have everything we need for the head, so StopProducing - # TODO: close ARROW-14329 since this is working now self$Stop() } } else if (!is.null(node$extras$tail)) { diff --git a/r/tests/testthat/_snaps/dplyr-glimpse.md b/r/tests/testthat/_snaps/dplyr-glimpse.md index de8bdc13a37..58fee120764 100644 --- a/r/tests/testthat/_snaps/dplyr-glimpse.md +++ b/r/tests/testthat/_snaps/dplyr-glimpse.md @@ -147,6 +147,6 @@ Output Table (query) ?? rows x 1 columns - $ sum(int, na.rm = TRUE) 51 + $ `sum(int, na.rm = TRUE)` 51 Call `print()` for query details From c94f61fe5bdcb4e7a5621ea82ad167d4e786fbf8 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 11 Jul 2022 11:59:46 -0400 Subject: [PATCH 5/7] Update r/R/dplyr-glimpse.R Co-authored-by: Will Jones --- r/R/dplyr-glimpse.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/dplyr-glimpse.R b/r/R/dplyr-glimpse.R index ef609df4c7a..8a70f4c5b7b 100644 --- a/r/R/dplyr-glimpse.R +++ b/r/R/dplyr-glimpse.R @@ -131,7 +131,7 @@ glimpse.RecordBatchReader <- function(x, # can be run more than once (like duckdb does on the other side) msg <- paste( "Cannot glimpse() data from a RecordBatchReader because it can only be", - "read one time; call `as_arrow_table()` to consume it first" + "read one time; call `as_arrow_table()` to consume it first." ) message(msg) print(x) From 4d957e75fe0b97ca9a79b3dd1026f10dc68db542 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 11 Jul 2022 13:50:04 -0400 Subject: [PATCH 6/7] Update r/tests/testthat/_snaps/dplyr-glimpse.md --- r/tests/testthat/_snaps/dplyr-glimpse.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/tests/testthat/_snaps/dplyr-glimpse.md b/r/tests/testthat/_snaps/dplyr-glimpse.md index 58fee120764..6daca0850de 100644 --- a/r/tests/testthat/_snaps/dplyr-glimpse.md +++ b/r/tests/testthat/_snaps/dplyr-glimpse.md @@ -88,7 +88,7 @@ Code example_data %>% as_record_batch_reader() %>% glimpse() Message - Cannot glimpse() data from a RecordBatchReader because it can only be read one time; call `as_arrow_table()` to consume it first + Cannot glimpse() data from a RecordBatchReader because it can only be read one time; call `as_arrow_table()` to consume it first. Output RecordBatchReader int: int32 From 3270842a7ee99ddddf2c7ab67f494deb109b19f7 Mon Sep 17 00:00:00 2001 From: Neal Richardson Date: Mon, 11 Jul 2022 18:44:53 -0400 Subject: [PATCH 7/7] Update r/R/chunked-array.R --- r/R/chunked-array.R | 1 - 1 file changed, 1 deletion(-) diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index aaed05a2375..d51d9cc5393 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -119,7 +119,6 @@ ChunkedArray <- R6Class("ChunkedArray", assert_that(!is.na(descending)) # TODO: after ARROW-12042 is closed, review whether this and the # Array$SortIndices definition can be consolidated - # (edit: ARROW-12042 was resolved in 7.0.0) call_function( "sort_indices", self,