2 changes: 1 addition & 1 deletion r/R/arrow-package.R
@@ -41,7 +41,7 @@
"semi_join", "anti_join", "count", "tally"
)
)
for (cl in c("Dataset", "ArrowTabular", "arrow_dplyr_query")) {
for (cl in c("Dataset", "ArrowTabular", "RecordBatchReader", "arrow_dplyr_query")) {
for (m in dplyr_methods) {
s3_register(m, cl)
}
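(Editorial note, not part of the diff.) With RecordBatchReader added to this registration loop, the dplyr generics above dispatch on readers directly. A minimal sketch of the effect, assuming the hive_dir dataset fixture used in the tests at the bottom of this diff (columns int and dbl come from that fixture):

library(arrow)
library(dplyr)

ds <- open_dataset(hive_dir)                        # hive_dir: hypothetical fixture path
reader <- Scanner$create(ds)$ToRecordBatchReader()  # same pattern as the tests below

reader %>%
  filter(int > 2) %>%                               # dispatches to filter.RecordBatchReader
  mutate(dbl_plus = dbl + 1) %>%
  collect()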
2 changes: 1 addition & 1 deletion r/R/dplyr-arrange.R
@@ -51,7 +51,7 @@ arrange.arrow_dplyr_query <- function(.data, ..., .by_group = FALSE) {
.data$arrange_desc <- c(descs, .data$arrange_desc)
.data
}
arrange.Dataset <- arrange.ArrowTabular <- arrange.arrow_dplyr_query
arrange.Dataset <- arrange.ArrowTabular <- arrange.RecordBatchReader <- arrange.arrow_dplyr_query

# Helper to handle desc() in arrange()
# * Takes a quosure as input
8 changes: 4 additions & 4 deletions r/R/dplyr-collect.R
@@ -43,19 +43,19 @@ collect.ArrowTabular <- function(x, as_data_frame = TRUE, ...) {
x
}
}
collect.Dataset <- function(x, ...) dplyr::collect(as_adq(x), ...)
collect.Dataset <- collect.RecordBatchReader <- function(x, ...) dplyr::collect(as_adq(x), ...)

compute.arrow_dplyr_query <- function(x, ...) dplyr::collect(x, as_data_frame = FALSE)
compute.ArrowTabular <- function(x, ...) x
compute.Dataset <- compute.arrow_dplyr_query
compute.Dataset <- compute.RecordBatchReader <- compute.arrow_dplyr_query

pull.arrow_dplyr_query <- function(.data, var = -1) {
.data <- as_adq(.data)
var <- vars_pull(names(.data), !!enquo(var))
.data$selected_columns <- set_names(.data$selected_columns[var], var)
dplyr::collect(.data)[[1]]
}
pull.Dataset <- pull.ArrowTabular <- pull.arrow_dplyr_query
pull.Dataset <- pull.ArrowTabular <- pull.RecordBatchReader <- pull.arrow_dplyr_query

restore_dplyr_features <- function(df, query) {
# An arrow_dplyr_query holds some attributes that Arrow doesn't know about
@@ -85,7 +85,7 @@ collapse.arrow_dplyr_query <- function(x, ...) {
# Nest inside a new arrow_dplyr_query (and keep groups)
restore_dplyr_features(arrow_dplyr_query(x), x)
}
collapse.Dataset <- collapse.ArrowTabular <- function(x, ...) {
collapse.Dataset <- collapse.ArrowTabular <- collapse.RecordBatchReader <- function(x, ...) {
arrow_dplyr_query(x)
}

4 changes: 2 additions & 2 deletions r/R/dplyr-count.R
@@ -34,7 +34,7 @@ count.arrow_dplyr_query <- function(x, ..., wt = NULL, sort = FALSE, name = NULL
out
}

count.Dataset <- count.ArrowTabular <- count.arrow_dplyr_query
count.Dataset <- count.ArrowTabular <- count.RecordBatchReader <- count.arrow_dplyr_query

#' @importFrom rlang sym :=
tally.arrow_dplyr_query <- function(x, wt = NULL, sort = FALSE, name = NULL) {
@@ -54,7 +54,7 @@ tally.arrow_dplyr_query <- function(x, wt = NULL, sort = FALSE, name = NULL) {
}
}

tally.Dataset <- tally.ArrowTabular <- tally.arrow_dplyr_query
tally.Dataset <- tally.ArrowTabular <- tally.RecordBatchReader <- tally.arrow_dplyr_query

# we don't want to depend on dplyr, but we reference these above
utils::globalVariables(c("n", "desc"))
2 changes: 1 addition & 1 deletion r/R/dplyr-distinct.R
@@ -43,4 +43,4 @@ distinct.arrow_dplyr_query <- function(.data, ..., .keep_all = FALSE) {
out
}

distinct.Dataset <- distinct.ArrowTabular <- distinct.arrow_dplyr_query
distinct.Dataset <- distinct.ArrowTabular <- distinct.RecordBatchReader <- distinct.arrow_dplyr_query
2 changes: 1 addition & 1 deletion r/R/dplyr-filter.R
@@ -67,7 +67,7 @@ filter.arrow_dplyr_query <- function(.data, ..., .preserve = FALSE) {

set_filters(.data, filters)
}
filter.Dataset <- filter.ArrowTabular <- filter.arrow_dplyr_query
filter.Dataset <- filter.ArrowTabular <- filter.RecordBatchReader <- filter.arrow_dplyr_query

set_filters <- function(.data, expressions) {
if (length(expressions)) {
8 changes: 4 additions & 4 deletions r/R/dplyr-group-by.R
@@ -55,10 +55,10 @@ group_by.arrow_dplyr_query <- function(.data,
.data$drop_empty_groups <- ifelse(length(gv), .drop, dplyr::group_by_drop_default(.data))
.data
}
group_by.Dataset <- group_by.ArrowTabular <- group_by.arrow_dplyr_query
group_by.Dataset <- group_by.ArrowTabular <- group_by.RecordBatchReader <- group_by.arrow_dplyr_query

groups.arrow_dplyr_query <- function(x) syms(dplyr::group_vars(x))
groups.Dataset <- groups.ArrowTabular <- function(x) NULL
groups.Dataset <- groups.ArrowTabular <- groups.RecordBatchReader <- function(x) NULL

group_vars.arrow_dplyr_query <- function(x) x$group_by_vars
group_vars.Dataset <- function(x) NULL
@@ -71,15 +71,15 @@ group_vars.ArrowTabular <- function(x) {
# the .drop argument to group_by()
group_by_drop_default.arrow_dplyr_query <-
function(.tbl) .tbl$drop_empty_groups %||% TRUE
group_by_drop_default.Dataset <- group_by_drop_default.ArrowTabular <-
group_by_drop_default.Dataset <- group_by_drop_default.ArrowTabular <- group_by_drop_default.RecordBatchReader <-
function(.tbl) TRUE

ungroup.arrow_dplyr_query <- function(x, ...) {
x$group_by_vars <- character()
x$drop_empty_groups <- NULL
x
}
ungroup.Dataset <- force
ungroup.Dataset <- ungroup.RecordBatchReader <- force
ungroup.ArrowTabular <- function(x) {
x$r_metadata$attributes$.group_vars <- NULL
x
12 changes: 6 additions & 6 deletions r/R/dplyr-join.R
@@ -52,7 +52,7 @@ left_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "LEFT_OUTER")
}
left_join.Dataset <- left_join.ArrowTabular <- left_join.arrow_dplyr_query
left_join.Dataset <- left_join.ArrowTabular <- left_join.RecordBatchReader <- left_join.arrow_dplyr_query

right_join.arrow_dplyr_query <- function(x,
y,
@@ -63,7 +63,7 @@ right_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "RIGHT_OUTER")
}
right_join.Dataset <- right_join.ArrowTabular <- right_join.arrow_dplyr_query
right_join.Dataset <- right_join.ArrowTabular <- right_join.RecordBatchReader <- right_join.arrow_dplyr_query

inner_join.arrow_dplyr_query <- function(x,
y,
@@ -74,7 +74,7 @@ inner_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "INNER")
}
inner_join.Dataset <- inner_join.ArrowTabular <- inner_join.arrow_dplyr_query
inner_join.Dataset <- inner_join.ArrowTabular <- inner_join.RecordBatchReader <- inner_join.arrow_dplyr_query

full_join.arrow_dplyr_query <- function(x,
y,
@@ -85,7 +85,7 @@ full_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "FULL_OUTER")
}
full_join.Dataset <- full_join.ArrowTabular <- full_join.arrow_dplyr_query
full_join.Dataset <- full_join.ArrowTabular <- full_join.RecordBatchReader <- full_join.arrow_dplyr_query

semi_join.arrow_dplyr_query <- function(x,
y,
@@ -96,7 +96,7 @@ semi_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "LEFT_SEMI")
}
semi_join.Dataset <- semi_join.ArrowTabular <- semi_join.arrow_dplyr_query
semi_join.Dataset <- semi_join.ArrowTabular <- semi_join.RecordBatchReader <- semi_join.arrow_dplyr_query

anti_join.arrow_dplyr_query <- function(x,
y,
@@ -107,7 +107,7 @@ anti_join.arrow_dplyr_query <- function(x,
keep = FALSE) {
do_join(x, y, by, copy, suffix, ..., keep = keep, join_type = "LEFT_ANTI")
}
anti_join.Dataset <- anti_join.ArrowTabular <- anti_join.arrow_dplyr_query
anti_join.Dataset <- anti_join.ArrowTabular <- anti_join.RecordBatchReader <- anti_join.arrow_dplyr_query

handle_join_by <- function(by, x, y) {
if (is.null(by)) {
4 changes: 2 additions & 2 deletions r/R/dplyr-mutate.R
@@ -108,13 +108,13 @@ mutate.arrow_dplyr_query <- function(.data,
# Even if "none", we still keep group vars
ensure_group_vars(.data)
}
mutate.Dataset <- mutate.ArrowTabular <- mutate.arrow_dplyr_query
mutate.Dataset <- mutate.ArrowTabular <- mutate.RecordBatchReader <- mutate.arrow_dplyr_query

transmute.arrow_dplyr_query <- function(.data, ...) {
dots <- check_transmute_args(...)
dplyr::mutate(.data, !!!dots, .keep = "none")
}
transmute.Dataset <- transmute.ArrowTabular <- transmute.arrow_dplyr_query
transmute.Dataset <- transmute.ArrowTabular <- transmute.RecordBatchReader <- transmute.arrow_dplyr_query

# This function is a copy of dplyr:::check_transmute_args at
# https://github.com/tidyverse/dplyr/blob/master/R/mutate.R
6 changes: 3 additions & 3 deletions r/R/dplyr-select.R
@@ -24,13 +24,13 @@ select.arrow_dplyr_query <- function(.data, ...) {
check_select_helpers(enexprs(...))
column_select(as_adq(.data), !!!enquos(...))
}
select.Dataset <- select.ArrowTabular <- select.arrow_dplyr_query
select.Dataset <- select.ArrowTabular <- select.RecordBatchReader <- select.arrow_dplyr_query

rename.arrow_dplyr_query <- function(.data, ...) {
check_select_helpers(enexprs(...))
column_select(as_adq(.data), !!!enquos(...), .FUN = vars_rename)
}
rename.Dataset <- rename.ArrowTabular <- rename.arrow_dplyr_query
rename.Dataset <- rename.ArrowTabular <- rename.RecordBatchReader <- rename.arrow_dplyr_query

column_select <- function(.data, ..., .FUN = vars_select) {
# .FUN is either tidyselect::vars_select or tidyselect::vars_rename
@@ -106,7 +106,7 @@ relocate.arrow_dplyr_query <- function(.data, ..., .before = NULL, .after = NULL
}
.data
}
relocate.Dataset <- relocate.ArrowTabular <- relocate.arrow_dplyr_query
relocate.Dataset <- relocate.ArrowTabular <- relocate.RecordBatchReader <- relocate.arrow_dplyr_query

check_select_helpers <- function(exprs) {
# Throw an error if unsupported tidyselect selection helpers in `exprs`
2 changes: 1 addition & 1 deletion r/R/dplyr-summarize.R
@@ -187,7 +187,7 @@ summarise.arrow_dplyr_query <- function(.data, ...) {
return(out)
}
}
summarise.Dataset <- summarise.ArrowTabular <- summarise.arrow_dplyr_query
summarise.Dataset <- summarise.ArrowTabular <- summarise.RecordBatchReader <- summarise.arrow_dplyr_query

# This is the Arrow summarize implementation
do_arrow_summarize <- function(.data, ..., .groups = NULL) {
19 changes: 12 additions & 7 deletions r/R/duckdb.R
@@ -104,12 +104,11 @@ unique_arrow_tablename <- function() {

# Creates an environment that disconnects the database when it's GC'd
duckdb_disconnector <- function(con, tbl_name) {
force(tbl_name)
reg.finalizer(environment(), function(...) {
# remove the table we ephemerally created (though only if the connection is
# still valid)
if (DBI::dbIsValid(con)) {
duckdb::duckdb_unregister_arrow(con, tbl_name)
}
duckdb::duckdb_unregister_arrow(con, tbl_name)
})
environment()
}
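(Editorial aside, not part of the diff.) The added force(tbl_name) matters because R arguments are lazy promises: forcing the value before the enclosing call returns ensures the finalizer closure captures the value itself rather than an unevaluated promise. A minimal, self-contained illustration of the same pattern, using hypothetical names and no duckdb dependency:

make_cleanup_env <- function(label) {
  force(label)  # evaluate the promise now so the finalizer sees the value
  reg.finalizer(environment(), function(e) message("cleaning up ", label))
  environment()
}

env <- make_cleanup_env("tbl_demo")
rm(env)
invisible(gc())  # the finalizer runs at (or after) garbage collection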
@@ -120,8 +119,11 @@ duckdb_disconnector <- function(con, tbl_name) {
#' other processes (like DuckDB).
#'
#' @param .data the object to be converted
#' @param as_arrow_query should the returned object be wrapped as an
#' `arrow_dplyr_query`? (logical, default: `TRUE`)
#'
#' @return an `arrow_dplyr_query` object, to be used in dplyr pipelines.
#' @return a `RecordBatchReader` object, wrapped as an arrow dplyr query which
#' can be used in dplyr pipelines.
#' @export
#'
#' @examplesIf getFromNamespace("run_duckdb_examples", "arrow")()
Expand All @@ -136,7 +138,7 @@ duckdb_disconnector <- function(con, tbl_name) {
#' summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
#' to_arrow() %>%
#' collect()
to_arrow <- function(.data) {
to_arrow <- function(.data, as_arrow_query = TRUE) {
Member: How useful is this argument?

Member (author): Probably (and hopefully) not very, but I wanted to have an escape route in case we see another issue like the one at the start of this PR, where duckdb::duckdb_fetch_record_batch(res) fails but duckdb::duckdb_fetch_record_batch(res)$read_table() works. (Over time, the DuckDB master branch got into a state where both failed consistently, but at the beginning reading the table worked just fine while accessing the RBR did not.) And since we are at the mercy of both projects' release cycles for a fix, the cost of having this escape hatch doesn't seem so bad to me, but I can remove it.

It also helps a bit when debugging / testing (one doesn't have to recreate to_arrow() without the wrapper).
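(Editorial sketch, not part of the review thread.) Under the setup in the roxygen example above (arrow, dplyr, dbplyr, and duckdb installed), the escape hatch would look roughly like this; InMemoryDataset$create(mtcars) is just a stand-in data source:

library(arrow)
library(dplyr)

ds <- InMemoryDataset$create(mtcars)

reader <- ds %>%
  to_duckdb() %>%
  group_by(cyl) %>%
  summarize(mean_mpg = mean(mpg, na.rm = TRUE)) %>%
  to_arrow(as_arrow_query = FALSE)   # raw RecordBatchReader, no arrow_dplyr_query wrapper

reader$read_table()                  # materialize it, e.g. while debugging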

Member: You can get the RBR from $.data in the query object; is that sufficient for debugging purposes?

Member: Oh, never mind, it gets wrapped in an InMemoryDataset. Alright, I don't like this but this is fine; we can prune it later.
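(Editorial sketch, based only on the discussion above rather than verified internals.) For debugging, the source behind the query object is reachable via its .data field, though per the previous comment it comes back wrapped in an InMemoryDataset rather than as the raw reader; duck_tbl is a hypothetical dbplyr tbl backed by duckdb:

adq <- to_arrow(duck_tbl)   # default as_arrow_query = TRUE
class(adq)                  # "arrow_dplyr_query"
class(adq$.data)            # per the comment above: an InMemoryDataset wrapper, not the reader itself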

Member (author): I could also mark it as a temporary workaround for when we simplify this in the future? It's a little silly to effectively say "this is deprecated" when we introduce it, but I'm not sure when we'll get to the simplification and improvements that would let this simply emit RBRs, with the right signaling/methods to make clear that they are a good thing to use in dplyr queries.

Member: I think to_arrow() is a good fit...it gives the thing that a user probably wants. A possible future as_record_batch_reader() would be the right incantation for a user who wants it!

# If this is an Arrow object already, return quickly since we're already Arrow
if (inherits(.data, c("arrow_dplyr_query", "ArrowObject"))) {
return(.data)
@@ -155,6 +157,9 @@ to_arrow <- function(.data) {
# Run the query
res <- DBI::dbSendQuery(dbplyr::remote_con(.data), dbplyr::remote_query(.data), arrow = TRUE)

# TODO: we shouldn't need $read_table(), but we get segfaults when we do.
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res)$read_table())
if (as_arrow_query) {
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
} else {
duckdb::duckdb_fetch_record_batch(res)
}
Comment on lines +160 to +164
Member:

Suggested change
if (as_arrow_query) {
arrow_dplyr_query(duckdb::duckdb_fetch_record_batch(res))
} else {
duckdb::duckdb_fetch_record_batch(res)
}
out <- duckdb::duckdb_fetch_record_batch(res)
if (as_arrow_query) {
out <- arrow_dplyr_query(out)
}
out

}
8 changes: 6 additions & 2 deletions r/man/to_arrow.Rd


54 changes: 42 additions & 12 deletions r/tests/testthat/test-dataset.R
@@ -841,33 +841,63 @@ test_that("Collecting zero columns from a dataset doesn't return entire dataset"


test_that("dataset RecordBatchReader to C-interface to arrow_dplyr_query", {
ds <- open_dataset(ipc_dir, partitioning = "part", format = "feather")
ds <- open_dataset(hive_dir)

# export the RecordBatchReader via the C-interface
stream_ptr <- allocate_arrow_array_stream()
scan <- Scanner$create(ds)
reader <- scan$ToRecordBatchReader()
reader$export_to_c(stream_ptr)

expect_equal(
RecordBatchStreamReader$import_from_c(stream_ptr) %>%
filter(int < 8 | int > 55) %>%
mutate(part_plus = group + 6) %>%
arrange(dbl) %>%
collect(),
ds %>%
filter(int < 8 | int > 55) %>%
mutate(part_plus = group + 6) %>%
arrange(dbl) %>%
collect()
)

# must clean up the pointer or we leak
delete_arrow_array_stream(stream_ptr)
})

test_that("dataset to C-interface to arrow_dplyr_query with proj/filter", {
ds <- open_dataset(hive_dir)

# filter the dataset
ds <- ds %>%
filter(int > 2)

# export the RecordBatchReader via the C-interface
stream_ptr <- allocate_arrow_array_stream()
scan <- Scanner$create(
ds,
projection = names(ds),
filter = Expression$create("less", Expression$field_ref("int"), Expression$scalar(8L)))
reader <- scan$ToRecordBatchReader()
reader$export_to_c(stream_ptr)

# then import it and check that the roundtripped value is the same
circle <- RecordBatchStreamReader$import_from_c(stream_ptr)

# create an arrow_dplyr_query() from the recordbatch reader
reader_adq <- arrow_dplyr_query(circle)

# TODO: ARROW-14321 should be able to arrange then collect
tab_from_c_new <- reader_adq %>%
filter(int < 8, int > 55) %>%
mutate(part_plus = part + 6) %>%
collect()
expect_equal(
tab_from_c_new %>%
arrange(dbl),
reader_adq %>%
mutate(part_plus = group + 6) %>%
arrange(dbl) %>%
collect(),
ds %>%
filter(int < 8, int > 55) %>%
mutate(part_plus = part + 6) %>%
collect() %>%
arrange(dbl)
filter(int < 8, int > 2) %>%
mutate(part_plus = group + 6) %>%
arrange(dbl) %>%
collect()
)

# must clean up the pointer or we leak