From 5f7e5b5c0f715ab231a0bd223f705169cbf1f53b Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 13:37:50 +0100 Subject: [PATCH 1/3] Implement table_from_batches() --- r/NAMESPACE | 2 ++ r/R/R6.R | 2 +- r/R/RcppExports.R | 8 ++++++++ r/R/Table.R | 21 +++++++++++++++++++++ r/man/table_from_batches.Rd | 19 +++++++++++++++++++ r/src/RcppExports.cpp | 25 +++++++++++++++++++++++++ r/src/arrow_types.h | 11 +++++++++++ r/src/table.cpp | 16 ++++++++++++++++ 8 files changed, 103 insertions(+), 1 deletion(-) create mode 100644 r/man/table_from_batches.Rd diff --git a/r/NAMESPACE b/r/NAMESPACE index 4636c1215f2..30e2d5e5414 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -145,6 +145,7 @@ export(schema) export(str.integer64) export(struct) export(table) +export(table_from_batches) export(time32) export(time64) export(timestamp) @@ -165,6 +166,7 @@ importFrom(glue,glue) importFrom(purrr,map) importFrom(purrr,map2) importFrom(purrr,map_int) +importFrom(purrr,walk) importFrom(rlang,abort) importFrom(rlang,dots_n) importFrom(rlang,list2) diff --git a/r/R/R6.R b/r/R/R6.R index 69d58e0c136..a9449c973d2 100644 --- a/r/R/R6.R +++ b/r/R/R6.R @@ -18,7 +18,7 @@ #' @include enums.R #' @importFrom R6 R6Class #' @importFrom glue glue -#' @importFrom purrr map map_int map2 +#' @importFrom purrr map map_int map2 walk #' @importFrom rlang dots_n #' @importFrom assertthat assert_that diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 3d493c7f066..ebc457cb574 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -793,6 +793,14 @@ Table__from_dataframe <- function(tbl) { .Call(`_arrow_Table__from_dataframe`, tbl) } +Table__FromRecordBatches <- function(lst_batches) { + .Call(`_arrow_Table__FromRecordBatches`, lst_batches) +} + +Table__FromRecordBatches_Schema <- function(lst_batches, schema) { + .Call(`_arrow_Table__FromRecordBatches_Schema`, lst_batches, schema) +} + Table__num_columns <- function(x) { .Call(`_arrow_Table__num_columns`, x) } diff --git a/r/R/Table.R b/r/R/Table.R index c39fce246af..19ba9f9ca13 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -60,6 +60,27 @@ table <- function(.data){ shared_ptr(`arrow::Table`, Table__from_dataframe(.data)) } +#' Create a arrow::Table from record batches +#' +#' @param ... variable number of arrow::RecordBatch. This supports tidy dots splicing +#' @param schema either NULL or a arrow::Schema +#' +#' @return a arrow::Table +#' +#' @export +table_from_batches <- function(..., schema = NULL) { + batches <- list2(...) + walk(batches, ~stopifnot(inherits(., "arrow::RecordBatch"))) + + if(is.null(schema)) { + Table__FromRecordBatches(batches) + } else if(inherits(schema, "arrow::Schema")) { + Table__FromRecordBatches_Schema(batches, schema) + } else { + abort("schema should be NULL or a `arrow::Scherma") + } +} + #' @export `as_tibble.arrow::Table` <- function(x, use_threads = TRUE, ...){ Table__to_dataframe(x, use_threads = use_threads) diff --git a/r/man/table_from_batches.Rd b/r/man/table_from_batches.Rd new file mode 100644 index 00000000000..87e45198fe2 --- /dev/null +++ b/r/man/table_from_batches.Rd @@ -0,0 +1,19 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/Table.R +\name{table_from_batches} +\alias{table_from_batches} +\title{Create a arrow::Table from record batches} +\usage{ +table_from_batches(..., schema = NULL) +} +\arguments{ +\item{...}{variable number of arrow::RecordBatch. This supports tidy dots splicing} + +\item{schema}{either NULL or a arrow::Schema} +} +\value{ +a arrow::Table +} +\description{ +Create a arrow::Table from record batches +} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index e726f470969..b0fd018dc3f 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2222,6 +2222,29 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// Table__FromRecordBatches +std::shared_ptr Table__FromRecordBatches(List_ lst_batches); +RcppExport SEXP _arrow_Table__FromRecordBatches(SEXP lst_batchesSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< List_ >::type lst_batches(lst_batchesSEXP); + rcpp_result_gen = Rcpp::wrap(Table__FromRecordBatches(lst_batches)); + return rcpp_result_gen; +END_RCPP +} +// Table__FromRecordBatches_Schema +std::shared_ptr Table__FromRecordBatches_Schema(List_ lst_batches, const std::shared_ptr& schema); +RcppExport SEXP _arrow_Table__FromRecordBatches_Schema(SEXP lst_batchesSEXP, SEXP schemaSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< List_ >::type lst_batches(lst_batchesSEXP); + Rcpp::traits::input_parameter< const std::shared_ptr& >::type schema(schemaSEXP); + rcpp_result_gen = Rcpp::wrap(Table__FromRecordBatches_Schema(lst_batches, schema)); + return rcpp_result_gen; +END_RCPP +} // Table__num_columns int Table__num_columns(const std::shared_ptr& x); RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) { @@ -2498,6 +2521,8 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1}, + {"_arrow_Table__FromRecordBatches", (DL_FUNC) &_arrow_Table__FromRecordBatches, 1}, + {"_arrow_Table__FromRecordBatches_Schema", (DL_FUNC) &_arrow_Table__FromRecordBatches_Schema, 2}, {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 4843f95ace3..40db67f363a 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -198,5 +198,16 @@ class RBuffer : public MutableBuffer { Vec vec_; }; +template +std::vector> list_to_shared_ptr_vector(SEXP lst) { + SEXP* p = VECTOR_PTR(lst); + R_xlen_t n = XLENGTH(lst); + std::vector> res(n); + for (R_xlen_t i = 0; i < n; i++, ++p) { + res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>(*p); + } + return res; +} + } // namespace r } // namespace arrow diff --git a/r/src/table.cpp b/r/src/table.cpp index fcf2a034768..376a6252f97 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -32,6 +32,22 @@ std::shared_ptr Table__from_dataframe(DataFrame tbl) { return out; } +// [[Rcpp::export]] +std::shared_ptr Table__FromRecordBatches(List_ lst_batches) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); + std::shared_ptr out; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &out)); + return out; +} + +// [[Rcpp::export]] +std::shared_ptr Table__FromRecordBatches_Schema(List_ lst_batches, const std::shared_ptr& schema) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); + std::shared_ptr out; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &out)); + return out; +} + // [[Rcpp::export]] int Table__num_columns(const std::shared_ptr& x) { return x->num_columns(); From 91a36fef66e4d6b18784acdbd308c67141ab102e Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 14:00:12 +0100 Subject: [PATCH 2/3] test table_from_batches() --- r/R/Table.R | 4 ++-- r/src/arrow_types.h | 5 ++--- r/src/table.cpp | 4 ++-- r/tests/testthat/test-Table.R | 12 ++++++++++++ 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/r/R/Table.R b/r/R/Table.R index 19ba9f9ca13..da2515e776a 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -73,9 +73,9 @@ table_from_batches <- function(..., schema = NULL) { walk(batches, ~stopifnot(inherits(., "arrow::RecordBatch"))) if(is.null(schema)) { - Table__FromRecordBatches(batches) + shared_ptr(`arrow::Table`, Table__FromRecordBatches(batches)) } else if(inherits(schema, "arrow::Schema")) { - Table__FromRecordBatches_Schema(batches, schema) + shared_ptr(`arrow::Table`, Table__FromRecordBatches_Schema(batches, schema)) } else { abort("schema should be NULL or a `arrow::Scherma") } diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 40db67f363a..12c813c7d93 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -200,11 +200,10 @@ class RBuffer : public MutableBuffer { template std::vector> list_to_shared_ptr_vector(SEXP lst) { - SEXP* p = VECTOR_PTR(lst); R_xlen_t n = XLENGTH(lst); std::vector> res(n); - for (R_xlen_t i = 0; i < n; i++, ++p) { - res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>(*p); + for (R_xlen_t i = 0; i < n; i++) { + res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>(VECTOR_ELT(lst, i)); } return res; } diff --git a/r/src/table.cpp b/r/src/table.cpp index 376a6252f97..c3c116f1fa2 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -36,7 +36,7 @@ std::shared_ptr Table__from_dataframe(DataFrame tbl) { std::shared_ptr Table__FromRecordBatches(List_ lst_batches) { auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); std::shared_ptr out; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &out)); + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(std::move(batches), &out)); return out; } @@ -44,7 +44,7 @@ std::shared_ptr Table__FromRecordBatches(List_ lst_batches) { std::shared_ptr Table__FromRecordBatches_Schema(List_ lst_batches, const std::shared_ptr& schema) { auto batches = arrow::r::list_to_shared_ptr_vector(lst_batches); std::shared_ptr out; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &out)); + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, std::move(batches), &out)); return out; } diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index ec1be9b2348..983a0048ff5 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -68,3 +68,15 @@ test_that("Table cast (ARROW-3741)", { expect_equal(tab2$column(0L)$type, int16()) expect_equal(tab2$column(1L)$type, int64()) }) + +test_that("table_from_batches() cast (ARROW-3818)", { + d <- tibble::tibble(int = 1:10, dbl = rnorm(10)) + batches <- purrr::rerun(10, record_batch(d)) + table <- table_from_batches(!!!batches) + expect_is(table, "arrow::Table") + expect_equal(as_tibble(table), vctrs::vec_rbind(!!!purrr::rerun(10, d))) + + table <- table_from_batches(!!!batches, schema = schema(int = int32(), dbl = float64())) + expect_is(table, "arrow::Table") + expect_equal(as_tibble(table), vctrs::vec_rbind(!!!purrr::rerun(10, d))) +}) From c7c5d0f25e171db784cb8061792844de7e39f8fa Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 14:19:20 +0100 Subject: [PATCH 3/3] linting --- r/src/arrow_types.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 12c813c7d93..ef6fce34a56 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -203,7 +203,8 @@ std::vector> list_to_shared_ptr_vector(SEXP lst) { R_xlen_t n = XLENGTH(lst); std::vector> res(n); for (R_xlen_t i = 0; i < n; i++) { - res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>(VECTOR_ELT(lst, i)); + res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>( + VECTOR_ELT(lst, i)); } return res; }