From 9a0f996e97f2ea6e9d0d0d136a8ad0a928603a02 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 15:21:24 +0100 Subject: [PATCH 01/21] schema() supports tidy dots splicing, using rlang::list2 --- r/R/Schema.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/Schema.R b/r/R/Schema.R index d57e2749c40..fbf6581d7a3 100644 --- a/r/R/Schema.R +++ b/r/R/Schema.R @@ -61,7 +61,7 @@ #' #' @export schema <- function(...){ - shared_ptr(`arrow::Schema`, schema_(.fields(list(...)))) + shared_ptr(`arrow::Schema`, schema_(.fields(list2(...)))) } #' read a Schema from a stream From cd03e19ecc78ded91d7988525d9345455fe26ee0 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 15:35:21 +0100 Subject: [PATCH 02/21] + list_to_shared_ptr_vector --- r/src/arrow_types.h | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 32f54cb1d5a..a4a0ac6c86c 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -207,5 +207,16 @@ inline std::shared_ptr extract(SEXP x) { return Rcpp::ConstReferenceSmartPtrInputParameter>(x); } +template +std::vector> list_to_shared_ptr_vector(SEXP lst) { + R_xlen_t n = XLENGTH(lst); + std::vector> res(n); + for (R_xlen_t i = 0; i < n; i++) { + res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>( + VECTOR_ELT(lst, i)); + } + return res; +} + } // namespace r } // namespace arrow From d49906af4c07a7b3ebe63740e599c0b1c987ae82 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 16:49:17 +0100 Subject: [PATCH 03/21] Change record_batch() api so that it takes ... and schema. 
Needs discussion I guess --- r/R/RcppExports.R | 4 ++++ r/R/RecordBatch.R | 20 ++++++++++++++++++-- r/R/feather.R | 2 +- r/man/record_batch.Rd | 2 +- r/src/RcppExports.cpp | 13 +++++++++++++ r/src/arrow_types.h | 2 +- r/src/recordbatch.cpp | 15 +++++++++++++++ r/tests/testthat/test-RecordBatch.R | 4 ++-- r/tests/testthat/test-message.R | 2 +- r/tests/testthat/test-messagereader.R | 4 ++-- r/tests/testthat/test-read_record_batch.R | 4 ++-- r/tests/testthat/test-recordbatchreader.R | 8 ++++---- r/tests/testthat/test-schema.R | 2 +- 13 files changed, 65 insertions(+), 17 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 81e1c040444..b979c0e20a4 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -729,6 +729,10 @@ ipc___ReadRecordBatch__InputStream__Schema <- function(stream, schema) { .Call(`_arrow_ipc___ReadRecordBatch__InputStream__Schema`, stream, schema) } +RecordBatch__from_arrays <- function(schema, lst) { + .Call(`_arrow_RecordBatch__from_arrays`, schema, lst) +} + RecordBatchReader__schema <- function(reader) { .Call(`_arrow_RecordBatchReader__schema`, reader) } diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index 22fda8403c6..44b7197719b 100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -89,12 +89,28 @@ RecordBatch__to_dataframe(x, use_threads = use_threads) } +to_array <- function(x) { + if (inherits(x, "arrow::Array")) { + x + } else { + array(x) + } +} + #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame #' #' @param .data a data frame #' #' @return a [arrow::RecordBatch][arrow__RecordBatch] #' @export -record_batch <- function(.data){ - shared_ptr(`arrow::RecordBatch`, RecordBatch__from_dataframe(.data)) +record_batch <- function(..., schema = NULL){ + # TODO: maybe this can be done internally in parallel + arrays <- map(tibble::lst(...), to_array) + stopifnot(length(arrays) > 0) + + if (!inherits(schema, "arrow::Schema")){ + schema <- schema(!!!map(arrays, ~.$type)) + } + + 
shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays) ) } diff --git a/r/R/feather.R b/r/R/feather.R index 6e4b3a65776..f47ff103224 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -72,7 +72,7 @@ write_feather.default <- function(data, stream) { #' @export write_feather.data.frame <- function(data, stream) { - write_feather(record_batch(data), stream) + write_feather(record_batch(!!!data), stream) } #' @method write_feather arrow::RecordBatch diff --git a/r/man/record_batch.Rd b/r/man/record_batch.Rd index 4567a9ab763..f886d7f4fda 100644 --- a/r/man/record_batch.Rd +++ b/r/man/record_batch.Rd @@ -4,7 +4,7 @@ \alias{record_batch} \title{Create an \link[=arrow__RecordBatch]{arrow::RecordBatch} from a data frame} \usage{ -record_batch(.data) +record_batch(..., schema = NULL) } \arguments{ \item{.data}{a data frame} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1ac96d43a5b..e9771623ce1 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2046,6 +2046,18 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// RecordBatch__from_arrays +std::shared_ptr RecordBatch__from_arrays(const std::shared_ptr& schema, List_ lst); +RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schemaSEXP, SEXP lstSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< const std::shared_ptr& >::type schema(schemaSEXP); + Rcpp::traits::input_parameter< List_ >::type lst(lstSEXP); + rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema, lst)); + return rcpp_result_gen; +END_RCPP +} // RecordBatchReader__schema std::shared_ptr RecordBatchReader__schema(const std::shared_ptr& reader); RcppExport SEXP _arrow_RecordBatchReader__schema(SEXP readerSEXP) { @@ -2495,6 +2507,7 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_RecordBatch__Slice2", (DL_FUNC) &_arrow_RecordBatch__Slice2, 3}, {"_arrow_ipc___SerializeRecordBatch__Raw", (DL_FUNC) 
&_arrow_ipc___SerializeRecordBatch__Raw, 1}, {"_arrow_ipc___ReadRecordBatch__InputStream__Schema", (DL_FUNC) &_arrow_ipc___ReadRecordBatch__InputStream__Schema, 2}, + {"_arrow_RecordBatch__from_arrays", (DL_FUNC) &_arrow_RecordBatch__from_arrays, 2}, {"_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1}, {"_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1}, {"_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index a4a0ac6c86c..4affee2d597 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -213,7 +213,7 @@ std::vector> list_to_shared_ptr_vector(SEXP lst) { std::vector> res(n); for (R_xlen_t i = 0; i < n; i++) { res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>( - VECTOR_ELT(lst, i)); + VECTOR_ELT(lst, i)); } return res; } diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 31fefa8de60..f81f531c84a 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -145,3 +145,18 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( STOP_IF_NOT_OK(arrow::ipc::ReadRecordBatch(schema, &memo, stream.get(), &batch)); return batch; } + +// [[Rcpp::export]] +std::shared_ptr RecordBatch__from_arrays(const std::shared_ptr& schema, List_ lst) { + auto arrays = arrow::r::list_to_shared_ptr_vector(lst); + + // check all sizes are the same + int64_t num_rows= arrays[0]->length(); + for(int64_t i = 1; ilength() != num_rows) { + Rcpp::stop("All arrays must have the same length"); + } + } + + return arrow::RecordBatch::Make(schema, num_rows, arrays); +} diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index da09984b198..096b32c2705 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -24,7 +24,7 @@ test_that("RecordBatch", { chr = letters[1:10], fct = factor(letters[1:10]) ) - batch <- 
record_batch(tbl) + batch <- record_batch(!!!tbl) expect_true(batch == batch) expect_equal( @@ -93,7 +93,7 @@ test_that("RecordBatch with 0 rows are supported", { fct = factor(character(), levels = c("a", "b")) ) - batch <- record_batch(tbl) + batch <- record_batch(!!!tbl) expect_equal(batch$num_columns, 5L) expect_equal(batch$num_rows, 0L) expect_equal( diff --git a/r/tests/testthat/test-message.R b/r/tests/testthat/test-message.R index 3fe5829f869..4cbf87d3d0f 100644 --- a/r/tests/testthat/test-message.R +++ b/r/tests/testthat/test-message.R @@ -18,7 +18,7 @@ context("arrow::ipc::Message") test_that("read_message can read from input stream", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() stream <- BufferReader(bytes) diff --git a/r/tests/testthat/test-messagereader.R b/r/tests/testthat/test-messagereader.R index 5ff8277625d..690228d2b6d 100644 --- a/r/tests/testthat/test-messagereader.R +++ b/r/tests/testthat/test-messagereader.R @@ -18,7 +18,7 @@ context("arrow::ipc::MessageReader") test_that("MessageReader can be created from raw vectors", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() reader <- MessageReader(bytes) @@ -34,7 +34,7 @@ test_that("MessageReader can be created from raw vectors", { }) test_that("MessageReader can be created from input stream", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() stream <- BufferReader(bytes) diff --git a/r/tests/testthat/test-read_record_batch.R b/r/tests/testthat/test-read_record_batch.R index 2618fae7326..277bfd084a2 100644 --- a/r/tests/testthat/test-read_record_batch.R +++ b/r/tests/testthat/test-read_record_batch.R @@ -48,7 +48,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3 lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] ) - batch <- record_batch(tbl) + batch 
<- record_batch(!!!tbl) schema <- batch$schema raw <- batch$serialize() @@ -64,7 +64,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3 }) test_that("read_record_batch() can handle (Message, Schema) parameters (ARROW-3499)", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) schema <- batch$schema raw <- batch$serialize() diff --git a/r/tests/testthat/test-recordbatchreader.R b/r/tests/testthat/test-recordbatchreader.R index d2b6a09c37b..65f7933b42d 100644 --- a/r/tests/testthat/test-recordbatchreader.R +++ b/r/tests/testthat/test-recordbatchreader.R @@ -18,10 +18,10 @@ context("arrow::RecordBatch.*(Reader|Writer)") test_that("RecordBatchStreamReader / Writer", { - batch <- record_batch(tibble::tibble( + batch <- record_batch( x = 1:10, y = letters[1:10] - )) + ) sink <- BufferOutputStream() writer <- RecordBatchStreamWriter(sink, batch$schema) @@ -43,10 +43,10 @@ test_that("RecordBatchStreamReader / Writer", { }) test_that("RecordBatchFileReader / Writer", { - batch <- record_batch(tibble::tibble( + batch <- record_batch( x = 1:10, y = letters[1:10] - )) + ) sink <- BufferOutputStream() writer <- RecordBatchFileWriter(sink, batch$schema) diff --git a/r/tests/testthat/test-schema.R b/r/tests/testthat/test-schema.R index 2f2d3ee84e7..ff40b816ea6 100644 --- a/r/tests/testthat/test-schema.R +++ b/r/tests/testthat/test-schema.R @@ -20,7 +20,7 @@ context("arrow::Schema") test_that("reading schema from Buffer", { # TODO: this uses the streaming format, i.e. 
from RecordBatchStreamWriter # maybe there is an easier way to serialize a schema - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) expect_is(batch, "arrow::RecordBatch") stream <- BufferOutputStream() From c71d8728256632f97c6bf4de433b14c32ddda7b7 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 5 Feb 2019 17:09:55 +0100 Subject: [PATCH 04/21] update docs --- r/R/RecordBatch.R | 3 ++- r/man/record_batch.Rd | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index 44b7197719b..baf8ae81872 100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -99,7 +99,8 @@ to_array <- function(x) { #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame #' -#' @param .data a data frame +#' @param ... A variable number of arrow::Array +#' @param schema a arrow::Schema #' #' @return a [arrow::RecordBatch][arrow__RecordBatch] #' @export diff --git a/r/man/record_batch.Rd b/r/man/record_batch.Rd index f886d7f4fda..a9680bf3735 100644 --- a/r/man/record_batch.Rd +++ b/r/man/record_batch.Rd @@ -7,7 +7,9 @@ record_batch(..., schema = NULL) } \arguments{ -\item{.data}{a data frame} +\item{...}{A variable number of arrow::Array} + +\item{schema}{a arrow::Schema} } \value{ a \link[=arrow__RecordBatch]{arrow::RecordBatch} From 20c5ce67156e7bb50ef3b738ba8b0570fa762afc Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Feb 2019 16:36:17 +0100 Subject: [PATCH 05/21] move the logic of `RecordBatch__from_arrays` internally. This will make it easier to do in parallel later. 
--- r/R/RcppExports.R | 4 ++-- r/R/RecordBatch.R | 8 +------- r/src/RcppExports.cpp | 8 ++++---- r/src/arrow_types.h | 3 +-- r/src/recordbatch.cpp | 23 +++++++++++++++++++++-- 5 files changed, 29 insertions(+), 17 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index b979c0e20a4..b46ee848f07 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -729,8 +729,8 @@ ipc___ReadRecordBatch__InputStream__Schema <- function(stream, schema) { .Call(`_arrow_ipc___ReadRecordBatch__InputStream__Schema`, stream, schema) } -RecordBatch__from_arrays <- function(schema, lst) { - .Call(`_arrow_RecordBatch__from_arrays`, schema, lst) +RecordBatch__from_arrays <- function(schema_sxp, lst) { + .Call(`_arrow_RecordBatch__from_arrays`, schema_sxp, lst) } RecordBatchReader__schema <- function(reader) { diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index baf8ae81872..04ee1dc4bc2 100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -105,13 +105,7 @@ to_array <- function(x) { #' @return a [arrow::RecordBatch][arrow__RecordBatch] #' @export record_batch <- function(..., schema = NULL){ - # TODO: maybe this can be done internally in parallel - arrays <- map(tibble::lst(...), to_array) + arrays <- tibble::lst(...) 
stopifnot(length(arrays) > 0) - - if (!inherits(schema, "arrow::Schema")){ - schema <- schema(!!!map(arrays, ~.$type)) - } - shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays) ) } diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index e9771623ce1..65dd9a82f6e 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2047,14 +2047,14 @@ BEGIN_RCPP END_RCPP } // RecordBatch__from_arrays -std::shared_ptr RecordBatch__from_arrays(const std::shared_ptr& schema, List_ lst); -RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schemaSEXP, SEXP lstSEXP) { +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, List_ lst); +RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< const std::shared_ptr& >::type schema(schemaSEXP); + Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); Rcpp::traits::input_parameter< List_ >::type lst(lstSEXP); - rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema, lst)); + rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema_sxp, lst)); return rcpp_result_gen; END_RCPP } diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 4affee2d597..2e3549e8346 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -212,8 +212,7 @@ std::vector> list_to_shared_ptr_vector(SEXP lst) { R_xlen_t n = XLENGTH(lst); std::vector> res(n); for (R_xlen_t i = 0; i < n; i++) { - res[i] = Rcpp::ConstReferenceSmartPtrInputParameter>( - VECTOR_ELT(lst, i)); + res[i] = extract(VECTOR_ELT(lst, i)); } return res; } diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index f81f531c84a..715ef4458ee 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -147,8 +147,27 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( } // [[Rcpp::export]] -std::shared_ptr RecordBatch__from_arrays(const std::shared_ptr& schema, List_ 
lst) { - auto arrays = arrow::r::list_to_shared_ptr_vector(lst); +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, List_ lst) { + R_xlen_t n_arrays = lst.size(); + + // convert lst to a vector of arrow::Array + std::vector> arrays(n_arrays); + for(R_xlen_t i=0; i schema; + if( Rf_inherits(schema_sxp, "arrow::Schema")) { + schema = arrow::r::extract(schema_sxp); + } else { + CharacterVector names = lst.names(); + std::vector> fields(n_arrays); + for (R_xlen_t i=0; i(std::string(names[i]), arrays[i]->type()); + } + schema = std::make_shared(std::move(fields)); + } // check all sizes are the same int64_t num_rows= arrays[0]->length(); From c5ad62643a9bf5df6c89d62a7d7b243a16ab3c38 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Feb 2019 16:47:48 +0100 Subject: [PATCH 06/21] retire RecordBatch__from_dataframe() function, no longer needed and replaced by RecordBatch__from_arrays() --- r/R/RcppExports.R | 4 ---- r/src/RcppExports.cpp | 16 ++-------------- r/src/arrow_types.h | 3 +-- r/src/recordbatch.cpp | 26 ++++---------------------- r/src/table.cpp | 2 +- 5 files changed, 8 insertions(+), 43 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index b46ee848f07..fee6c5cb48b 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -693,10 +693,6 @@ RecordBatch__column <- function(batch, i) { .Call(`_arrow_RecordBatch__column`, batch, i) } -RecordBatch__from_dataframe <- function(tbl) { - .Call(`_arrow_RecordBatch__from_dataframe`, tbl) -} - RecordBatch__Equals <- function(self, other) { .Call(`_arrow_RecordBatch__Equals`, self, other) } diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 65dd9a82f6e..eee194f2c37 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -1940,17 +1940,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// RecordBatch__from_dataframe -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl); -RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) { -BEGIN_RCPP - 
Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< Rcpp::DataFrame >::type tbl(tblSEXP); - rcpp_result_gen = Rcpp::wrap(RecordBatch__from_dataframe(tbl)); - return rcpp_result_gen; -END_RCPP -} // RecordBatch__Equals bool RecordBatch__Equals(const std::shared_ptr& self, const std::shared_ptr& other); RcppExport SEXP _arrow_RecordBatch__Equals(SEXP selfSEXP, SEXP otherSEXP) { @@ -2047,13 +2036,13 @@ BEGIN_RCPP END_RCPP } // RecordBatch__from_arrays -std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, List_ lst); +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst); RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); - Rcpp::traits::input_parameter< List_ >::type lst(lstSEXP); + Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema_sxp, lst)); return rcpp_result_gen; END_RCPP @@ -2498,7 +2487,6 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1}, {"_arrow_RecordBatch__columns", (DL_FUNC) &_arrow_RecordBatch__columns, 1}, {"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2}, - {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) &_arrow_RecordBatch__from_dataframe, 1}, {"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2}, {"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) &_arrow_RecordBatch__RemoveColumn, 2}, {"_arrow_RecordBatch__column_name", (DL_FUNC) &_arrow_RecordBatch__column_name, 2}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 2e3549e8346..bade14243e1 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -178,8 +178,7 @@ inline constexpr Rbyte default_value() { SEXP ChunkedArray__as_vector(const std::shared_ptr& 
chunked_array); SEXP Array__as_vector(const std::shared_ptr& array); std::shared_ptr Array__from_vector(SEXP x, SEXP type); -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl); -std::shared_ptr Array__infer_type(SEXP x); +std::shared_ptr RecordBatch__from_arrays(SEXP, SEXP); namespace arrow { namespace r { diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 715ef4458ee..de996a7c881 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -54,24 +54,6 @@ std::shared_ptr RecordBatch__column( return batch->column(i); } -// [[Rcpp::export]] -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl) { - Rcpp::CharacterVector names = tbl.names(); - - std::vector> fields; - std::vector> arrays; - - for (int i = 0; i < tbl.size(); i++) { - SEXP x = tbl[i]; - arrays.push_back(Array__from_vector(x, R_NilValue)); - fields.push_back( - std::make_shared(std::string(names[i]), arrays[i]->type())); - } - auto schema = std::make_shared(std::move(fields)); - - return arrow::RecordBatch::Make(schema, tbl.nrow(), std::move(arrays)); -} - // [[Rcpp::export]] bool RecordBatch__Equals(const std::shared_ptr& self, const std::shared_ptr& other) { @@ -147,13 +129,13 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( } // [[Rcpp::export]] -std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, List_ lst) { - R_xlen_t n_arrays = lst.size(); +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) { + R_xlen_t n_arrays = XLENGTH(lst); // convert lst to a vector of arrow::Array std::vector> arrays(n_arrays); for(R_xlen_t i=0; i RecordBatch__from_arrays(SEXP schema_sxp, Li if( Rf_inherits(schema_sxp, "arrow::Schema")) { schema = arrow::r::extract(schema_sxp); } else { - CharacterVector names = lst.names(); + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); std::vector> fields(n_arrays); for (R_xlen_t i=0; i(std::string(names[i]), arrays[i]->type()); diff --git a/r/src/table.cpp b/r/src/table.cpp index 
c04e1d3aefa..92f29f37f29 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -24,7 +24,7 @@ using Rcpp::DataFrame; // [[Rcpp::export]] std::shared_ptr Table__from_dataframe(DataFrame tbl) { - auto rb = RecordBatch__from_dataframe(tbl); + auto rb = RecordBatch__from_arrays(R_NilValue, tbl); std::shared_ptr out; STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out)); From c00774a0f5011bb400be945d301ec07772d4c3ff Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Feb 2019 17:13:28 +0100 Subject: [PATCH 07/21] table() factory also handles ... and !!! a schema, similar to record_batch() --- r/R/RcppExports.R | 8 ++++---- r/R/Table.R | 9 +++++--- r/R/write_arrow.R | 2 +- r/man/table.Rd | 6 ++++-- r/src/RcppExports.cpp | 25 ++++++++++++----------- r/src/table.cpp | 18 ++++++++-------- r/tests/testthat/test-Table.R | 4 ++-- r/tests/testthat/test-arrow-csv-.R | 2 +- r/tests/testthat/test-read-write.R | 4 ++-- r/tests/testthat/test-read_record_batch.R | 7 ++++--- 10 files changed, 46 insertions(+), 39 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index fee6c5cb48b..0506cf9fb83 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -793,10 +793,6 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema) { .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema) } -Table__from_dataframe <- function(tbl) { - .Call(`_arrow_Table__from_dataframe`, tbl) -} - Table__num_columns <- function(x) { .Call(`_arrow_Table__num_columns`, x) } @@ -817,6 +813,10 @@ Table__columns <- function(table) { .Call(`_arrow_Table__columns`, table) } +Table__from_arrays <- function(schema_sxp, lst) { + .Call(`_arrow_Table__from_arrays`, schema_sxp, lst) +} + #' Get the capacity of the global thread pool #' #' @return the number of worker threads in the thread pool to which diff --git a/r/R/Table.R b/r/R/Table.R index 54731ca41e2..354bcf34ce9 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -53,11 +53,14 @@ #' Create an arrow::Table from a 
data frame #' -#' @param .data a data frame +#' @param ... arrays, chunked arrays, or R vectors +#' @param schema NULL or a schema #' #' @export -table <- function(.data){ - shared_ptr(`arrow::Table`, Table__from_dataframe(.data)) +table <- function(..., schema = NULL){ + arrays <- tibble::lst(...) + stopifnot(length(arrays) > 0) + shared_ptr(`arrow::Table`, Table__from_arrays(schema, arrays)) } #' @export diff --git a/r/R/write_arrow.R b/r/R/write_arrow.R index b979569d364..b489e5bf094 100644 --- a/r/R/write_arrow.R +++ b/r/R/write_arrow.R @@ -21,7 +21,7 @@ to_arrow <- function(x) { `to_arrow.arrow::RecordBatch` <- function(x) x `to_arrow.arrow::Table` <- function(x) x -`to_arrow.data.frame` <- function(x) table(x) +`to_arrow.data.frame` <- function(x) table(!!!x) #' serialize an [arrow::Table][arrow__Table], an [arrow::RecordBatch][arrow__RecordBatch], or a #' data frame to either the streaming format or the binary file format diff --git a/r/man/table.Rd b/r/man/table.Rd index 743ee06f360..4d93ff385b5 100644 --- a/r/man/table.Rd +++ b/r/man/table.Rd @@ -4,10 +4,12 @@ \alias{table} \title{Create an arrow::Table from a data frame} \usage{ -table(.data) +table(..., schema = NULL) } \arguments{ -\item{.data}{a data frame} +\item{...}{arrays, chunked arrays, or R vectors} + +\item{schema}{NULL or a schema} } \description{ Create an arrow::Table from a data frame diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index eee194f2c37..1f2c6ba32bc 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2225,17 +2225,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// Table__from_dataframe -std::shared_ptr Table__from_dataframe(DataFrame tbl); -RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP); - rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl)); - return rcpp_result_gen; -END_RCPP -} // 
Table__num_columns int Table__num_columns(const std::shared_ptr& x); RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) { @@ -2292,6 +2281,18 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// Table__from_arrays +std::shared_ptr Table__from_arrays(SEXP schema_sxp, SEXP lst); +RcppExport SEXP _arrow_Table__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); + Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); + rcpp_result_gen = Rcpp::wrap(Table__from_arrays(schema_sxp, lst)); + return rcpp_result_gen; +END_RCPP +} // GetCpuThreadPoolCapacity int GetCpuThreadPoolCapacity(); RcppExport SEXP _arrow_GetCpuThreadPoolCapacity() { @@ -2512,12 +2513,12 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, - {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1}, {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2}, {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1}, + {"_arrow_Table__from_arrays", (DL_FUNC) &_arrow_Table__from_arrays, 2}, {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, {NULL, NULL, 0} diff --git a/r/src/table.cpp b/r/src/table.cpp index 92f29f37f29..531addc124c 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -22,15 +22,6 @@ using 
Rcpp::DataFrame; -// [[Rcpp::export]] -std::shared_ptr Table__from_dataframe(DataFrame tbl) { - auto rb = RecordBatch__from_arrays(R_NilValue, tbl); - - std::shared_ptr out; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out)); - return out; -} - // [[Rcpp::export]] int Table__num_columns(const std::shared_ptr& x) { return x->num_columns(); @@ -60,3 +51,12 @@ std::vector> Table__columns( } return res; } + +// [[Rcpp::export]] +std::shared_ptr Table__from_arrays(SEXP schema_sxp, SEXP lst) { + auto rb = RecordBatch__from_arrays(R_NilValue, lst); + + std::shared_ptr out; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out)); + return out; +} diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index 346ce4a2a66..0b11cb73da5 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -23,7 +23,7 @@ test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", { lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) tf <- tempfile() write_arrow(tab, tf) @@ -64,7 +64,7 @@ test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", { }) test_that("Table cast (ARROW-3741)", { - tab <- table(tibble::tibble(x = 1:10, y = 1:10)) + tab <- table(x = 1:10, y = 1:10) expect_error(tab$cast(schema(x = int32()))) expect_error(tab$cast(schema(x = int32(), z = int32()))) diff --git a/r/tests/testthat/test-arrow-csv-.R b/r/tests/testthat/test-arrow-csv-.R index 27b4d53c466..ee09b6005e4 100644 --- a/r/tests/testthat/test-arrow-csv-.R +++ b/r/tests/testthat/test-arrow-csv-.R @@ -27,7 +27,7 @@ test_that("Can read csv file", { tab3 <- read_csv_arrow(ReadableFile(tf)) iris$Species <- as.character(iris$Species) - tab0 <- table(iris) + tab0 <- table(!!!iris) expect_equal(tab0, tab1) expect_equal(tab0, tab2) expect_equal(tab0, tab3) diff --git a/r/tests/testthat/test-read-write.R 
b/r/tests/testthat/test-read-write.R index 3fe07cd91bb..c56a7d37901 100644 --- a/r/tests/testthat/test-read-write.R +++ b/r/tests/testthat/test-read-write.R @@ -24,7 +24,7 @@ test_that("arrow::table round trip", { raw = as.raw(1:10) ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) expect_equal(tab$num_columns, 3L) expect_equal(tab$num_rows, 10L) @@ -99,7 +99,7 @@ test_that("arrow::table round trip handles NA in integer and numeric", { raw = as.raw(1:10) ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) expect_equal(tab$num_columns, 3L) expect_equal(tab$num_rows, 10L) diff --git a/r/tests/testthat/test-read_record_batch.R b/r/tests/testthat/test-read_record_batch.R index 277bfd084a2..adbb192fa59 100644 --- a/r/tests/testthat/test-read_record_batch.R +++ b/r/tests/testthat/test-read_record_batch.R @@ -18,11 +18,12 @@ context("read_record_batch()") test_that("RecordBatchFileWriter / RecordBatchFileReader roundtrips", { - tab <- table(tibble::tibble( - int = 1:10, dbl = as.numeric(1:10), + tab <- table( + int = 1:10, + dbl = as.numeric(1:10), lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] - )) + ) tf <- tempfile() writer <- RecordBatchFileWriter(tf, tab$schema) From 41b496fc0b878d3afa6d823e5bffce65b4d8185b Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 7 Feb 2019 13:52:12 +0100 Subject: [PATCH 08/21] table(...) cab now either handle ... 
being: - list of record batches - list of - arrays - chunked arrays - columns - r vectors --- r/R/RcppExports.R | 4 ++-- r/R/Table.R | 8 +++---- r/src/RcppExports.cpp | 9 ++++---- r/src/table.cpp | 54 +++++++++++++++++++++++++++++++++++++++---- 4 files changed, 59 insertions(+), 16 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 0506cf9fb83..9c91323bd8d 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -813,8 +813,8 @@ Table__columns <- function(table) { .Call(`_arrow_Table__columns`, table) } -Table__from_arrays <- function(schema_sxp, lst) { - .Call(`_arrow_Table__from_arrays`, schema_sxp, lst) +Table__from_arrays <- function(lst) { + .Call(`_arrow_Table__from_arrays`, lst) } #' Get the capacity of the global thread pool diff --git a/r/R/Table.R b/r/R/Table.R index 354bcf34ce9..082990e28d3 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -54,13 +54,13 @@ #' Create an arrow::Table from a data frame #' #' @param ... arrays, chunked arrays, or R vectors -#' @param schema NULL or a schema +#' @param schema NULL or a schema (currently ignored) #' #' @export table <- function(..., schema = NULL){ - arrays <- tibble::lst(...) - stopifnot(length(arrays) > 0) - shared_ptr(`arrow::Table`, Table__from_arrays(schema, arrays)) + dots <- tibble::lst(...) 
+ stopifnot(length(dots) > 0) + shared_ptr(`arrow::Table`, Table__from_arrays(dots)) } #' @export diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1f2c6ba32bc..5ee72498e2a 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2282,14 +2282,13 @@ BEGIN_RCPP END_RCPP } // Table__from_arrays -std::shared_ptr Table__from_arrays(SEXP schema_sxp, SEXP lst); -RcppExport SEXP _arrow_Table__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) { +std::shared_ptr Table__from_arrays(SEXP lst); +RcppExport SEXP _arrow_Table__from_arrays(SEXP lstSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); - rcpp_result_gen = Rcpp::wrap(Table__from_arrays(schema_sxp, lst)); + rcpp_result_gen = Rcpp::wrap(Table__from_arrays(lst)); return rcpp_result_gen; END_RCPP } @@ -2518,7 +2517,7 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2}, {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1}, - {"_arrow_Table__from_arrays", (DL_FUNC) &_arrow_Table__from_arrays, 2}, + {"_arrow_Table__from_arrays", (DL_FUNC) &_arrow_Table__from_arrays, 1}, {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, {NULL, NULL, 0} diff --git a/r/src/table.cpp b/r/src/table.cpp index 531addc124c..a2dcb993afe 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -52,11 +52,55 @@ std::vector> Table__columns( return res; } +bool all_record_batches(SEXP lst){ + R_xlen_t n = XLENGTH(lst); + for(R_xlen_t i = 0; i Table__from_arrays(SEXP schema_sxp, SEXP lst) { - auto rb = RecordBatch__from_arrays(R_NilValue, lst); +std::shared_ptr Table__from_arrays(SEXP lst) { + // lst can be aither: + // - a list of 
record batches, in which case we call Table::FromRecordBatches + + if (all_record_batches(lst)) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst); + std::shared_ptr tab; + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &tab)); + return tab; + } + + // - a list of arrays, chunked arrays or r vectors + + CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + + R_xlen_t n = XLENGTH(lst); + std::vector> columns(n); + std::vector> fields(n); + + for (R_xlen_t i = 0; i(x); + fields[i] = columns[i]->field(); + } else if (Rf_inherits(x, "arrow::ChunkedArray")) { + auto chunked_array = arrow::r::extract(x); + fields[i] = std::make_shared(std::string(names[i]), chunked_array->type()); + columns[i] = std::make_shared(fields[i], chunked_array); + } else if (Rf_inherits(x, "arrow::Array")) { + auto array = arrow::r::extract(x); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } else { + auto array = Array__from_vector(x); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } + } + + auto schema = std::make_shared(std::move(fields)); + return arrow::Table::Make(schema, columns); - std::shared_ptr out; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out)); - return out; } From 7e8a4b7d895ead827ba9a915b0f848c06e404885 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 7 Feb 2019 14:55:41 +0100 Subject: [PATCH 09/21] test for table(...) 
--- r/tests/testthat/test-Table.R | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index 0b11cb73da5..965db010553 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -81,3 +81,23 @@ test_that("Table dim() and nrow() (ARROW-3816)", { expect_equal(dim(tab), c(10L, 2L)) expect_equal(nrow(tab), 10L) }) + +test_that("table() handles record batches with splicing", { + batch <- record_batch(x = 1:2, y = letters[1:2]) + tab <- table(batch, batch, batch) + expect_equal(tab$schema, batch$schema) + expect_equal(tab$num_rows, 6L) + expect_equal( + as_tibble(tab), + vctrs::vec_rbind(as_tibble(batch), as_tibble(batch), as_tibble(batch)) + ) + + batches <- list(batch, batch, batch) + tab <- table(!!!batches) + expect_equal(tab$schema, batch$schema) + expect_equal(tab$num_rows, 6L) + expect_equal( + as_tibble(tab), + vctrs::vec_rbind(!!!purrr::map(batches, as_tibble)) + ) +}) From 68744f5d261b877de01f93e5b5f93cb22a31ba81 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 7 Feb 2019 15:05:30 +0100 Subject: [PATCH 10/21] tests for table(...) --- r/tests/testthat/test-Table.R | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index 965db010553..fa3cd86c771 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -101,3 +101,21 @@ test_that("table() handles record batches with splicing", { vctrs::vec_rbind(!!!purrr::map(batches, as_tibble)) ) }) + +test_that("table() handles ... 
of arrays, chunked arrays, vectors", { + a <- array(1:10) + ca <- chunked_array(1:5, 6:10) + v <- rnorm(10) + tbl <- tibble::tibble(x = 1:10, y = letters[1:10]) + + tab <- table(a = a, b = ca, c = v, !!!tbl) + expect_equal( + tab$schema, + schema(a = int32(), b = int32(), c = float64(), x = int32(), y = utf8()) + ) + res <- as_tibble(tab) + expect_equal(names(res), c("a", "b", "c", "x", "y")) + expect_equal(res, + tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10]) + ) +}) From d8e627fb4aade049b77cbe5fb69db715d109aefe Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 8 Feb 2019 15:06:12 +0100 Subject: [PATCH 11/21] use the schema= argument in table() --- r/R/RcppExports.R | 4 +-- r/R/Table.R | 6 ++-- r/src/RcppExports.cpp | 11 +++--- r/src/table.cpp | 81 ++++++++++++++++++++++++++++--------------- 4 files changed, 66 insertions(+), 36 deletions(-) diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 9c91323bd8d..e096122eaa6 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -813,8 +813,8 @@ Table__columns <- function(table) { .Call(`_arrow_Table__columns`, table) } -Table__from_arrays <- function(lst) { - .Call(`_arrow_Table__from_arrays`, lst) +Table__from_dots <- function(lst, schema_sxp) { + .Call(`_arrow_Table__from_dots`, lst, schema_sxp) } #' Get the capacity of the global thread pool diff --git a/r/R/Table.R b/r/R/Table.R index 082990e28d3..87e87ac0dba 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -54,13 +54,15 @@ #' Create an arrow::Table from a data frame #' #' @param ... arrays, chunked arrays, or R vectors -#' @param schema NULL or a schema (currently ignored) +#' @param schema a schema. The default (`NULL`) infers the schema from the `...` +#' +#' @return an arrow::Table #' #' @export table <- function(..., schema = NULL){ dots <- tibble::lst(...) 
stopifnot(length(dots) > 0) - shared_ptr(`arrow::Table`, Table__from_arrays(dots)) + shared_ptr(`arrow::Table`, Table__from_dots(dots, schema)) } #' @export diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 5ee72498e2a..a92c4c8d25b 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -2281,14 +2281,15 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// Table__from_arrays -std::shared_ptr Table__from_arrays(SEXP lst); -RcppExport SEXP _arrow_Table__from_arrays(SEXP lstSEXP) { +// Table__from_dots +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); +RcppExport SEXP _arrow_Table__from_dots(SEXP lstSEXP, SEXP schema_sxpSEXP) { BEGIN_RCPP Rcpp::RObject rcpp_result_gen; Rcpp::RNGScope rcpp_rngScope_gen; Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); - rcpp_result_gen = Rcpp::wrap(Table__from_arrays(lst)); + Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); + rcpp_result_gen = Rcpp::wrap(Table__from_dots(lst, schema_sxp)); return rcpp_result_gen; END_RCPP } @@ -2517,7 +2518,7 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2}, {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1}, - {"_arrow_Table__from_arrays", (DL_FUNC) &_arrow_Table__from_arrays, 1}, + {"_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, {NULL, NULL, 0} diff --git a/r/src/table.cpp b/r/src/table.cpp index a2dcb993afe..4698168ca1c 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -61,46 +61,73 @@ bool all_record_batches(SEXP lst){ } // [[Rcpp::export]] -std::shared_ptr Table__from_arrays(SEXP lst) { - // lst can be aither: +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { + // lst can be either: // - a list 
of record batches, in which case we call Table::FromRecordBatches if (all_record_batches(lst)) { auto batches = arrow::r::list_to_shared_ptr_vector(lst); std::shared_ptr tab; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &tab)); + + if(Rf_inherits(schema_sxp, "arrow::Schema")){ + auto schema = arrow::r::extract(schema_sxp); + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &tab)); + } else { + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &tab)); + } return tab; } - // - a list of arrays, chunked arrays or r vectors - - CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); - R_xlen_t n = XLENGTH(lst); std::vector> columns(n); - std::vector> fields(n); - - for (R_xlen_t i = 0; i(x); - fields[i] = columns[i]->field(); - } else if (Rf_inherits(x, "arrow::ChunkedArray")) { - auto chunked_array = arrow::r::extract(x); - fields[i] = std::make_shared(std::string(names[i]), chunked_array->type()); - columns[i] = std::make_shared(fields[i], chunked_array); - } else if (Rf_inherits(x, "arrow::Array")) { - auto array = arrow::r::extract(x); - fields[i] = std::make_shared(std::string(names[i]), array->type()); - columns[i] = std::make_shared(fields[i], array); - } else { - auto array = Array__from_vector(x); - fields[i] = std::make_shared(std::string(names[i]), array->type()); - columns[i] = std::make_shared(fields[i], array); + std::shared_ptr schema; + + if (Rf_isNull(schema_sxp)) { + // infer the schema from the ... 
+ std::vector> fields(n); + CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + + for (R_xlen_t i = 0; i(x); + fields[i] = columns[i]->field(); + } else if (Rf_inherits(x, "arrow::ChunkedArray")) { + auto chunked_array = arrow::r::extract(x); + fields[i] = std::make_shared(std::string(names[i]), chunked_array->type()); + columns[i] = std::make_shared(fields[i], chunked_array); + } else if (Rf_inherits(x, "arrow::Array")) { + auto array = arrow::r::extract(x); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } else { + auto array = Array__from_vector(x); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } + } + schema = std::make_shared(std::move(fields)); + } else { + // use the schema that is given + schema = arrow::r::extract(schema_sxp); + + for (R_xlen_t i = 0; i(x); + } else if (Rf_inherits(x, "arrow::ChunkedArray")) { + auto chunked_array = arrow::r::extract(x); + columns[i] = std::make_shared(schema->field(i), chunked_array); + } else if (Rf_inherits(x, "arrow::Array")) { + auto array = arrow::r::extract(x); + columns[i] = std::make_shared(schema->field(i), array); + } else { + auto array = Array__from_vector(x); + columns[i] = std::make_shared(schema->field(i), array); + } } } - auto schema = std::make_shared(std::move(fields)); return arrow::Table::Make(schema, columns); } From d958108c9b6de8f4976d6048c08fddddf9a1222f Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Mar 2019 14:32:42 +0100 Subject: [PATCH 12/21] record_batch(..., schema = ) --- r/R/RecordBatch.R | 10 +------ r/src/arrow_types.h | 2 ++ r/src/recordbatch.cpp | 45 ++++++++++++++++++++++++----- r/src/table.cpp | 5 ++-- r/tests/testthat/test-RecordBatch.R | 4 +-- r/tests/testthat/test-Table.R | 2 +- 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index 04ee1dc4bc2..2b9148a8e3b 
100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -89,14 +89,6 @@ RecordBatch__to_dataframe(x, use_threads = use_threads) } -to_array <- function(x) { - if (inherits(x, "arrow::Array")) { - x - } else { - array(x) - } -} - #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame #' #' @param ... A variable number of arrow::Array @@ -107,5 +99,5 @@ to_array <- function(x) { record_batch <- function(..., schema = NULL){ arrays <- tibble::lst(...) stopifnot(length(arrays) > 0) - shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays) ) + shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays)) } diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index bade14243e1..6e454a19c86 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -216,5 +216,7 @@ std::vector> list_to_shared_ptr_vector(SEXP lst) { return res; } +std::shared_ptr Array__from_vector(SEXP x, const std::shared_ptr& type, bool type_infered); + } // namespace r } // namespace arrow diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index de996a7c881..01acd0ca4c4 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -128,8 +128,36 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( return batch; } +Status check_consistent_array_size(const std::vector>& arrays, int64_t* num_rows) { + *num_rows = arrays[0]->length(); + for(int64_t i = 1; ilength() != *num_rows) { + return Status::Invalid("All arrays must have the same length"); + } + } + return Status::OK(); +} + +std::shared_ptr RecordBatch__from_arrays__knwon_schema(const std::shared_ptr& schema, SEXP lst) { + R_xlen_t n_arrays = XLENGTH(lst); + + // convert lst to a vector of arrow::Array + std::vector> arrays(n_arrays); + for(R_xlen_t i=0; ifield(i)->type(), false); + } + + int64_t num_rows; + STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows)); + return arrow::RecordBatch::Make(schema, num_rows, arrays); +} + // [[Rcpp::export]] std::shared_ptr 
RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) { + if (Rf_inherits(schema_sxp, "arrow::Schema")) { + return RecordBatch__from_arrays__knwon_schema(arrow::r::extract(schema_sxp), lst); + } + R_xlen_t n_arrays = XLENGTH(lst); // convert lst to a vector of arrow::Array @@ -138,7 +166,7 @@ std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SE arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue); } - // extract or generate schema + // generate schema from the types that have been infered std::shared_ptr schema; if( Rf_inherits(schema_sxp, "arrow::Schema")) { schema = arrow::r::extract(schema_sxp); @@ -151,13 +179,16 @@ std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SE schema = std::make_shared(std::move(fields)); } - // check all sizes are the same - int64_t num_rows= arrays[0]->length(); - for(int64_t i = 1; ilength() != num_rows) { - Rcpp::stop("All arrays must have the same length"); - } + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + std::vector> fields(n_arrays); + for (R_xlen_t i=0; i(std::string(names[i]), arrays[i]->type()); } + schema = std::make_shared(std::move(fields)); + + // check all sizes are the same + int64_t num_rows; + STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows)); return arrow::RecordBatch::Make(schema, num_rows, arrays); } diff --git a/r/src/table.cpp b/r/src/table.cpp index 4698168ca1c..b45b4749025 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -101,7 +101,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { fields[i] = std::make_shared(std::string(names[i]), array->type()); columns[i] = std::make_shared(fields[i], array); } else { - auto array = Array__from_vector(x); + auto array = Array__from_vector(x, R_NilValue); fields[i] = std::make_shared(std::string(names[i]), array->type()); columns[i] = std::make_shared(fields[i], array); } @@ -122,7 +122,8 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { auto array = arrow::r::extract(x); 
columns[i] = std::make_shared(schema->field(i), array); } else { - auto array = Array__from_vector(x); + auto type = schema->field(i)->type(); + auto array = arrow::r::Array__from_vector(x, type, false); columns[i] = std::make_shared(schema->field(i), array); } } diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index 096b32c2705..3e391f40afa 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -109,7 +109,7 @@ test_that("RecordBatch with 0 rows are supported", { }) test_that("RecordBatch cast (ARROW-3741)", { - batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10)) + batch <- record_batch(x = 1:10, y = 1:10) expect_error(batch$cast(schema(x = int32()))) expect_error(batch$cast(schema(x = int32(), z = int32()))) @@ -122,7 +122,7 @@ test_that("RecordBatch cast (ARROW-3741)", { }) test_that("RecordBatch dim() and nrow() (ARROW-3816)", { - batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10)) + batch <- record_batch(x = 1:10, y = 1:10) expect_equal(dim(batch), c(10L, 2L)) expect_equal(nrow(batch), 10L) }) diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index fa3cd86c771..59234fa5c01 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -77,7 +77,7 @@ test_that("Table cast (ARROW-3741)", { }) test_that("Table dim() and nrow() (ARROW-3816)", { - tab <- table(tibble::tibble(x = 1:10, y = 1:10)) + tab <- table(x = 1:10, y = 1:10) expect_equal(dim(tab), c(10L, 2L)) expect_equal(nrow(tab), 10L) }) From fc885fda7cc630a5fded83dcfa8620d3d0754af6 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Mar 2019 14:34:55 +0100 Subject: [PATCH 13/21] directly return from builder_->Finish(), as suggested here: https://github.com/apache/arrow/pull/3635/files/08b295370271f122b410b991282b4919510b5cea#r261012517 --- r/src/array_from_vector.cpp | 3 +-- r/src/arrow_types.h | 4 +++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git 
a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index 509a39a5e08..86acbb46e03 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -270,8 +270,7 @@ class VectorConverter { virtual Status Ingest(SEXP obj) = 0; virtual Status GetResult(std::shared_ptr* result) { - RETURN_NOT_OK(builder_->Finish(result)); - return Status::OK(); + return builder_->Finish(result); } ArrayBuilder* builder() const { return builder_; } diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 6e454a19c86..495c925d41e 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -19,6 +19,7 @@ #include #include +#include #include @@ -216,7 +217,8 @@ std::vector> list_to_shared_ptr_vector(SEXP lst) { return res; } -std::shared_ptr Array__from_vector(SEXP x, const std::shared_ptr& type, bool type_infered); +std::shared_ptr Array__from_vector( + SEXP x, const std::shared_ptr& type, bool type_infered); } // namespace r } // namespace arrow From c04f904c960dacaf43aa50fe93eb261551594fe0 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Mar 2019 14:54:44 +0100 Subject: [PATCH 14/21] tests about record_batch(schema=) argument --- r/tests/testthat/test-RecordBatch.R | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index 3e391f40afa..1c3ce7e9733 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -109,7 +109,7 @@ test_that("RecordBatch with 0 rows are supported", { }) test_that("RecordBatch cast (ARROW-3741)", { - batch <- record_batch(x = 1:10, y = 1:10) + batch <- record_batch(x = 1:10, y = 1:10) expect_error(batch$cast(schema(x = int32()))) expect_error(batch$cast(schema(x = int32(), z = int32()))) @@ -121,6 +121,19 @@ test_that("RecordBatch cast (ARROW-3741)", { expect_equal(batch2$column(1L)$type, int64()) }) +test_that("record_batch() handles schema= argument", { + s <- schema(x = int32(), y = 
int32()) + batch <- record_batch(x = 1:10, y = 1:10, schema = s) + expect_equal(s, batch$schema) + + s <- schema(x = int32(), y = float64()) + batch <- record_batch(x = 1:10, y = 1:10, schema = s) + expect_equal(s, batch$schema) + + s <- schema(x = int32(), y = utf8()) + expect_error(record_batch(x = 1:10, y = 1:10, schema = s)) +}) + test_that("RecordBatch dim() and nrow() (ARROW-3816)", { batch <- record_batch(x = 1:10, y = 1:10) expect_equal(dim(batch), c(10L, 2L)) From 457494266c2f36e40662d770a299da8979dae66d Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Mar 2019 15:22:42 +0100 Subject: [PATCH 15/21] STOP_IF might be useful too --- r/src/arrow_types.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 495c925d41e..b5dc7a69170 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -36,11 +36,12 @@ #include #include -#define STOP_IF_NOT(TEST, MSG) \ - do { \ - if (!(TEST)) Rcpp::stop(MSG); \ - } while (0) +#define STOP_IF(TEST, MSG) \ +do { \ + if (TEST) Rcpp::stop(MSG); \ +} while (0) +#define STOP_IF_NOT(TEST, MSG) STOP_IF(!(TEST), MSG) #define STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString()) template From efd84c50660d470ea7f92f8ed4250ca9a478a1b4 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 6 Mar 2019 15:23:25 +0100 Subject: [PATCH 16/21] record_batch(schema=) compares names --- r/src/arrow_types.h | 8 ++++---- r/src/recordbatch.cpp | 9 +++++++++ r/tests/testthat/test-RecordBatch.R | 6 ++++++ 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index b5dc7a69170..f4dad1bf219 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -36,10 +36,10 @@ #include #include -#define STOP_IF(TEST, MSG) \ - do { \ - if (TEST) Rcpp::stop(MSG); \ - } while (0) +#define STOP_IF(TEST, MSG) \ + do { \ + if (TEST) Rcpp::stop(MSG); \ + } while (0) #define STOP_IF_NOT(TEST, MSG) STOP_IF(!(TEST), MSG) #define
STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString()) diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 01acd0ca4c4..12593f6bedf 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -140,10 +140,19 @@ Status check_consistent_array_size(const std::vector RecordBatch__from_arrays__knwon_schema(const std::shared_ptr& schema, SEXP lst) { R_xlen_t n_arrays = XLENGTH(lst); + if(schema->num_fields() != n_arrays) { + Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied", schema->num_fields(), n_arrays); + } // convert lst to a vector of arrow::Array std::vector> arrays(n_arrays); + SEXP names = Rf_getAttrib(lst, R_NamesSymbol); + bool has_names = !Rf_isNull(names); + for(R_xlen_t i=0; ifield(i)->name() != CHAR(STRING_ELT(names, i))) { + Rcpp::stop("field at index %d has name '%s' != '%s'", i+1, schema->field(i)->name(), CHAR(STRING_ELT(names, i))); + } arrays[i] = arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false); } diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index 1c3ce7e9733..c0295bac2c8 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -134,6 +134,12 @@ test_that("record_batch() handles schema= argument", { expect_error(record_batch(x = 1:10, y = 1:10, schema = s)) }) +test_that("record_batch(schema=) does some basic consistency checking of the schema", { + s <- schema(x = int32()) + expect_error(record_batch(x = 1:10, y = 1:10, schema = s)) + expect_error(record_batch(z = 1:10, schema = s)) +}) + test_that("RecordBatch dim() and nrow() (ARROW-3816)", { batch <- record_batch(x = 1:10, y = 1:10) expect_equal(dim(batch), c(10L, 2L)) From eed535f6aa11a22769e44064fb265ccb92510614 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 7 Mar 2019 13:17:59 +0100 Subject: [PATCH 17/21] add comments about !!! 
--- r/R/feather.R | 5 +++++ r/R/write_arrow.R | 3 +++ 2 files changed, 8 insertions(+) diff --git a/r/R/feather.R b/r/R/feather.R index f47ff103224..4a1d9de0fa1 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -72,6 +72,11 @@ write_feather.default <- function(data, stream) { #' @export write_feather.data.frame <- function(data, stream) { + # splice the columns in the record_batch() call + # e.g. if we had data <- data.frame(x = <...>, y = <...>) + # then record_batch(!!!data) is the same as + # record_batch(x = data$x, y = data$y) + # see ?rlang::list2() write_feather(record_batch(!!!data), stream) } diff --git a/r/R/write_arrow.R b/r/R/write_arrow.R index b489e5bf094..486f3619730 100644 --- a/r/R/write_arrow.R +++ b/r/R/write_arrow.R @@ -21,6 +21,9 @@ to_arrow <- function(x) { `to_arrow.arrow::RecordBatch` <- function(x) x `to_arrow.arrow::Table` <- function(x) x + +# splice the data frame as arguments of table() +# see ?rlang::list2() `to_arrow.data.frame` <- function(x) table(!!!x) #' serialize an [arrow::Table][arrow__Table], an [arrow::RecordBatch][arrow__RecordBatch], or a From 2362ea0e7d4f6c2e94f3abc066f11b527f6e924c Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 7 Mar 2019 13:23:13 +0100 Subject: [PATCH 18/21] typo --- r/src/recordbatch.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 12593f6bedf..7de7d21187f 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -138,7 +138,7 @@ Status check_consistent_array_size(const std::vector RecordBatch__from_arrays__knwon_schema(const std::shared_ptr& schema, SEXP lst) { +std::shared_ptr RecordBatch__from_arrays__known_schema(const std::shared_ptr& schema, SEXP lst) { R_xlen_t n_arrays = XLENGTH(lst); if(schema->num_fields() != n_arrays) { Rcpp::stop("incompatible. 
schema has %d fields, and %d arrays are supplied", schema->num_fields(), n_arrays); @@ -164,7 +164,7 @@ std::shared_ptr RecordBatch__from_arrays__knwon_schema(const // [[Rcpp::export]] std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) { if (Rf_inherits(schema_sxp, "arrow::Schema")) { - return RecordBatch__from_arrays__knwon_schema(arrow::r::extract(schema_sxp), lst); + return RecordBatch__from_arrays__known_schema(arrow::r::extract(schema_sxp), lst); } R_xlen_t n_arrays = XLENGTH(lst); From f27dcb9fe8635d905bdcafd92414ae979d9938b0 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 27 Feb 2019 19:21:25 -0600 Subject: [PATCH 19/21] Also run cpplint and clang-format on .cpp files --- r/src/array_from_vector.cpp | 38 +++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index 86acbb46e03..b026eeeb46b 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -298,14 +298,13 @@ struct Unbox> { return IngestRange(builder, reinterpret_cast(REAL(obj)), XLENGTH(obj), NA_INT64); } - // TODO: handle aw and logical + // TODO: handle raw and logical default: break; } - return Status::Invalid( - tfm::format("Cannot convert R vector of type %s to integer Arrow array", - Rcpp::type2name(obj))); + return Status::Invalid(tfm::format( + "Cannot convert R vector of type %s to integer Arrow array", Rcpp::type2name(obj))); } template @@ -593,7 +592,11 @@ class TimeConverter : public VectorConverter { using BuilderType = typename TypeTraits::BuilderType; public: +<<<<<<< HEAD explicit TimeConverter(TimeUnit::type unit) +======= + TimeConverter(TimeUnit::type unit) +>>>>>>> Also run cpplint and clang-format on .cpp files : unit_(unit), multiplier_(get_time_multiplier(unit)) {} Status Init(ArrayBuilder* builder) override { @@ -655,10 +658,17 @@ class TimeConverter : public VectorConverter { class TimestampConverter : public TimeConverter { public: 
+<<<<<<< HEAD explicit TimestampConverter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { +======= + TimestampConverter(TimeUnit::type unit) : TimeConverter(unit) {} + + protected: + virtual bool valid_R_object(SEXP obj) override { +>>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "POSIXct"); } @@ -670,20 +680,34 @@ class TimestampConverter : public TimeConverter { class Time32Converter : public TimeConverter { public: +<<<<<<< HEAD explicit Time32Converter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { +======= + Time32Converter(TimeUnit::type unit) : TimeConverter(unit) {} + + protected: + virtual bool valid_R_object(SEXP obj) override { +>>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); } }; class Time64Converter : public TimeConverter { public: +<<<<<<< HEAD explicit Time64Converter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { +======= + Time64Converter(TimeUnit::type unit) : TimeConverter(unit) {} + + protected: + virtual bool valid_R_object(SEXP obj) override { +>>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); } }; @@ -879,8 +903,14 @@ std::shared_ptr MakeSimpleArray(SEXP x) { buffers[0] = std::move(null_bitmap); } +<<<<<<< HEAD auto data = ArrayData::Make(std::make_shared(), LENGTH(x), std::move(buffers), null_count, 0); +======= + auto data = ArrayData::Make( + std::make_shared(), LENGTH(x), std::move(buffers), null_count, 0 /*offset*/ + ); +>>>>>>> Also run cpplint and clang-format on .cpp files // return the right Array class return std::make_shared::ArrayType>(data); From 6ea077883f335041c33df35631c0682633a4f7f4 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 8 Mar 2019 09:05:45 +0100 Subject: [PATCH 
20/21] rebase --- r/src/array_from_vector.cpp | 38 ++++-------------------------------- r/src/recordbatch.cpp | 39 ++++++++++++++++++++++--------------- r/src/table.cpp | 16 +++++++-------- 3 files changed, 35 insertions(+), 58 deletions(-) diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index b026eeeb46b..e2b82ac90f1 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -303,8 +303,9 @@ struct Unbox> { break; } - return Status::Invalid(tfm::format( - "Cannot convert R vector of type %s to integer Arrow array", Rcpp::type2name(obj))); + return Status::Invalid( + tfm::format("Cannot convert R vector of type %s to integer Arrow array", + Rcpp::type2name(obj))); } template @@ -592,11 +593,7 @@ class TimeConverter : public VectorConverter { using BuilderType = typename TypeTraits::BuilderType; public: -<<<<<<< HEAD explicit TimeConverter(TimeUnit::type unit) -======= - TimeConverter(TimeUnit::type unit) ->>>>>>> Also run cpplint and clang-format on .cpp files : unit_(unit), multiplier_(get_time_multiplier(unit)) {} Status Init(ArrayBuilder* builder) override { @@ -658,17 +655,10 @@ class TimeConverter : public VectorConverter { class TimestampConverter : public TimeConverter { public: -<<<<<<< HEAD explicit TimestampConverter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { -======= - TimestampConverter(TimeUnit::type unit) : TimeConverter(unit) {} - - protected: - virtual bool valid_R_object(SEXP obj) override { ->>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "POSIXct"); } @@ -680,34 +670,20 @@ class TimestampConverter : public TimeConverter { class Time32Converter : public TimeConverter { public: -<<<<<<< HEAD explicit Time32Converter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { -======= - Time32Converter(TimeUnit::type unit) : TimeConverter(unit) {} - - 
protected: - virtual bool valid_R_object(SEXP obj) override { ->>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); } }; class Time64Converter : public TimeConverter { public: -<<<<<<< HEAD explicit Time64Converter(TimeUnit::type unit) : TimeConverter(unit) {} protected: bool valid_R_object(SEXP obj) override { -======= - Time64Converter(TimeUnit::type unit) : TimeConverter(unit) {} - - protected: - virtual bool valid_R_object(SEXP obj) override { ->>>>>>> Also run cpplint and clang-format on .cpp files return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); } }; @@ -903,14 +879,8 @@ std::shared_ptr MakeSimpleArray(SEXP x) { buffers[0] = std::move(null_bitmap); } -<<<<<<< HEAD auto data = ArrayData::Make(std::make_shared(), LENGTH(x), std::move(buffers), - null_count, 0); -======= - auto data = ArrayData::Make( - std::make_shared(), LENGTH(x), std::move(buffers), null_count, 0 /*offset*/ - ); ->>>>>>> Also run cpplint and clang-format on .cpp files + null_count, 0 /*offset*/); // return the right Array class return std::make_shared::ArrayType>(data); diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 7de7d21187f..390143859f8 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -128,20 +128,23 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( return batch; } -Status check_consistent_array_size(const std::vector>& arrays, int64_t* num_rows) { +arrow::Status check_consistent_array_size( + const std::vector>& arrays, int64_t* num_rows) { *num_rows = arrays[0]->length(); - for(int64_t i = 1; ilength() != *num_rows) { - return Status::Invalid("All arrays must have the same length"); + return arrow::Status::Invalid("All arrays must have the same length"); } } - return Status::OK(); + return arrow::Status::OK(); } -std::shared_ptr RecordBatch__from_arrays__known_schema(const std::shared_ptr& schema, SEXP lst) { +std::shared_ptr 
RecordBatch__from_arrays__known_schema( + const std::shared_ptr& schema, SEXP lst) { R_xlen_t n_arrays = XLENGTH(lst); - if(schema->num_fields() != n_arrays) { - Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied", schema->num_fields(), n_arrays); + if (schema->num_fields() != n_arrays) { + Rcpp::stop("incompatible. schema has %d fields, and %d arrays are supplied", + schema->num_fields(), n_arrays); } // convert lst to a vector of arrow::Array @@ -149,11 +152,13 @@ std::shared_ptr RecordBatch__from_arrays__known_schema(const SEXP names = Rf_getAttrib(lst, R_NamesSymbol); bool has_names = !Rf_isNull(names); - for(R_xlen_t i=0; ifield(i)->name() != CHAR(STRING_ELT(names, i))) { - Rcpp::stop("field at index %d has name '%s' != '%s'", i+1, schema->field(i)->name(), CHAR(STRING_ELT(names, i))); + Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1, + schema->field(i)->name(), CHAR(STRING_ELT(names, i))); } - arrays[i] = arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false); + arrays[i] = + arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false); } int64_t num_rows; @@ -164,33 +169,35 @@ std::shared_ptr RecordBatch__from_arrays__known_schema(const // [[Rcpp::export]] std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) { if (Rf_inherits(schema_sxp, "arrow::Schema")) { - return RecordBatch__from_arrays__known_schema(arrow::r::extract(schema_sxp), lst); + return RecordBatch__from_arrays__known_schema( + arrow::r::extract(schema_sxp), lst); } R_xlen_t n_arrays = XLENGTH(lst); // convert lst to a vector of arrow::Array std::vector> arrays(n_arrays); - for(R_xlen_t i=0; i schema; - if( Rf_inherits(schema_sxp, "arrow::Schema")) { + if (Rf_inherits(schema_sxp, "arrow::Schema")) { schema = arrow::r::extract(schema_sxp); } else { Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); std::vector> fields(n_arrays); - for (R_xlen_t i=0; i(std::string(names[i]), 
arrays[i]->type()); + for (R_xlen_t i = 0; i < n_arrays; i++) { + fields[i] = + std::make_shared(std::string(names[i]), arrays[i]->type()); } schema = std::make_shared(std::move(fields)); } Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); std::vector> fields(n_arrays); - for (R_xlen_t i=0; i(std::string(names[i]), arrays[i]->type()); } schema = std::make_shared(std::move(fields)); diff --git a/r/src/table.cpp b/r/src/table.cpp index b45b4749025..f78b2afdd7a 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -52,9 +52,9 @@ std::vector> Table__columns( return res; } -bool all_record_batches(SEXP lst){ +bool all_record_batches(SEXP lst) { R_xlen_t n = XLENGTH(lst); - for(R_xlen_t i = 0; i Table__from_dots(SEXP lst, SEXP schema_sxp) { auto batches = arrow::r::list_to_shared_ptr_vector(lst); std::shared_ptr tab; - if(Rf_inherits(schema_sxp, "arrow::Schema")){ + if (Rf_inherits(schema_sxp, "arrow::Schema")) { auto schema = arrow::r::extract(schema_sxp); STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &tab)); } else { @@ -85,16 +85,17 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { if (Rf_isNull(schema_sxp)) { // infer the schema from the ... 
std::vector> fields(n); - CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); - for (R_xlen_t i = 0; i(x); fields[i] = columns[i]->field(); } else if (Rf_inherits(x, "arrow::ChunkedArray")) { auto chunked_array = arrow::r::extract(x); - fields[i] = std::make_shared(std::string(names[i]), chunked_array->type()); + fields[i] = + std::make_shared(std::string(names[i]), chunked_array->type()); columns[i] = std::make_shared(fields[i], chunked_array); } else if (Rf_inherits(x, "arrow::Array")) { auto array = arrow::r::extract(x); @@ -111,7 +112,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { // use the schema that is given schema = arrow::r::extract(schema_sxp); - for (R_xlen_t i = 0; i(x); @@ -130,5 +131,4 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { } return arrow::Table::Make(schema, columns); - } From ab0cd1626699e6de4960b1810401fc510a5c60ae Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 8 Mar 2019 09:06:35 +0100 Subject: [PATCH 21/21] only pass $1 to run_clang_format so that we can do: ./r/lint.sh --fix --- r/lint.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/lint.sh b/r/lint.sh index e0a89413f20..fed64c1b31c 100755 --- a/r/lint.sh +++ b/r/lint.sh @@ -33,4 +33,4 @@ CPPLINT=$CPP_BUILD_SUPPORT/cpplint.py $CPP_BUILD_SUPPORT/run_cpplint.py \ --cpplint_binary=$CPPLINT \ --exclude_glob=$CPP_BUILD_SUPPORT/lint_exclusions.txt \ - --source_dir=$SOURCE_DIR/src --quiet $1 + --source_dir=$SOURCE_DIR/src --quiet