diff --git a/r/NAMESPACE b/r/NAMESPACE index 4636c1215f2..30e2d5e5414 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -145,6 +145,7 @@ export(schema) export(str.integer64) export(struct) export(table) +export(table_from_batches) export(time32) export(time64) export(timestamp) @@ -165,6 +166,7 @@ importFrom(glue,glue) importFrom(purrr,map) importFrom(purrr,map2) importFrom(purrr,map_int) +importFrom(purrr,walk) importFrom(rlang,abort) importFrom(rlang,dots_n) importFrom(rlang,list2) diff --git a/r/R/R6.R b/r/R/R6.R index 69d58e0c136..a9449c973d2 100644 --- a/r/R/R6.R +++ b/r/R/R6.R @@ -18,7 +18,7 @@ #' @include enums.R #' @importFrom R6 R6Class #' @importFrom glue glue -#' @importFrom purrr map map_int map2 +#' @importFrom purrr map map_int map2 walk #' @importFrom rlang dots_n #' @importFrom assertthat assert_that diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 3d493c7f066..ebc457cb574 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -793,6 +793,14 @@ Table__from_dataframe <- function(tbl) { .Call(`_arrow_Table__from_dataframe`, tbl) } +Table__FromRecordBatches <- function(lst_batches) { + .Call(`_arrow_Table__FromRecordBatches`, lst_batches) +} + +Table__FromRecordBatches_Schema <- function(lst_batches, schema) { + .Call(`_arrow_Table__FromRecordBatches_Schema`, lst_batches, schema) +} + Table__num_columns <- function(x) { .Call(`_arrow_Table__num_columns`, x) } diff --git a/r/R/Table.R b/r/R/Table.R index c39fce246af..da2515e776a 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -60,6 +60,27 @@ table <- function(.data){ shared_ptr(`arrow::Table`, Table__from_dataframe(.data)) } +#' Create a arrow::Table from record batches +#' +#' @param ... variable number of arrow::RecordBatch. This supports tidy dots splicing +#' @param schema either NULL or a arrow::Schema +#' +#' @return a arrow::Table +#' +#' @export +table_from_batches <- function(..., schema = NULL) { + batches <- list2(...) + walk(batches, ~stopifnot(inherits(., "arrow::RecordBatch"))) + + if(is.null(schema)) { + shared_ptr(`arrow::Table`, Table__FromRecordBatches(batches)) + } else if(inherits(schema, "arrow::Schema")) { + shared_ptr(`arrow::Table`, Table__FromRecordBatches_Schema(batches, schema)) + } else { + abort("schema should be NULL or a `arrow::Scherma") + } +} + #' @export `as_tibble.arrow::Table` <- function(x, use_threads = TRUE, ...){ Table__to_dataframe(x, use_threads = use_threads) diff --git a/r/man/table_from_batches.Rd b/r/man/table_from_batches.Rd new file mode 100644 index 00000000000..87e45198fe2 --- /dev/null +++ b/r/man/table_from_batches.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/Table.R +\name{table_from_batches} +\alias{table_from_batches} +\title{Create a arrow::Table from record batches} +\usage{ +table_from_batches(..., schema = NULL) +} +\arguments{ +\item{...}{variable number of arrow::RecordBatch. This supports tidy dots splicing} + +\item{schema}{either NULL or a arrow::Schema} +} +\value{ +a arrow::Table +} +\description{ +Create a arrow::Table from record batches +} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index e726f470969..b0fd018dc3f 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2222,6 +2222,29 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// Table__FromRecordBatches +std::shared_ptr Table__FromRecordBatches(List_ lst_batches); +RcppExport SEXP _arrow_Table__FromRecordBatches(SEXP lst_batchesSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< List_ >::type lst_batches(lst_batchesSEXP); + rcpp_result_gen = Rcpp::wrap(Table__FromRecordBatches(lst_batches)); + return rcpp_result_gen; +END_RCPP +} +// Table__FromRecordBatches_Schema +std::shared_ptr Table__FromRecordBatches_Schema(List_ lst_batches, const std::shared_ptr& schema); +RcppExport SEXP _arrow_Table__FromRecordBatches_Schema(SEXP lst_batchesSEXP, SEXP schemaSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< List_ >::type lst_batches(lst_batchesSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type schema(schemaSEXP); + rcpp_result_gen = Rcpp::wrap(Table__FromRecordBatches_Schema(lst_batches, schema)); + return rcpp_result_gen; +END_RCPP +} // Table__num_columns int Table__num_columns(const std::shared_ptr& x); RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) { @@ -2498,6 +2521,8 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1}, + {"_arrow_Table__FromRecordBatches", (DL_FUNC) &_arrow_Table__FromRecordBatches, 1}, + {"_arrow_Table__FromRecordBatches_Schema", (DL_FUNC) &_arrow_Table__FromRecordBatches_Schema, 2}, {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 4843f95ace3..ef6fce34a56 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -198,5 +198,16 @@ class RBuffer : public MutableBuffer { Vec vec_; }; +template +std::vector> list_to_shared_ptr_vector(SEXP lst) { + R_xlen_t n = XLENGTH(lst); + std::vector> res(n); + for (R_xlen_t i = 0; i < n; i++) { + res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>( + VECTOR_ELT(lst, i)); + } + return res; +} + } // namespace r } // namespace arrow diff --git a/r/src/table.cpp b/r/src/table.cpp index fcf2a034768..c3c116f1fa2 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -32,6 +32,22 @@ std::shared_ptr Table__from_dataframe(DataFrame tbl) { return out; } +// [[Rcpp::export]] +std::shared_ptr Table__FromRecordBatches(List_ lst_batches) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); + std::shared_ptr out; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &out)); + return out; +} + +// [[Rcpp::export]] +std::shared_ptr Table__FromRecordBatches_Schema(List_ lst_batches, const std::shared_ptr& schema) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); + std::shared_ptr out; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, std::move(batches), &out)); + return out; +} + // [[Rcpp::export]] int Table__num_columns(const std::shared_ptr& x) { return x->num_columns(); diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index ec1be9b2348..983a0048ff5 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -68,3 +68,15 @@ test_that("Table cast (ARROW-3741)", { expect_equal(tab2$column(0L)$type, int16()) expect_equal(tab2$column(1L)$type, int64()) }) + +test_that("table_from_batches() cast (ARROW-3818)", { + d <- tibble::tibble(int = 1:10, dbl = rnorm(10)) + batches <- purrr::rerun(10, record_batch(d)) + table <- table_from_batches(!!!batches) + expect_is(table, "arrow::Table") + expect_equal(as_tibble(table), vctrs::vec_rbind(!!!purrr::rerun(10, d))) + + table <- table_from_batches(!!!batches, schema = schema(int = int32(), dbl = float64())) + expect_is(table, "arrow::Table") + expect_equal(as_tibble(table), vctrs::vec_rbind(!!!purrr::rerun(10, d))) +})