diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 81e1c040444..e096122eaa6 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -693,10 +693,6 @@ RecordBatch__column <- function(batch, i) { .Call(`_arrow_RecordBatch__column`, batch, i) } -RecordBatch__from_dataframe <- function(tbl) { - .Call(`_arrow_RecordBatch__from_dataframe`, tbl) -} - RecordBatch__Equals <- function(self, other) { .Call(`_arrow_RecordBatch__Equals`, self, other) } @@ -729,6 +725,10 @@ ipc___ReadRecordBatch__InputStream__Schema <- function(stream, schema) { .Call(`_arrow_ipc___ReadRecordBatch__InputStream__Schema`, stream, schema) } +RecordBatch__from_arrays <- function(schema_sxp, lst) { + .Call(`_arrow_RecordBatch__from_arrays`, schema_sxp, lst) +} + RecordBatchReader__schema <- function(reader) { .Call(`_arrow_RecordBatchReader__schema`, reader) } @@ -793,10 +793,6 @@ ipc___RecordBatchStreamWriter__Open <- function(stream, schema) { .Call(`_arrow_ipc___RecordBatchStreamWriter__Open`, stream, schema) } -Table__from_dataframe <- function(tbl) { - .Call(`_arrow_Table__from_dataframe`, tbl) -} - Table__num_columns <- function(x) { .Call(`_arrow_Table__num_columns`, x) } @@ -817,6 +813,10 @@ Table__columns <- function(table) { .Call(`_arrow_Table__columns`, table) } +Table__from_dots <- function(lst, schema_sxp) { + .Call(`_arrow_Table__from_dots`, lst, schema_sxp) +} + #' Get the capacity of the global thread pool #' #' @return the number of worker threads in the thread pool to which diff --git a/r/R/RecordBatch.R b/r/R/RecordBatch.R index 22fda8403c6..2b9148a8e3b 100644 --- a/r/R/RecordBatch.R +++ b/r/R/RecordBatch.R @@ -91,10 +91,13 @@ #' Create an [arrow::RecordBatch][arrow__RecordBatch] from a data frame #' -#' @param .data a data frame +#' @param ... 
A variable number of arrow::Array +#' @param schema an arrow::Schema #' #' @return a [arrow::RecordBatch][arrow__RecordBatch] #' @export -record_batch <- function(.data){ - shared_ptr(`arrow::RecordBatch`, RecordBatch__from_dataframe(.data)) +record_batch <- function(..., schema = NULL){ + arrays <- tibble::lst(...) + stopifnot(length(arrays) > 0) + shared_ptr(`arrow::RecordBatch`, RecordBatch__from_arrays(schema, arrays)) } diff --git a/r/R/Schema.R b/r/R/Schema.R index d57e2749c40..fbf6581d7a3 100644 --- a/r/R/Schema.R +++ b/r/R/Schema.R @@ -61,7 +61,7 @@ #' #' @export schema <- function(...){ - shared_ptr(`arrow::Schema`, schema_(.fields(list(...)))) + shared_ptr(`arrow::Schema`, schema_(.fields(list2(...)))) } #' read a Schema from a stream diff --git a/r/R/Table.R b/r/R/Table.R index 54731ca41e2..87e87ac0dba 100644 --- a/r/R/Table.R +++ b/r/R/Table.R @@ -53,11 +53,16 @@ #' Create an arrow::Table from a data frame #' -#' @param .data a data frame +#' @param ... arrays, chunked arrays, or R vectors +#' @param schema a schema. The default (`NULL`) infers the schema from the `...` +#' +#' @return an arrow::Table #' #' @export -table <- function(.data){ - shared_ptr(`arrow::Table`, Table__from_dataframe(.data)) +table <- function(..., schema = NULL){ + dots <- tibble::lst(...) + stopifnot(length(dots) > 0) + shared_ptr(`arrow::Table`, Table__from_dots(dots, schema)) } #' @export diff --git a/r/R/feather.R b/r/R/feather.R index 6e4b3a65776..4a1d9de0fa1 100644 --- a/r/R/feather.R +++ b/r/R/feather.R @@ -72,7 +72,12 @@ write_feather.default <- function(data, stream) { #' @export write_feather.data.frame <- function(data, stream) { - write_feather(record_batch(data), stream) + # splice the columns in the record_batch() call + # e.g. 
if we had data <- data.frame(x = <...>, y = <...>) + # then record_batch(!!!data) is the same as + # record_batch(x = data$x, y = data$y) + # see ?rlang::list2() + write_feather(record_batch(!!!data), stream) } #' @method write_feather arrow::RecordBatch diff --git a/r/R/write_arrow.R b/r/R/write_arrow.R index b979569d364..486f3619730 100644 --- a/r/R/write_arrow.R +++ b/r/R/write_arrow.R @@ -21,7 +21,10 @@ to_arrow <- function(x) { `to_arrow.arrow::RecordBatch` <- function(x) x `to_arrow.arrow::Table` <- function(x) x -`to_arrow.data.frame` <- function(x) table(x) + +# splice the data frame as arguments of table() +# see ?rlang::list2() +`to_arrow.data.frame` <- function(x) table(!!!x) #' serialize an [arrow::Table][arrow__Table], an [arrow::RecordBatch][arrow__RecordBatch], or a #' data frame to either the streaming format or the binary file format diff --git a/r/lint.sh b/r/lint.sh index e0a89413f20..fed64c1b31c 100755 --- a/r/lint.sh +++ b/r/lint.sh @@ -33,4 +33,4 @@ CPPLINT=$CPP_BUILD_SUPPORT/cpplint.py $CPP_BUILD_SUPPORT/run_cpplint.py \ --cpplint_binary=$CPPLINT \ --exclude_glob=$CPP_BUILD_SUPPORT/lint_exclusions.txt \ - --source_dir=$SOURCE_DIR/src --quiet $1 + --source_dir=$SOURCE_DIR/src --quiet diff --git a/r/man/record_batch.Rd b/r/man/record_batch.Rd index 4567a9ab763..a9680bf3735 100644 --- a/r/man/record_batch.Rd +++ b/r/man/record_batch.Rd @@ -4,10 +4,12 @@ \alias{record_batch} \title{Create an \link[=arrow__RecordBatch]{arrow::RecordBatch} from a data frame} \usage{ -record_batch(.data) +record_batch(..., schema = NULL) } \arguments{ -\item{.data}{a data frame} +\item{...}{A variable number of arrow::Array} + +\item{schema}{an arrow::Schema} } \value{ a \link[=arrow__RecordBatch]{arrow::RecordBatch} diff --git a/r/man/table.Rd b/r/man/table.Rd index 743ee06f360..4d93ff385b5 100644 --- a/r/man/table.Rd +++ b/r/man/table.Rd @@ -4,10 +4,12 @@ \alias{table} \title{Create an arrow::Table from a data frame} \usage{ -table(.data) +table(..., schema = NULL) 
} \arguments{ -\item{.data}{a data frame} +\item{...}{arrays, chunked arrays, or R vectors} + +\item{schema}{NULL or a schema} } \description{ Create an arrow::Table from a data frame diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index 1ac96d43a5b..a92c4c8d25b 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -1940,17 +1940,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// RecordBatch__from_dataframe -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl); -RcppExport SEXP _arrow_RecordBatch__from_dataframe(SEXP tblSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< Rcpp::DataFrame >::type tbl(tblSEXP); - rcpp_result_gen = Rcpp::wrap(RecordBatch__from_dataframe(tbl)); - return rcpp_result_gen; -END_RCPP -} // RecordBatch__Equals bool RecordBatch__Equals(const std::shared_ptr& self, const std::shared_ptr& other); RcppExport SEXP _arrow_RecordBatch__Equals(SEXP selfSEXP, SEXP otherSEXP) { @@ -2046,6 +2035,18 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// RecordBatch__from_arrays +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst); +RcppExport SEXP _arrow_RecordBatch__from_arrays(SEXP schema_sxpSEXP, SEXP lstSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); + Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); + rcpp_result_gen = Rcpp::wrap(RecordBatch__from_arrays(schema_sxp, lst)); + return rcpp_result_gen; +END_RCPP +} // RecordBatchReader__schema std::shared_ptr RecordBatchReader__schema(const std::shared_ptr& reader); RcppExport SEXP _arrow_RecordBatchReader__schema(SEXP readerSEXP) { @@ -2224,17 +2225,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// Table__from_dataframe -std::shared_ptr Table__from_dataframe(DataFrame tbl); -RcppExport SEXP _arrow_Table__from_dataframe(SEXP tblSEXP) { -BEGIN_RCPP - 
Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< DataFrame >::type tbl(tblSEXP); - rcpp_result_gen = Rcpp::wrap(Table__from_dataframe(tbl)); - return rcpp_result_gen; -END_RCPP -} // Table__num_columns int Table__num_columns(const std::shared_ptr& x); RcppExport SEXP _arrow_Table__num_columns(SEXP xSEXP) { @@ -2291,6 +2281,18 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } +// Table__from_dots +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); +RcppExport SEXP _arrow_Table__from_dots(SEXP lstSEXP, SEXP schema_sxpSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type lst(lstSEXP); + Rcpp::traits::input_parameter< SEXP >::type schema_sxp(schema_sxpSEXP); + rcpp_result_gen = Rcpp::wrap(Table__from_dots(lst, schema_sxp)); + return rcpp_result_gen; +END_RCPP +} // GetCpuThreadPoolCapacity int GetCpuThreadPoolCapacity(); RcppExport SEXP _arrow_GetCpuThreadPoolCapacity() { @@ -2486,7 +2488,6 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1}, {"_arrow_RecordBatch__columns", (DL_FUNC) &_arrow_RecordBatch__columns, 1}, {"_arrow_RecordBatch__column", (DL_FUNC) &_arrow_RecordBatch__column, 2}, - {"_arrow_RecordBatch__from_dataframe", (DL_FUNC) &_arrow_RecordBatch__from_dataframe, 1}, {"_arrow_RecordBatch__Equals", (DL_FUNC) &_arrow_RecordBatch__Equals, 2}, {"_arrow_RecordBatch__RemoveColumn", (DL_FUNC) &_arrow_RecordBatch__RemoveColumn, 2}, {"_arrow_RecordBatch__column_name", (DL_FUNC) &_arrow_RecordBatch__column_name, 2}, @@ -2495,6 +2496,7 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_RecordBatch__Slice2", (DL_FUNC) &_arrow_RecordBatch__Slice2, 3}, {"_arrow_ipc___SerializeRecordBatch__Raw", (DL_FUNC) &_arrow_ipc___SerializeRecordBatch__Raw, 1}, {"_arrow_ipc___ReadRecordBatch__InputStream__Schema", (DL_FUNC) 
&_arrow_ipc___ReadRecordBatch__InputStream__Schema, 2}, + {"_arrow_RecordBatch__from_arrays", (DL_FUNC) &_arrow_RecordBatch__from_arrays, 2}, {"_arrow_RecordBatchReader__schema", (DL_FUNC) &_arrow_RecordBatchReader__schema, 1}, {"_arrow_RecordBatchReader__ReadNext", (DL_FUNC) &_arrow_RecordBatchReader__ReadNext, 1}, {"_arrow_ipc___RecordBatchStreamReader__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamReader__Open, 1}, @@ -2511,12 +2513,12 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ipc___RecordBatchWriter__Close", (DL_FUNC) &_arrow_ipc___RecordBatchWriter__Close, 1}, {"_arrow_ipc___RecordBatchFileWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchFileWriter__Open, 2}, {"_arrow_ipc___RecordBatchStreamWriter__Open", (DL_FUNC) &_arrow_ipc___RecordBatchStreamWriter__Open, 2}, - {"_arrow_Table__from_dataframe", (DL_FUNC) &_arrow_Table__from_dataframe, 1}, {"_arrow_Table__num_columns", (DL_FUNC) &_arrow_Table__num_columns, 1}, {"_arrow_Table__num_rows", (DL_FUNC) &_arrow_Table__num_rows, 1}, {"_arrow_Table__schema", (DL_FUNC) &_arrow_Table__schema, 1}, {"_arrow_Table__column", (DL_FUNC) &_arrow_Table__column, 2}, {"_arrow_Table__columns", (DL_FUNC) &_arrow_Table__columns, 1}, + {"_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, {"_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, {"_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, {NULL, NULL, 0} diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp index 509a39a5e08..e2b82ac90f1 100644 --- a/r/src/array_from_vector.cpp +++ b/r/src/array_from_vector.cpp @@ -270,8 +270,7 @@ class VectorConverter { virtual Status Ingest(SEXP obj) = 0; virtual Status GetResult(std::shared_ptr* result) { - RETURN_NOT_OK(builder_->Finish(result)); - return Status::OK(); + return builder_->Finish(result); } ArrayBuilder* builder() const { return builder_; } @@ -299,7 +298,7 @@ struct Unbox> { return IngestRange(builder, 
reinterpret_cast(REAL(obj)), XLENGTH(obj), NA_INT64); } - // TODO: handle aw and logical + // TODO: handle raw and logical default: break; } @@ -881,7 +880,7 @@ std::shared_ptr MakeSimpleArray(SEXP x) { } auto data = ArrayData::Make(std::make_shared(), LENGTH(x), std::move(buffers), - null_count, 0); + null_count, 0 /*offset*/); // return the right Array class return std::make_shared::ArrayType>(data); diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 32f54cb1d5a..f4dad1bf219 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -19,6 +19,7 @@ #include #include +#include #include @@ -35,11 +36,12 @@ #include #include -#define STOP_IF_NOT(TEST, MSG) \ - do { \ - if (!(TEST)) Rcpp::stop(MSG); \ +#define STOP_IF(TEST, MSG) \ + do { \ + if (TEST) Rcpp::stop(MSG); \ } while (0) +#define STOP_IF_NOT(TEST, MSG) STOP_IF(!(TEST), MSG) #define STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString()) template @@ -178,8 +180,7 @@ inline constexpr Rbyte default_value() { SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); SEXP Array__as_vector(const std::shared_ptr& array); std::shared_ptr Array__from_vector(SEXP x, SEXP type); -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl); -std::shared_ptr Array__infer_type(SEXP x); +std::shared_ptr RecordBatch__from_arrays(SEXP, SEXP); namespace arrow { namespace r { @@ -207,5 +208,18 @@ inline std::shared_ptr extract(SEXP x) { return Rcpp::ConstReferenceSmartPtrInputParameter>(x); } +template +std::vector> list_to_shared_ptr_vector(SEXP lst) { + R_xlen_t n = XLENGTH(lst); + std::vector> res(n); + for (R_xlen_t i = 0; i < n; i++) { + res[i] = extract(VECTOR_ELT(lst, i)); + } + return res; +} + +std::shared_ptr Array__from_vector( + SEXP x, const std::shared_ptr& type, bool type_infered); + } // namespace r } // namespace arrow diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 31fefa8de60..390143859f8 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -54,24 +54,6 
@@ std::shared_ptr RecordBatch__column( return batch->column(i); } -// [[Rcpp::export]] -std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl) { - Rcpp::CharacterVector names = tbl.names(); - - std::vector> fields; - std::vector> arrays; - - for (int i = 0; i < tbl.size(); i++) { - SEXP x = tbl[i]; - arrays.push_back(Array__from_vector(x, R_NilValue)); - fields.push_back( - std::make_shared(std::string(names[i]), arrays[i]->type())); - } - auto schema = std::make_shared(std::move(fields)); - - return arrow::RecordBatch::Make(schema, tbl.nrow(), std::move(arrays)); -} - // [[Rcpp::export]] bool RecordBatch__Equals(const std::shared_ptr& self, const std::shared_ptr& other) { @@ -145,3 +127,84 @@ std::shared_ptr ipc___ReadRecordBatch__InputStream__Schema( STOP_IF_NOT_OK(arrow::ipc::ReadRecordBatch(schema, &memo, stream.get(), &batch)); return batch; } + +arrow::Status check_consistent_array_size( + const std::vector>& arrays, int64_t* num_rows) { + *num_rows = arrays[0]->length(); + for (int64_t i = 1; i < arrays.size(); i++) { + if (arrays[i]->length() != *num_rows) { + return arrow::Status::Invalid("All arrays must have the same length"); + } + } + return arrow::Status::OK(); +} + +std::shared_ptr RecordBatch__from_arrays__known_schema( + const std::shared_ptr& schema, SEXP lst) { + R_xlen_t n_arrays = XLENGTH(lst); + if (schema->num_fields() != n_arrays) { + Rcpp::stop("incompatible. 
schema has %d fields, and %d arrays are supplied", + schema->num_fields(), n_arrays); + } + + // convert lst to a vector of arrow::Array + std::vector> arrays(n_arrays); + SEXP names = Rf_getAttrib(lst, R_NamesSymbol); + bool has_names = !Rf_isNull(names); + + for (R_xlen_t i = 0; i < n_arrays; i++) { + if (has_names && schema->field(i)->name() != CHAR(STRING_ELT(names, i))) { + Rcpp::stop("field at index %d has name '%s' != '%s'", i + 1, + schema->field(i)->name(), CHAR(STRING_ELT(names, i))); + } + arrays[i] = + arrow::r::Array__from_vector(VECTOR_ELT(lst, i), schema->field(i)->type(), false); + } + + int64_t num_rows; + STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows)); + return arrow::RecordBatch::Make(schema, num_rows, arrays); +} + +// [[Rcpp::export]] +std::shared_ptr RecordBatch__from_arrays(SEXP schema_sxp, SEXP lst) { + if (Rf_inherits(schema_sxp, "arrow::Schema")) { + return RecordBatch__from_arrays__known_schema( + arrow::r::extract(schema_sxp), lst); + } + + R_xlen_t n_arrays = XLENGTH(lst); + + // convert lst to a vector of arrow::Array + std::vector> arrays(n_arrays); + for (R_xlen_t i = 0; i < n_arrays; i++) { + arrays[i] = Array__from_vector(VECTOR_ELT(lst, i), R_NilValue); + } + + // generate schema from the types that have been inferred + std::shared_ptr schema; + if (Rf_inherits(schema_sxp, "arrow::Schema")) { + schema = arrow::r::extract(schema_sxp); + } else { + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + std::vector> fields(n_arrays); + for (R_xlen_t i = 0; i < n_arrays; i++) { + fields[i] = + std::make_shared(std::string(names[i]), arrays[i]->type()); + } + schema = std::make_shared(std::move(fields)); + } + + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + std::vector> fields(n_arrays); + for (R_xlen_t i = 0; i < n_arrays; i++) { + fields[i] = std::make_shared(std::string(names[i]), arrays[i]->type()); + } + schema = std::make_shared(std::move(fields)); + + // check all sizes are the same + 
int64_t num_rows; + STOP_IF_NOT_OK(check_consistent_array_size(arrays, &num_rows)); + + return arrow::RecordBatch::Make(schema, num_rows, arrays); +} diff --git a/r/src/table.cpp b/r/src/table.cpp index c04e1d3aefa..f78b2afdd7a 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -22,15 +22,6 @@ using Rcpp::DataFrame; -// [[Rcpp::export]] -std::shared_ptr Table__from_dataframe(DataFrame tbl) { - auto rb = RecordBatch__from_dataframe(tbl); - - std::shared_ptr out; - STOP_IF_NOT_OK(arrow::Table::FromRecordBatches({std::move(rb)}, &out)); - return out; -} - // [[Rcpp::export]] int Table__num_columns(const std::shared_ptr& x) { return x->num_columns(); @@ -60,3 +51,84 @@ std::vector> Table__columns( } return res; } + +bool all_record_batches(SEXP lst) { + R_xlen_t n = XLENGTH(lst); + for (R_xlen_t i = 0; i < n; i++) { + if (!Rf_inherits(VECTOR_ELT(lst, i), "arrow::RecordBatch")) return false; + } + return true; +} + +// [[Rcpp::export]] +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { + // lst can be either: + // - a list of record batches, in which case we call Table::FromRecordBatches + + if (all_record_batches(lst)) { + auto batches = arrow::r::list_to_shared_ptr_vector(lst); + std::shared_ptr tab; + + if (Rf_inherits(schema_sxp, "arrow::Schema")) { + auto schema = arrow::r::extract(schema_sxp); + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(schema, batches, &tab)); + } else { + STOP_IF_NOT_OK(arrow::Table::FromRecordBatches(batches, &tab)); + } + return tab; + } + + R_xlen_t n = XLENGTH(lst); + std::vector> columns(n); + std::shared_ptr schema; + + if (Rf_isNull(schema_sxp)) { + // infer the schema from the ... 
+ std::vector> fields(n); + Rcpp::CharacterVector names(Rf_getAttrib(lst, R_NamesSymbol)); + + for (R_xlen_t i = 0; i < n; i++) { + SEXP x = VECTOR_ELT(lst, i); + if (Rf_inherits(x, "arrow::Column")) { + columns[i] = arrow::r::extract(x); + fields[i] = columns[i]->field(); + } else if (Rf_inherits(x, "arrow::ChunkedArray")) { + auto chunked_array = arrow::r::extract(x); + fields[i] = + std::make_shared(std::string(names[i]), chunked_array->type()); + columns[i] = std::make_shared(fields[i], chunked_array); + } else if (Rf_inherits(x, "arrow::Array")) { + auto array = arrow::r::extract(x); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } else { + auto array = Array__from_vector(x, R_NilValue); + fields[i] = std::make_shared(std::string(names[i]), array->type()); + columns[i] = std::make_shared(fields[i], array); + } + } + schema = std::make_shared(std::move(fields)); + } else { + // use the schema that is given + schema = arrow::r::extract(schema_sxp); + + for (R_xlen_t i = 0; i < n; i++) { + SEXP x = VECTOR_ELT(lst, i); + if (Rf_inherits(x, "arrow::Column")) { + columns[i] = arrow::r::extract(x); + } else if (Rf_inherits(x, "arrow::ChunkedArray")) { + auto chunked_array = arrow::r::extract(x); + columns[i] = std::make_shared(schema->field(i), chunked_array); + } else if (Rf_inherits(x, "arrow::Array")) { + auto array = arrow::r::extract(x); + columns[i] = std::make_shared(schema->field(i), array); + } else { + auto type = schema->field(i)->type(); + auto array = arrow::r::Array__from_vector(x, type, false); + columns[i] = std::make_shared(schema->field(i), array); + } + } + } + + return arrow::Table::Make(schema, columns); +} diff --git a/r/tests/testthat/test-RecordBatch.R b/r/tests/testthat/test-RecordBatch.R index da09984b198..c0295bac2c8 100644 --- a/r/tests/testthat/test-RecordBatch.R +++ b/r/tests/testthat/test-RecordBatch.R @@ -24,7 +24,7 @@ test_that("RecordBatch", { chr = 
letters[1:10], fct = factor(letters[1:10]) ) - batch <- record_batch(tbl) + batch <- record_batch(!!!tbl) expect_true(batch == batch) expect_equal( @@ -93,7 +93,7 @@ test_that("RecordBatch with 0 rows are supported", { fct = factor(character(), levels = c("a", "b")) ) - batch <- record_batch(tbl) + batch <- record_batch(!!!tbl) expect_equal(batch$num_columns, 5L) expect_equal(batch$num_rows, 0L) expect_equal( @@ -109,7 +109,7 @@ test_that("RecordBatch with 0 rows are supported", { }) test_that("RecordBatch cast (ARROW-3741)", { - batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10)) + batch <- record_batch(x = 1:10, y = 1:10) expect_error(batch$cast(schema(x = int32()))) expect_error(batch$cast(schema(x = int32(), z = int32()))) @@ -121,8 +121,27 @@ test_that("RecordBatch cast (ARROW-3741)", { expect_equal(batch2$column(1L)$type, int64()) }) +test_that("record_batch() handles schema= argument", { + s <- schema(x = int32(), y = int32()) + batch <- record_batch(x = 1:10, y = 1:10, schema = s) + expect_equal(s, batch$schema) + + s <- schema(x = int32(), y = float64()) + batch <- record_batch(x = 1:10, y = 1:10, schema = s) + expect_equal(s, batch$schema) + + s <- schema(x = int32(), y = utf8()) + expect_error(record_batch(x = 1:10, y = 1:10, schema = s)) +}) + +test_that("record_batch(schema=) does some basic consistency checking of the schema", { + s <- schema(x = int32()) + expect_error(record_batch(x = 1:10, y = 1:10, schema = s)) + expect_error(record_batch(z = 1:10, schema = s)) +}) + test_that("RecordBatch dim() and nrow() (ARROW-3816)", { - batch <- record_batch(tibble::tibble(x = 1:10, y = 1:10)) + batch <- record_batch(x = 1:10, y = 1:10) expect_equal(dim(batch), c(10L, 2L)) expect_equal(nrow(batch), 10L) }) diff --git a/r/tests/testthat/test-Table.R b/r/tests/testthat/test-Table.R index 346ce4a2a66..59234fa5c01 100644 --- a/r/tests/testthat/test-Table.R +++ b/r/tests/testthat/test-Table.R @@ -23,7 +23,7 @@ test_that("read_table handles various input 
streams (ARROW-3450, ARROW-3505)", { lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) tf <- tempfile() write_arrow(tab, tf) @@ -64,7 +64,7 @@ test_that("read_table handles various input streams (ARROW-3450, ARROW-3505)", { }) test_that("Table cast (ARROW-3741)", { - tab <- table(tibble::tibble(x = 1:10, y = 1:10)) + tab <- table(x = 1:10, y = 1:10) expect_error(tab$cast(schema(x = int32()))) expect_error(tab$cast(schema(x = int32(), z = int32()))) @@ -77,7 +77,45 @@ test_that("Table cast (ARROW-3741)", { }) test_that("Table dim() and nrow() (ARROW-3816)", { - tab <- table(tibble::tibble(x = 1:10, y = 1:10)) + tab <- table(x = 1:10, y = 1:10) expect_equal(dim(tab), c(10L, 2L)) expect_equal(nrow(tab), 10L) }) + +test_that("table() handles record batches with splicing", { + batch <- record_batch(x = 1:2, y = letters[1:2]) + tab <- table(batch, batch, batch) + expect_equal(tab$schema, batch$schema) + expect_equal(tab$num_rows, 6L) + expect_equal( + as_tibble(tab), + vctrs::vec_rbind(as_tibble(batch), as_tibble(batch), as_tibble(batch)) + ) + + batches <- list(batch, batch, batch) + tab <- table(!!!batches) + expect_equal(tab$schema, batch$schema) + expect_equal(tab$num_rows, 6L) + expect_equal( + as_tibble(tab), + vctrs::vec_rbind(!!!purrr::map(batches, as_tibble)) + ) +}) + +test_that("table() handles ... 
of arrays, chunked arrays, vectors", { + a <- array(1:10) + ca <- chunked_array(1:5, 6:10) + v <- rnorm(10) + tbl <- tibble::tibble(x = 1:10, y = letters[1:10]) + + tab <- table(a = a, b = ca, c = v, !!!tbl) + expect_equal( + tab$schema, + schema(a = int32(), b = int32(), c = float64(), x = int32(), y = utf8()) + ) + res <- as_tibble(tab) + expect_equal(names(res), c("a", "b", "c", "x", "y")) + expect_equal(res, + tibble::tibble(a = 1:10, b = 1:10, c = v, x = 1:10, y = letters[1:10]) + ) +}) diff --git a/r/tests/testthat/test-arrow-csv-.R b/r/tests/testthat/test-arrow-csv-.R index 27b4d53c466..ee09b6005e4 100644 --- a/r/tests/testthat/test-arrow-csv-.R +++ b/r/tests/testthat/test-arrow-csv-.R @@ -27,7 +27,7 @@ test_that("Can read csv file", { tab3 <- read_csv_arrow(ReadableFile(tf)) iris$Species <- as.character(iris$Species) - tab0 <- table(iris) + tab0 <- table(!!!iris) expect_equal(tab0, tab1) expect_equal(tab0, tab2) expect_equal(tab0, tab3) diff --git a/r/tests/testthat/test-message.R b/r/tests/testthat/test-message.R index 3fe5829f869..4cbf87d3d0f 100644 --- a/r/tests/testthat/test-message.R +++ b/r/tests/testthat/test-message.R @@ -18,7 +18,7 @@ context("arrow::ipc::Message") test_that("read_message can read from input stream", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() stream <- BufferReader(bytes) diff --git a/r/tests/testthat/test-messagereader.R b/r/tests/testthat/test-messagereader.R index 5ff8277625d..690228d2b6d 100644 --- a/r/tests/testthat/test-messagereader.R +++ b/r/tests/testthat/test-messagereader.R @@ -18,7 +18,7 @@ context("arrow::ipc::MessageReader") test_that("MessageReader can be created from raw vectors", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() reader <- MessageReader(bytes) @@ -34,7 +34,7 @@ test_that("MessageReader can be created from raw vectors", { }) test_that("MessageReader can be created from 
input stream", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) bytes <- batch$serialize() stream <- BufferReader(bytes) diff --git a/r/tests/testthat/test-read-write.R b/r/tests/testthat/test-read-write.R index 3fe07cd91bb..c56a7d37901 100644 --- a/r/tests/testthat/test-read-write.R +++ b/r/tests/testthat/test-read-write.R @@ -24,7 +24,7 @@ test_that("arrow::table round trip", { raw = as.raw(1:10) ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) expect_equal(tab$num_columns, 3L) expect_equal(tab$num_rows, 10L) @@ -99,7 +99,7 @@ test_that("arrow::table round trip handles NA in integer and numeric", { raw = as.raw(1:10) ) - tab <- arrow::table(tbl) + tab <- arrow::table(!!!tbl) expect_equal(tab$num_columns, 3L) expect_equal(tab$num_rows, 10L) diff --git a/r/tests/testthat/test-read_record_batch.R b/r/tests/testthat/test-read_record_batch.R index 2618fae7326..adbb192fa59 100644 --- a/r/tests/testthat/test-read_record_batch.R +++ b/r/tests/testthat/test-read_record_batch.R @@ -18,11 +18,12 @@ context("read_record_batch()") test_that("RecordBatchFileWriter / RecordBatchFileReader roundtrips", { - tab <- table(tibble::tibble( - int = 1:10, dbl = as.numeric(1:10), + tab <- table( + int = 1:10, + dbl = as.numeric(1:10), lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] - )) + ) tf <- tempfile() writer <- RecordBatchFileWriter(tf, tab$schema) @@ -48,7 +49,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3 lgl = sample(c(TRUE, FALSE, NA), 10, replace = TRUE), chr = letters[1:10] ) - batch <- record_batch(tbl) + batch <- record_batch(!!!tbl) schema <- batch$schema raw <- batch$serialize() @@ -64,7 +65,7 @@ test_that("read_record_batch() handles (raw|Buffer|InputStream, Schema) (ARROW-3 }) test_that("read_record_batch() can handle (Message, Schema) parameters (ARROW-3499)", { - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) schema <- 
batch$schema raw <- batch$serialize() diff --git a/r/tests/testthat/test-recordbatchreader.R b/r/tests/testthat/test-recordbatchreader.R index d2b6a09c37b..65f7933b42d 100644 --- a/r/tests/testthat/test-recordbatchreader.R +++ b/r/tests/testthat/test-recordbatchreader.R @@ -18,10 +18,10 @@ context("arrow::RecordBatch.*(Reader|Writer)") test_that("RecordBatchStreamReader / Writer", { - batch <- record_batch(tibble::tibble( + batch <- record_batch( x = 1:10, y = letters[1:10] - )) + ) sink <- BufferOutputStream() writer <- RecordBatchStreamWriter(sink, batch$schema) @@ -43,10 +43,10 @@ test_that("RecordBatchStreamReader / Writer", { }) test_that("RecordBatchFileReader / Writer", { - batch <- record_batch(tibble::tibble( + batch <- record_batch( x = 1:10, y = letters[1:10] - )) + ) sink <- BufferOutputStream() writer <- RecordBatchFileWriter(sink, batch$schema) diff --git a/r/tests/testthat/test-schema.R b/r/tests/testthat/test-schema.R index 2f2d3ee84e7..ff40b816ea6 100644 --- a/r/tests/testthat/test-schema.R +++ b/r/tests/testthat/test-schema.R @@ -20,7 +20,7 @@ context("arrow::Schema") test_that("reading schema from Buffer", { # TODO: this uses the streaming format, i.e. from RecordBatchStreamWriter # maybe there is an easier way to serialize a schema - batch <- record_batch(tibble::tibble(x = 1:10)) + batch <- record_batch(x = 1:10) expect_is(batch, "arrow::RecordBatch") stream <- BufferOutputStream()