Merged
3 changes: 3 additions & 0 deletions r/DESCRIPTION
@@ -44,6 +44,7 @@ RoxygenNote: 7.2.0
Config/testthat/edition: 3
VignetteBuilder: knitr
Suggests:
cli,
DBI,
dbplyr,
decor,
@@ -53,6 +54,7 @@ Suggests:
hms,
knitr,
lubridate,
pillar,
pkgload,
reticulate,
rmarkdown,
@@ -103,6 +105,7 @@ Collate:
'dplyr-funcs-type.R'
'expression.R'
'dplyr-funcs.R'
'dplyr-glimpse.R'
'dplyr-group-by.R'
'dplyr-join.R'
'dplyr-mutate.R'
2 changes: 2 additions & 0 deletions r/NAMESPACE
@@ -452,6 +452,8 @@ importFrom(tidyselect,starts_with)
importFrom(tidyselect,vars_pull)
importFrom(tidyselect,vars_rename)
importFrom(tidyselect,vars_select)
importFrom(utils,capture.output)
importFrom(utils,getFromNamespace)
importFrom(utils,head)
importFrom(utils,install.packages)
importFrom(utils,modifyList)
6 changes: 4 additions & 2 deletions r/R/arrow-object.R
@@ -31,14 +31,16 @@ ArrowObject <- R6Class("ArrowObject",
}
assign(".:xp:.", xp, envir = self)
},
print = function(...) {
class_title = function() {
if (!is.null(self$.class_title)) {
# Allow subclasses to override just the class name printed on the first line
class_title <- self$.class_title()
} else {
class_title <- class(self)[[1]]
}
cat(class_title, "\n", sep = "")
},
print = function(...) {
cat(self$class_title(), "\n", sep = "")
if (!is.null(self$ToString)) {
cat(self$ToString(), "\n", sep = "")
}
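
The refactor splits the title line out of print() so R6 subclasses can override only that piece. A minimal sketch of the pattern outside of arrow, using plain R6 (Parent/Child are hypothetical names):

library(R6)

Parent <- R6Class("Parent", public = list(
  class_title = function() {
    # If a subclass defines .class_title, use it; otherwise fall back to
    # the first element of the S3 class vector
    if (!is.null(self$.class_title)) self$.class_title() else class(self)[[1]]
  },
  print = function(...) {
    cat(self$class_title(), "\n", sep = "")
    invisible(self)
  }
))

Child <- R6Class("Child", inherit = Parent, public = list(
  .class_title = function() "Child (with extra detail)"
))

Parent$new()  # prints "Parent"
Child$new()   # prints "Child (with extra detail)"
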
3 changes: 2 additions & 1 deletion r/R/arrow-package.R
@@ -41,7 +41,7 @@
"group_vars", "group_by_drop_default", "ungroup", "mutate", "transmute",
"arrange", "rename", "pull", "relocate", "compute", "collapse",
"distinct", "left_join", "right_join", "inner_join", "full_join",
"semi_join", "anti_join", "count", "tally", "rename_with", "union", "union_all"
"semi_join", "anti_join", "count", "tally", "rename_with", "union", "union_all", "glimpse"
)
)
for (cl in c("Dataset", "ArrowTabular", "RecordBatchReader", "arrow_dplyr_query")) {
@@ -50,6 +50,7 @@
}
}
s3_register("dplyr::tbl_vars", "arrow_dplyr_query")
s3_register("pillar::type_sum", "DataType")

for (cl in c(
"Array", "RecordBatch", "ChunkedArray", "Table", "Schema",
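
Like the dplyr generics above, the pillar method is soft-registered: it only attaches when pillar is loaded, so arrow avoids a hard dependency on pillar. An illustrative consequence (assuming arrow and pillar are installed; type_sum.DataType() is defined in dplyr-glimpse.R below):

library(arrow)
pillar::type_sum(int32())              # "int32"
pillar::type_sum(struct(a = int32()))  # "struct<...>"
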
3 changes: 2 additions & 1 deletion r/R/chunked-array.R
@@ -132,7 +132,8 @@ ChunkedArray <- R6Class("ChunkedArray",
ChunkedArray__Validate(self)
},
ToString = function() {
ChunkedArray__ToString(self)
typ <- paste0("<", self$type$ToString(), ">")
paste(typ, ChunkedArray__ToString(self), sep = "\n")
},
Equals = function(other, ...) {
inherits(other, "ChunkedArray") && ChunkedArray__Equals(self, other)
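
With this change the type is prepended on its own line, so a printed ChunkedArray shows its type up front. Roughly (the exact bracket layout comes from ChunkedArray__ToString()):

ca <- arrow::chunked_array(1:3)
cat(ca$ToString(), "\n")
# <int32>
# [
#   [
#     1,
#     2,
#     3
#   ]
# ]
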
2 changes: 1 addition & 1 deletion r/R/dplyr-count.R
@@ -39,7 +39,7 @@ count.Dataset <- count.ArrowTabular <- count.RecordBatchReader <- count.arrow_dp

#' @importFrom rlang sym :=
tally.arrow_dplyr_query <- function(x, wt = NULL, sort = FALSE, name = NULL) {
check_name <- utils::getFromNamespace("check_name", "dplyr")
check_name <- getFromNamespace("check_name", "dplyr")
Member: Question: Is there a reason we didn't want the utils::?

Member Author: We've elsewhere done importFrom, so it's not necessary.

name <- check_name(name, dplyr::group_vars(x))

if (quo_is_null(enquo(wt))) {
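
For reference, the two spellings discussed in the thread; with the importFrom(utils,getFromNamespace) entry now in NAMESPACE, the bare call resolves identically:

# Fully qualified: works with no NAMESPACE entry at all
check_name <- utils::getFromNamespace("check_name", "dplyr")

# Bare: relies on the importFrom() directive that roxygen2 generates
# from #' @importFrom utils getFromNamespace
check_name <- getFromNamespace("check_name", "dplyr")
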
160 changes: 160 additions & 0 deletions r/R/dplyr-glimpse.R
@@ -0,0 +1,160 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

#' @importFrom utils getFromNamespace
glimpse.ArrowTabular <- function(x,
width = getOption("pillar.width", getOption("width")),
...) {
# This function is inspired by pillar:::glimpse.tbl(), with some adaptations

# We use cli:: and pillar:: throughout this function. We don't need to check
# whether they're installed because dplyr depends on pillar, which depends on
# cli, and we can only reach this function through S3 dispatch on dplyr::glimpse()
if (!is.finite(width)) {
abort("`width` must be finite.")
}

# We need a couple of internal functions in pillar for formatting
pretty_int <- getFromNamespace("big_mark", "pillar")
make_shorter <- getFromNamespace("str_trunc", "pillar")
tickify <- getFromNamespace("tick_if_needed", "pillar")

# Even though this is the ArrowTabular method, we use it for arrow_dplyr_query
# so make some accommodations. (Others are handled by S3 method dispatch.)
if (inherits(x, "arrow_dplyr_query")) {
class_title <- paste(source_data(x)$class_title(), "(query)")
} else {
class_title <- x$class_title()
}
cli::cat_line(class_title)

dims <- dim(x)
cli::cat_line(sprintf(
"%s rows x %s columns", pretty_int(dims[1]), pretty_int(dims[2])
))

if (dims[2] == 0) {
return(invisible(x))
}

nrows <- as.integer(width / 3)
head_tab <- dplyr::compute(head(x, nrows))
# Take the schema from this Table because if x is arrow_dplyr_query, some
# output types could be a best guess (in implicit_schema()).
schema <- head_tab$schema

# Assemble the column names and types
# We use the Arrow type names here. See type_sum.DataType() below.
var_types <- map_chr(schema$fields, ~ format(pillar::new_pillar_type(.$type)))
# glimpse.tbl() left-aligns the var names (pads with whitespace to the right)
# and appends the types next to them. Because those type names are
# aggressively truncated to all be roughly the same length, this means the
# data glimpse that follows is also mostly aligned.
# However, Arrow type names are longer and variable length, and we're only
# truncating the nested type information inside of <...>. So, to keep the
# data glimpses aligned, we "justify" align the name and type: add the padding
# whitespace between them so that the total width is equal.
var_headings <- paste("$", center_pad(tickify(names(x)), var_types))

# Assemble the data glimpse
df <- as.data.frame(head_tab)
formatted_data <- map_chr(df, function(.) {
tryCatch(
paste(pillar::format_glimpse(.), collapse = ", "),
# This could error e.g. if you have a VctrsExtensionType and the package
# that defines methods for the data is not loaded
error = function(e) conditionMessage(e)
)
})
# Here and elsewhere in the glimpse code, use pillar::get_extent() instead
# of nchar() because get_extent() knows how to deal with ANSI escapes etc.:
# it counts how much space the string will take on the terminal when printed.
data_width <- width - pillar::get_extent(var_headings)
truncated_data <- make_shorter(formatted_data, data_width)

# Print the table body (var name, type, data glimpse)
cli::cat_line(var_headings, " ", truncated_data)

# TODO: use crayon to style these footers?
if (inherits(x, "arrow_dplyr_query")) {
cli::cat_line("Call `print()` for query details")
} else if (any(grepl("<...>", var_types, fixed = TRUE)) || schema$HasMetadata) {
cli::cat_line("Call `print()` for full schema details")
}
invisible(x)
}

# Dataset has an efficient head() method via Scanner so this is fine
glimpse.Dataset <- glimpse.ArrowTabular

glimpse.arrow_dplyr_query <- function(x,
width = getOption("pillar.width", getOption("width")),
...) {
if (any(map_lgl(all_sources(x), ~ inherits(., "RecordBatchReader")))) {
msg <- paste(
"Cannot glimpse() data from a RecordBatchReader because it can only be",
"read one time. Call `compute()` to evaluate the query first."
)
message(msg)
print(x)
} else if (query_on_dataset(x) && !query_can_stream(x)) {
msg <- paste(
"This query requires a full table scan, so glimpse() may be",
"expensive. Call `compute()` to evaluate the query first."
)
message(msg)
print(x)
} else {
# Go for it
glimpse.ArrowTabular(x, width = width, ...)
}
}

glimpse.RecordBatchReader <- function(x,
width = getOption("pillar.width", getOption("width")),
...) {
# TODO(ARROW-17038): to_arrow() on duckdb con should hold con not RBR so it
# can be run more than once (like duckdb does on the other side)
msg <- paste(
"Cannot glimpse() data from a RecordBatchReader because it can only be",
"read one time; call `as_arrow_table()` to consume it first."
)
message(msg)
print(x)
}

glimpse.ArrowDatum <- function(x, width, ...) {
cli::cat_line(gsub("[ \n]+", " ", x$ToString()))
invisible(x)
}

type_sum.DataType <- function(x) {
if (inherits(x, "VctrsExtensionType")) {
# ptype() holds a vctrs type object, which pillar knows how to format
paste0("ext<", pillar::type_sum(x$ptype()), ">")
} else {
# Trim long type names with <...>
sub("<.*>", "<...>", x$ToString())
}
}
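
A quick illustration of that truncation on some hypothetical type strings:

sub("<.*>", "<...>", "struct<a: int32, b: string>")  # "struct<...>"
sub("<.*>", "<...>", "list<item: double>")           # "list<...>"
sub("<.*>", "<...>", "int32")                        # no <>, unchanged: "int32"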

center_pad <- function(left, right) {
left_sizes <- pillar::get_extent(left)
right_sizes <- pillar::get_extent(right)
total_width <- max(left_sizes + right_sizes) + 1L
paste0(left, strrep(" ", total_width - left_sizes - right_sizes), right)
}
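
Putting the pieces together, a usage sketch; the output shape is approximate, with the exact alignment produced by center_pad() above:

library(arrow)
library(dplyr)

tab <- arrow_table(x = 1:5, y = letters[1:5])
glimpse(tab)
# Table
# 5 rows x 2 columns
# $ x  <int32> 1, 2, 3, 4, 5
# $ y <string> "a", "b", "c", "d", "e"
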
47 changes: 42 additions & 5 deletions r/R/dplyr.R
@@ -264,7 +264,9 @@ abandon_ship <- function(call, .data, msg) {
eval.parent(call, 2)
}

query_on_dataset <- function(x) inherits(source_data(x), c("Dataset", "RecordBatchReader"))
query_on_dataset <- function(x) {
any(map_lgl(all_sources(x), ~ inherits(., c("Dataset", "RecordBatchReader"))))
}

source_data <- function(x) {
if (!inherits(x, "arrow_dplyr_query")) {
@@ -276,13 +278,48 @@ source_data <- function(x) {
}
}

is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")
all_sources <- function(x) {
if (is.null(x)) {
x
} else if (!inherits(x, "arrow_dplyr_query")) {
list(x)
} else {
c(
all_sources(x$.data),
all_sources(x$join$right_data),
all_sources(x$union_all$right_data)
)
}
}
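
Roughly what this yields for a hypothetical query joining a Dataset ds to an in-memory Table tab:

q <- dplyr::left_join(ds, tab, by = "id")
all_sources(q)       # list(ds, tab): both sides are collected recursively
query_on_dataset(q)  # TRUE, because ds is a Dataset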

has_aggregation <- function(x) {
# TODO: update with joins (check right side data too)
!is.null(x$aggregations) || (is_collapsed(x) && has_aggregation(x$.data))
query_can_stream <- function(x) {
Member: The reason we can't push this down to C++ is because we haven't constructed an exec plan yet, right? Otherwise, it would be more maintainable to do so.

Member Author: I don't follow. We could build an ExecPlan, but it wouldn't tell us anything about how it would perform, would it? I'm trying to detect cases where I can just take head() of the data without having to scan an entire dataset.

Member: > We could build an ExecPlan, but it wouldn't tell us anything about how it would perform, would it?

I'm not super close to the ExecPlan code, but I thought they were composed of a graph of nodes that could be traversed and analyzed, just like our arrow_dplyr_query structure. Am I wrong on that?

> I'm trying to detect cases where I can just take head() of the data without having to scan an entire dataset.

I was just thinking that having such a method on ExecPlan would be useful in general.

Member Author: Sure, that probably would be useful.

Member: > I was just thinking that having such a method on ExecPlan would be useful in general.

Possibly. We'd probably want to define it more formally. SQL has LIMIT X and Substrait's equivalent is FetchRel. Neither of these is exactly what is being detected here. For example, it is legal to have SELECT SUM(x) FROM table LIMIT 1, but it wouldn't actually limit any data being read.

We could define it as "single pipeline queries", but a pipeline breaker doesn't necessarily mean a query is non-streaming (for example, hash-join is sometimes permitted as "streaming", but it is always a pipeline breaker).

Member Author: Since you mentioned limit, I'll make a plug for ARROW-16628. Not relevant for this particular question; it would just let me delete some R-specific handling outside of the ExecPlan, and I'm guessing we'll have to do it to support Substrait.

# Queries that just select/filter/mutate can stream:
# you can take head() without evaluating over the whole dataset
if (inherits(x, "arrow_dplyr_query")) {
# Aggregations require all of the data
is.null(x$aggregations) &&
# Sorting does too
length(x$arrange_vars) == 0 &&
# Joins are ok as long as the right-side data is in memory
# (we have to hash the whole dataset to join it)
!query_on_dataset(x$join$right_data) &&
# But need to check that this non-dataset join can stream
query_can_stream(x$join$right_data) &&
# Also check that any unioned datasets also can stream
query_can_stream(x$union_all$right_data) &&
# Recursively check any queries that have been collapsed
query_can_stream(x$.data)
} else {
# Not a query, so it must be a Table/Dataset (or NULL)
# Note that if you have a RecordBatchReader, you *can* stream,
# but the reader is consumed. If that's a problem, you should check
# for RBRs outside of this function.
TRUE
}
}
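
Concretely, under those rules (ds is a hypothetical Dataset, small_table an in-memory Table):

ds %>% filter(x > 0) %>% select(x, y)      # streams: head() can short-circuit
ds %>% group_by(g) %>% summarize(n = n())  # does not: aggregation needs all rows
ds %>% arrange(x)                          # does not: sorting needs all rows
ds %>% left_join(small_table, by = "id")   # streams: right side is in memory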

is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")

has_head_tail <- function(x) {
!is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && has_head_tail(x$.data))
}
22 changes: 5 additions & 17 deletions r/R/extension.R
@@ -193,7 +193,7 @@ ExtensionType <- R6Class("ExtensionType",
sprintf(
"<%s %s...>",
class(self)[1],
paste(format(utils::head(metadata_raw, 20)), collapse = " ")
paste(format(head(metadata_raw, 20)), collapse = " ")
)
} else {
sprintf(
@@ -420,31 +420,19 @@ unregister_extension_type <- function(extension_name) {
arrow__UnregisterRExtensionType(extension_name)
}

#' @importFrom utils capture.output
VctrsExtensionType <- R6Class("VctrsExtensionType",
inherit = ExtensionType,
public = list(
ptype = function() {
private$.ptype
},
ptype = function() private$.ptype,
ToString = function() {
tf <- tempfile()
sink(tf)
on.exit({
sink(NULL)
unlink(tf)
})
print(self$ptype())
paste0(readLines(tf), collapse = "\n")
paste0(capture.output(print(self$ptype())), collapse = "\n")
},
deserialize_instance = function() {
private$.ptype <- unserialize(self$extension_metadata())
},
ExtensionEquals = function(other) {
if (!inherits(other, "VctrsExtensionType")) {
return(FALSE)
}

identical(self$ptype(), other$ptype())
inherits(other, "VctrsExtensionType") && identical(self$ptype(), other$ptype())
},
as_vector = function(extension_array) {
if (inherits(extension_array, "ChunkedArray")) {
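
The ToString() refactor swaps the sink()-to-tempfile dance for capture.output(), which captures the same printed lines in memory. Side by side (obj is a stand-in for self$ptype()):

obj <- data.frame(x = 1:2)  # stand-in for self$ptype()

# Before: divert printed output to a temp file, then read it back
tf <- tempfile()
sink(tf)
print(obj)
sink(NULL)
out_before <- paste0(readLines(tf), collapse = "\n")
unlink(tf)

# After: capture the printed lines directly
out_after <- paste0(capture.output(print(obj)), collapse = "\n")

identical(out_before, out_after)  # TRUE
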
1 change: 0 additions & 1 deletion r/R/filesystem.R
@@ -451,7 +451,6 @@ s3_bucket <- function(bucket, ...) {
#' @usage NULL
#' @format NULL
#' @rdname FileSystem
#' @importFrom utils modifyList
#' @export
GcsFileSystem <- R6Class("GcsFileSystem",
inherit = FileSystem
4 changes: 2 additions & 2 deletions r/R/query-engine.R
@@ -226,9 +226,9 @@ ExecPlan <- R6Class("ExecPlan",
slice_size <- node$extras$head %||% node$extras$tail
if (!is.null(slice_size)) {
out <- head(out, slice_size)
# We already have everything we need for the head, so StopProducing
self$Stop()
}
# Can we now tell `self$Stop()` to StopProducing? We already have
# everything we need for the head (but it seems to segfault: ARROW-14329)
} else if (!is.null(node$extras$tail)) {
# TODO(ARROW-16630): proper BottomK support
# Reverse the row order to get back what we expect