
Conversation


@wjones127 wjones127 commented Dec 7, 2021

Updating map_batches() function to use RecordBatchReader instead of Scanner$ScanBatches() so that only one record batch is in memory at a time.

As part of this, I considered refactoring do_exec_plan to always return a RBR instead of a materialized Table, but I don't think I can do that until we get arrange, head, and tail operations to work outside of a sink node. See: https://issues.apache.org/jira/browse/ARROW-15271
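As a rough sketch of the usage this change enables (the dataset path is illustrative, and `total_amount` is borrowed from the vignette example quoted below; this is not code from the PR itself):

```r
library(arrow)
library(dplyr)

# Hypothetical multi-file Parquet dataset; any arrow Dataset works.
ds <- open_dataset("path/to/dataset")

# Each record batch is pulled from a RecordBatchReader and passed to
# the function one at a time, so only one batch is in memory at once.
row_counts <- ds %>%
  filter(total_amount > 100) %>%
  map_batches(~ .$num_rows, .data.frame = FALSE) %>%
  unlist()
```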


github-actions bot commented Dec 7, 2021

@@ -290,6 +290,64 @@ rows match the filter. Relatedly, since Parquet files contain row groups with
statistics on the data within, there may be entire chunks of data you can
avoid scanning because they have no rows where `total_amount > 100`.

### Processing data in batches
wjones127 (Member, Author):

This example was useful in testing, and hopefully gives some ideas for usage. Though perhaps it belongs more in the cookbook? LMK what you think.

Member:

I like it a lot. And I think it totally belongs here in a vignette (especially in the tone you have here). But it wouldn't be bad to make an issue to add to the cookbook as well (though don't feel obligated to do that right now if you don't want to!).


@wjones127 wjones127 marked this pull request as ready for review December 9, 2021 21:25

@paleolimbot paleolimbot left a comment


Just a note on the use of append() that may or may not be useful here!

@wjones127 wjones127 force-pushed the ARROW-14029-r-map-batches branch from 1ab8643 to adbd79e Compare December 21, 2021 16:27
wjones127 (Member, Author):

@jonkeane If you have time to review, I think it would be cool to get this into 7.0.0.


@jonkeane jonkeane left a comment


Thank you for this fix-up, especially in a part of the code that is not easy to grok and not super well documented! A few questions and comments, but this looks really good so far.

c(5, 10)
)

# $Take returns RecordBatch
Member:

Is this comment accurate here? It looks like it's returning a tibble? Or is that a side effect of arrange()?

wjones127 (Member, Author):

It returns a record batch within that function. But since the .data.frame option on map_batches() is TRUE by default, the results are combined into a tibble using dplyr::bind_rows().
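A minimal sketch of the two modes, assuming the map_batches() interface discussed in this PR (the dataset path is hypothetical):

```r
library(arrow)
library(dplyr)

ds <- open_dataset("path/to/dataset")  # hypothetical path

# Default (.data.frame = TRUE): each per-batch result is coerced to a
# data frame and the pieces are combined with dplyr::bind_rows(),
# so the caller sees a single tibble.
tbl <- ds %>% map_batches(~ .)

# .data.frame = FALSE: per-batch results come back as a plain list.
counts <- ds %>% map_batches(~ .$num_rows, .data.frame = FALSE)
```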

Member:

Ah, I see. It might be good to add that to the comment so that it's clear what's going on there.

wjones127 (Member, Author):

Yes, I've added a couple clarifying comments.

Comment on lines 473 to 476
map_batches(~ .$num_rows, .data.frame = FALSE) %>%
as.numeric() %>%
sort(),
c(5, 10)
Member:

Suggested change:
-   map_batches(~ .$num_rows, .data.frame = FALSE) %>%
-     as.numeric() %>%
-     sort(),
-   c(5, 10)
+   map_batches(~ .$num_rows, .data.frame = FALSE) %>%
+     sort(),
+   c(5L, 10L)

Does this work? as.numeric() in there is a little suspicious — what issues do you have if you take it out?

wjones127 (Member, Author):

map_batches() will return a list, but sort() only takes atomic vectors.
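In base R terms (independent of arrow), the issue looks like this:

```r
# What map_batches(..., .data.frame = FALSE) hands back: a plain list.
batch_rows <- list(5, 10)

# sort(batch_rows)           # error: 'x' must be atomic

sort(unlist(batch_rows))     # flatten to an atomic vector first: 5 10
sort(as.numeric(batch_rows)) # also works here, but hides the intent
```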

Member:

Aaaaaah, I see now. Maybe it would be a bit clearer with unlist() instead of as.numeric()?

lapply <- map_dfr
}
scanner <- Scanner$create(ensure_group_vars(X))
# TODO: possibly refactor do_exec_plan to return a RecordBatchReader
Member:

Would you mind making the Jira for this and putting the number here? I don't know of one off the top of my head, but we should get one if we think we'll (possibly) want to move in that direction.

wjones127 (Member, Author):

Jira created: https://issues.apache.org/jira/browse/ARROW-15271

Added to the comment as well.

@@ -174,8 +174,6 @@ ScanTask <- R6Class("ScanTask",
#' a `data.frame` for further aggregation, even if you couldn't fit the whole
#' `Dataset` result in memory.
#'
#' This is experimental and not recommended for production use.
Member:

We might still want to keep the experimental label here: it's working now, but as you mention, we might refactor it, and it could behave differently in the future.

wjones127 (Member, Author):

I think the do_exec_plan refactor wouldn't affect the behavior of this, but I'm not sure about the "wrap in arrow_dplyr_query" one.

If we keep it experimental, we should either remove the vignette below or mark that as experimental as well.

Member:

I think the vignette is helpful and useful. I would lean (slightly) towards marking this as experimental in both places, but only weakly; I'm happy to go with it if you would prefer to take the experimental marks off.

wjones127 (Member, Author):

Okay, I've marked both as experimental.

@wjones127 wjones127 force-pushed the ARROW-14029-r-map-batches branch from adbd79e to 7457c9d Compare January 6, 2022 18:08
@wjones127 wjones127 requested a review from jonkeane January 6, 2022 20:40

@jonkeane jonkeane left a comment


This looks great, thank you! One super minor suggestion that you can take or leave. I'll merge this sometime tomorrow, to give you a chance to look at that.

filter(int > 5) %>%
select(int, lgl) %>%
map_batches(~ .$num_rows, .data.frame = FALSE) %>%
unlist() %>% # Returns list because .data.frame is FALSE
Member:

Thanks for this fantastic extra clarification!


Co-authored-by: Jonathan Keane <jkeane@gmail.com>

ursabot commented Jan 7, 2022

Benchmark runs are scheduled for baseline = e64480d and contender = f054440. f054440 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Finished ⬇️0.0% ⬆️0.0%] ec2-t3-xlarge-us-east-2
[Failed ⬇️1.35% ⬆️0.0%] ursa-i9-9960x
[Finished ⬇️0.7% ⬆️0.04%] ursa-thinkcentre-m75q
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python. Runs only benchmarks with cloud = True
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java
