16 changes: 14 additions & 2 deletions r/R/arrowExports.R

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions r/R/dplyr-summarize.R
@@ -287,6 +287,11 @@ do_arrow_summarize <- function(.data, ..., .groups = NULL) {
stop(paste("Invalid .groups argument:", .groups))
}
out$drop_empty_groups <- .data$drop_empty_groups
+ if (getOption("arrow.summarise.sort", FALSE)) {
+ # Add sorting instructions for the rows to match dplyr
+ out$arrange_vars <- .data$selected_columns[.data$group_by_vars]
+ out$arrange_desc <- rep(FALSE, length(.data$group_by_vars))
+ }
}
out
}
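For context, a minimal sketch of what the "arrow.summarise.sort" option above controls (illustrative only, not part of the diff; the expected ordering is inferred from the added code):

library(arrow)
library(dplyr)

tab <- arrow_table(g = c("b", "a", "b", "a"), x = c(1, 2, 3, 4))

options(arrow.summarise.sort = TRUE)
tab %>%
  group_by(g) %>%
  summarise(total = sum(x)) %>%
  collect()
# With the option set, the grouped result should come back arranged by the
# grouping column (ascending), matching dplyr; without it (the default), row
# order is whatever the engine produces.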
35 changes: 22 additions & 13 deletions r/R/dplyr.R
@@ -284,17 +284,7 @@ tail.arrow_dplyr_query <- function(x, n = 6L, ...) {
#' mutate(x = gear / carb) %>%
#' show_exec_plan()
show_exec_plan <- function(x) {
- adq <- as_adq(x)
-
- # do not show the plan if we have a nested query (as this will force the
- # evaluation of the inner query/queries)
- # TODO see if we can remove after ARROW-16628
- if (is_collapsed(x) && has_head_tail(x$.data)) {
- warn("The `ExecPlan` cannot be printed for a nested query.")
- return(invisible(x))
- }
-
- result <- as_record_batch_reader(adq)
+ result <- as_record_batch_reader(as_adq(x))
plan <- result$Plan()
on.exit({
plan$.unsafe_delete()
@@ -419,6 +409,25 @@ query_can_stream <- function(x) {

is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")

- has_head_tail <- function(x) {
- !is.null(x$head) || !is.null(x$tail) || (is_collapsed(x) && has_head_tail(x$.data))
+ has_unordered_head <- function(x) {
+ if (is.null(x$head %||% x$tail)) {
+ # no head/tail
+ return(FALSE)
+ }
+ !has_order(x)
+ }
+
+ has_order <- function(x) {
+ length(x$arrange_vars) > 0 ||
+ has_implicit_order(x) ||
+ (is_collapsed(x) && has_order(x$.data))
+ }
+
+ has_implicit_order <- function(x) {
+ # Approximate what ExecNode$has_ordered_batches() would return (w/o building ExecPlan)
+ # An in-memory table has an implicit order
+ # TODO(GH-34698): FileSystemDataset and RecordBatchReader will have implicit order
+ inherits(x$.data, "ArrowTabular") &&
+ # But joins, aggregations, etc. will result in non-deterministic order
+ is.null(x$aggregations) && is.null(x$join) && is.null(x$union_all)
}
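For intuition about what these predicates distinguish, a hedged user-level example (the rows returned by the second query are intentionally unspecified):

library(arrow)
library(dplyr)

tab <- arrow_table(g = c("a", "a", "b", "b", "c"), x = 1:5)

# An in-memory Table has an implicit row order, so this head is deterministic:
tab %>% filter(x > 0) %>% head(3) %>% collect()   # should be the first 3 matching rows

# After an aggregation the row order is non-deterministic, so this head
# means "any two groups":
tab %>% group_by(g) %>% summarise(n = n()) %>% head(2) %>% collect()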
158 changes: 77 additions & 81 deletions r/R/query-engine.R
@@ -74,11 +74,11 @@ ExecPlan <- R6Class("ExecPlan",

if (is_collapsed(.data)) {
# We have a nested query.
- if (has_head_tail(.data$.data)) {
- # head and tail are not ExecNodes; at best we can handle them via
- # SinkNode, so if there are any steps done after head/tail, we need to
- # evaluate the query up to then and then do a new query for the rest.
- # as_record_batch_reader() will build and run an ExecPlan
+ if (has_unordered_head(.data$.data)) {
+ # TODO(GH-34941): FetchNode should do non-deterministic fetch
+ # Instead, we need to evaluate the query up to here,
+ # and then do a new query for the rest.
+ # as_record_batch_reader() will build and run an ExecPlan and do head() on it
reader <- as_record_batch_reader(.data$.data)
on.exit(reader$.unsafe_delete())
node <- self$SourceNode(reader)
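A hypothetical user-level query that could take this branch (assumptions: an InMemoryDataset source, which per the R-side helpers currently has no implicit order, and an explicit collapse() to make the inner query nested):

library(arrow)
library(dplyr)

ds <- InMemoryDataset$create(mtcars)

inner <- ds %>% filter(cyl > 4) %>% head(5) %>% collapse()  # unordered head on the inner query
outer <- inner %>% mutate(kpl = mpg * 0.425)
collect(outer)
# The inner query (including its head) should be evaluated to a RecordBatchReader
# first, and that reader then becomes the SourceNode for the outer plan.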
@@ -126,15 +126,6 @@ ExecPlan <- R6Class("ExecPlan",
options = .data$aggregations,
key_names = group_vars
)
-
- if (grouped && getOption("arrow.summarise.sort", FALSE)) {
- # Add sorting instructions for the rows too to match dplyr
- # (see below about why sorting isn't itself a Node)
- node$extras$sort <- list(
- names = group_vars,
- orders = rep(0L, length(group_vars))
- )
- }
} else {
# If any columns are derived, reordered, or renamed we need to Project
# If there are aggregations, the projection was already handled above.
@@ -166,82 +157,81 @@
}
}

- # Apply sorting: this is currently not an ExecNode itself, it is a
- # sink node option.
- # TODO: handle some cases:
- # (1) arrange > summarize > arrange
- # (2) ARROW-13779: arrange then operation where order matters (e.g. cumsum)
+ # Apply sorting and head/tail
+ head_or_tail <- .data$head %||% .data$tail
if (length(.data$arrange_vars)) {
- node$extras$sort <- list(
+ if (!is.null(.data$tail)) {
+ # Handle tail first: Reverse sort, take head
+ # TODO(GH-34942): FetchNode support for tail
+ node <- node$OrderBy(list(
+ names = names(.data$arrange_vars),
+ orders = as.integer(!.data$arrange_desc)
+ ))
+ node <- node$Fetch(.data$tail)
+ }
+ # Apply sorting
+ node <- node$OrderBy(list(
names = names(.data$arrange_vars),
- orders = .data$arrange_desc,
- temp_columns = names(.data$temp_columns)
- )
- }
- # This is only safe because we are going to evaluate queries that end
- # with head/tail first, then evaluate any subsequent query as a new query
- if (!is.null(.data$head)) {
- node$extras$head <- .data$head
- }
- if (!is.null(.data$tail)) {
- node$extras$tail <- .data$tail
+ orders = as.integer(.data$arrange_desc)
+ ))
+
+ if (length(.data$temp_columns)) {
+ # If we sorted on ad-hoc derived columns, Project to drop them
+ temp_schema <- node$schema
+ cols_to_keep <- setdiff(names(temp_schema), names(.data$temp_columns))
+ node <- node$Project(make_field_refs(cols_to_keep))
+ }
+
+ if (!is.null(.data$head)) {
+ # Take the head now
+ node <- node$Fetch(.data$head)
+ }
+ } else if (!is.null(head_or_tail)) {
+ # Unsorted head/tail
+ # Handle a couple of special cases here:
+ if (node$has_ordered_batches()) {
+ # Data that has order, even implicit order from an in-memory table, is supported
+ # in FetchNode
+ if (!is.null(.data$head)) {
+ node <- node$Fetch(.data$head)
+ } else {
+ # TODO(GH-34942): FetchNode support for tail
+ # FetchNode currently doesn't support tail, but it has limit + offset
+ # So if we know how many rows the query will result in, we can offset
+ data_without_tail <- .data
+ data_without_tail$tail <- NULL
+ row_count <- nrow(data_without_tail)
+ if (!is.na(row_count)) {
+ node <- node$Fetch(.data$tail, offset = row_count - .data$tail)
+ } else {
+ # Workaround: non-deterministic tail
+ node$extras$slice_size <- head_or_tail
+ }
+ }
+ } else {
+ # TODO(GH-34941): non-deterministic FetchNode
+ # Data has non-deterministic order, so head/tail means "just show me any N rows"
+ # FetchNode does not support non-deterministic scans, so we have to handle outside
+ node$extras$slice_size <- head_or_tail
+ }
}
node
},
Run = function(node) {
assert_is(node, "ExecNode")

- # Sorting and head/tail (if sorted) are handled in the SinkNode,
- # created in ExecPlan_build
- sorting <- node$extras$sort %||% list()
- select_k <- node$extras$head %||% -1L
- has_sorting <- length(sorting) > 0
- if (has_sorting) {
- if (!is.null(node$extras$tail)) {
- # Reverse the sort order and take the top K, then after we'll reverse
- # the resulting rows so that it is ordered as expected
- sorting$orders <- !sorting$orders
- select_k <- node$extras$tail
- }
- sorting$orders <- as.integer(sorting$orders)
- }
-
out <- ExecPlan_run(
self,
node,
- sorting,
- prepare_key_value_metadata(node$final_metadata()),
- select_k
+ prepare_key_value_metadata(node$final_metadata())
)

- if (!has_sorting) {
- # Since ExecPlans don't scan in deterministic order, head/tail are both
+ if (!is.null(node$extras$slice_size)) {
+ # For non-deterministic scans, head/tail are
# essentially taking a random slice from somewhere in the dataset.
# And since the head() implementation is way more efficient than tail(),
# just use it to take the random slice
- # TODO(ARROW-16628): handle limit in ExecNode
- slice_size <- node$extras$head %||% node$extras$tail
- if (!is.null(slice_size)) {
- out <- head(out, slice_size)
- }
- } else if (!is.null(node$extras$tail)) {
- # TODO(ARROW-16630): proper BottomK support
- # Reverse the row order to get back what we expect
- out <- as_arrow_table(out)
- out <- out[rev(seq_len(nrow(out))), , drop = FALSE]
- out <- as_record_batch_reader(out)
- }
-
- # If arrange() created $temp_columns, make sure to omit them from the result
- # We can't currently handle this in ExecPlan_run itself because sorting
- # happens in the end (SinkNode) so nothing comes after it.
- # TODO(ARROW-16631): move into ExecPlan
- if (length(node$extras$sort$temp_columns) > 0) {
- tab <- as_arrow_table(out)
- tab <- tab[, setdiff(names(tab), node$extras$sort$temp_columns), drop = FALSE]
- out <- as_record_batch_reader(tab)
+ out <- head(out, node$extras$slice_size)
}

out
},
Write = function(node, ...) {
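Illustrative queries (not part of the diff) that should map onto the branches above, assuming an in-memory Table or Dataset source as noted:

library(arrow)
library(dplyr)

tab <- arrow_table(x = c(3, 1, 2), y = c("c", "a", "b"))

# arrange() + tail(): reverse the sort, Fetch the first n rows, then re-sort
tab %>% mutate(z = x * 10) %>% arrange(x) %>% tail(2) %>% collect()

# plain tail() where nrow() is known (no filters): Fetch with limit + offset
tab %>% mutate(z = x * 10) %>% tail(2) %>% collect()

# head() on a source with no implicit order: falls back to the slice_size workaround
InMemoryDataset$create(mtcars) %>% head(5) %>% collect()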
@@ -272,13 +262,8 @@ ExecNode <- R6Class("ExecNode",
inherit = ArrowObject,
public = list(
extras = list(
- # `sort` is a slight hack to be able to keep around arrange() params,
- # which don't currently yield their own ExecNode but rather are consumed
- # in the SinkNode (in ExecPlan$run())
- sort = NULL,
- # Similar hacks for head and tail
- head = NULL,
- tail = NULL,
+ # Workaround for non-deterministic head/tail
+ slice_size = NULL,
# `source_schema` is put here in Scan() so that at Run/Write, we can
# extract the relevant metadata and keep it in the result
source_schema = NULL
@@ -295,6 +280,7 @@ ExecNode <- R6Class("ExecNode",
old_meta$r <- get_r_metadata_from_old_schema(self$schema, old_schema)
old_meta
},
+ has_ordered_batches = function() ExecNode_has_ordered_batches(self),
Project = function(cols) {
if (length(cols)) {
assert_is_list_of(cols, "Expression")
@@ -336,6 +322,16 @@ ExecNode <- R6Class("ExecNode",
},
Union = function(right_node) {
self$preserve_extras(ExecNode_Union(self, right_node))
+ },
+ Fetch = function(limit, offset = 0L) {
+ self$preserve_extras(
+ ExecNode_Fetch(self, offset, limit)
+ )
+ },
+ OrderBy = function(sorting) {
+ self$preserve_extras(
+ ExecNode_OrderBy(self, sorting)
+ )
}
),
active = list(
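As a rough end-to-end check of the new nodes (hedged: exact node names in the printed plan depend on the Arrow C++ version), a sorted head should now appear in the plan as OrderBy/Fetch nodes rather than as sink-node options:

library(arrow)
library(dplyr)

arrow_table(x = c(2, 3, 1)) %>%
  mutate(y = x * 10) %>%
  arrange(x) %>%
  head(2) %>%
  show_exec_plan()
# The printed ExecPlan is expected to include an OrderByNode and a FetchNode.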