diff --git a/r/DESCRIPTION b/r/DESCRIPTION index 0521922a866..83df2569fd6 100644 --- a/r/DESCRIPTION +++ b/r/DESCRIPTION @@ -22,14 +22,13 @@ Imports: assertthat, glue, R6, - vctrs (>= 0.1.0.9000), + vctrs (>= 0.1.0), fs, tibble, crayon, withr, bit64 Remotes: - r-lib/vctrs, r-lib/withr Roxygen: list(markdown = TRUE) RoxygenNote: 6.1.1 diff --git a/r/NAMESPACE b/r/NAMESPACE index 4636c1215f2..adc481e1b1f 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -68,6 +68,10 @@ S3method(read_table,"arrow::ipc::RecordBatchStreamReader") S3method(read_table,character) S3method(read_table,fs_path) S3method(read_table,raw) +S3method(type,"arrow::Array") +S3method(type,"arrow::ChunkedArray") +S3method(type,"arrow::Column") +S3method(type,default) S3method(write_arrow,"arrow::ipc::RecordBatchWriter") S3method(write_arrow,character) S3method(write_arrow,fs_path) @@ -148,6 +152,7 @@ export(table) export(time32) export(time64) export(timestamp) +export(type) export(uint16) export(uint32) export(uint64) @@ -165,6 +170,7 @@ importFrom(glue,glue) importFrom(purrr,map) importFrom(purrr,map2) importFrom(purrr,map_int) +importFrom(rlang,"%||%") importFrom(rlang,abort) importFrom(rlang,dots_n) importFrom(rlang,list2) diff --git a/r/R/ArrayData.R b/r/R/ArrayData.R index 765971b405b..f18317913ba 100644 --- a/r/R/ArrayData.R +++ b/r/R/ArrayData.R @@ -26,7 +26,7 @@ #' @section Usage: #' #' ``` -#' data <- array(...)$data() +#' data <- array(x)$data() #' #' data$type() #' data$length() diff --git a/r/R/ChunkedArray.R b/r/R/ChunkedArray.R index 46e40766290..339a416dea6 100644 --- a/r/R/ChunkedArray.R +++ b/r/R/ChunkedArray.R @@ -60,11 +60,8 @@ #' @param \dots Vectors to coerce #' @param type currently ignored #' -#' @importFrom rlang list2 +#' @importFrom rlang list2 %||% #' @export -chunked_array <- function(..., type){ - if (!missing(type)) { - warn("The `type` argument is currently ignored") - } - shared_ptr(`arrow::ChunkedArray`, ChunkedArray__from_list(list2(...))) +chunked_array <- function(..., 
type = NULL){ + shared_ptr(`arrow::ChunkedArray`, ChunkedArray__from_list(list2(...), type)) } diff --git a/r/R/R6.R b/r/R/R6.R index 69d58e0c136..26c679f2017 100644 --- a/r/R/R6.R +++ b/r/R/R6.R @@ -130,6 +130,31 @@ unique_ptr <- function(class, xp) { shared_ptr(`arrow::DataType`, xp)$..dispatch() } +#' infer the arrow Array type from an R vector +#' +#' @param x an R vector +#' +#' @return an arrow logical type +#' @export +type <- function(x) { + UseMethod("type") +} + +#' @export +type.default <- function(x) { + `arrow::DataType`$dispatch(Array__infer_type(x)) +} + +#' @export +`type.arrow::Array` <- function(x) x$type + +#' @export +`type.arrow::ChunkedArray` <- function(x) x$type + +#' @export +`type.arrow::Column` <- function(x) x$type + + #----- metadata #' @title class arrow::FixedWidthType diff --git a/r/R/RcppExports.R b/r/R/RcppExports.R index 3d493c7f066..3940cc208ef 100644 --- a/r/R/RcppExports.R +++ b/r/R/RcppExports.R @@ -1,10 +1,6 @@ # Generated by using Rcpp::compileAttributes() -> do not edit by hand # Generator token: 10BE3573-1514-4C36-9D1C-5A225CD40393 -Array__from_vector <- function(x) { - .Call(`_arrow_Array__from_vector`, x) -} - Array__Slice1 <- function(array, offset) { .Call(`_arrow_Array__Slice1`, array, offset) } @@ -89,6 +85,18 @@ Table__to_dataframe <- function(table, use_threads) { .Call(`_arrow_Table__to_dataframe`, table, use_threads) } +Array__infer_type <- function(x) { + .Call(`_arrow_Array__infer_type`, x) +} + +Array__from_vector <- function(x, s_type) { + .Call(`_arrow_Array__from_vector`, x, s_type) +} + +ChunkedArray__from_list <- function(chunks, s_type) { + .Call(`_arrow_ChunkedArray__from_list`, chunks, s_type) +} + ArrayData__get_type <- function(x) { .Call(`_arrow_ArrayData__get_type`, x) } @@ -161,10 +169,6 @@ ChunkArray__Slice2 <- function(chunked_array, offset, length) { .Call(`_arrow_ChunkArray__Slice2`, chunked_array, offset, length) } -ChunkedArray__from_list <- function(chunks) { - 
.Call(`_arrow_ChunkedArray__from_list`, chunks) -} - Column__length <- function(column) { .Call(`_arrow_Column__length`, column) } diff --git a/r/R/array.R b/r/R/array.R index 63fdb4e0f61..ccb852181cc 100644 --- a/r/R/array.R +++ b/r/R/array.R @@ -28,7 +28,7 @@ #' @section Usage: #' #' ``` -#' a <- array(...) +#' a <- array(x) #' #' a$IsNull(i) #' a$IsValid(i) @@ -119,16 +119,13 @@ #' create an [arrow::Array][arrow__Array] from an R vector #' -#' @param \dots Vectors to coerce -#' @param type currently ignored +#' @param x R object +#' @param type Explicit [type][arrow__DataType], or NULL (the default) to infer from the data #' #' @importFrom rlang warn #' @export -array <- function(..., type){ - if (!missing(type)) { - warn("The `type` argument is currently ignored") - } - `arrow::Array`$dispatch(Array__from_vector(vctrs::vec_c(...))) +array <- function(x, type = NULL){ + `arrow::Array`$dispatch(Array__from_vector(x, type)) } `arrow::DictionaryArray` <- R6Class("arrow::DictionaryArray", inherit = `arrow::Array`, diff --git a/r/man/array.Rd b/r/man/array.Rd index ccdba181db8..2b784caf9a1 100644 --- a/r/man/array.Rd +++ b/r/man/array.Rd @@ -4,12 +4,12 @@ \alias{array} \title{create an \link[=arrow__Array]{arrow::Array} from an R vector} \usage{ -array(..., type) +array(x, type = NULL) } \arguments{ -\item{\dots}{Vectors to coerce} +\item{x}{R object} -\item{type}{currently ignored} +\item{type}{Explicit \link[=arrow__DataType]{type}, or NULL (the default) to infer from the data} } \description{ create an \link[=arrow__Array]{arrow::Array} from an R vector diff --git a/r/man/arrow__Array.Rd b/r/man/arrow__Array.Rd index b11373d26b3..dabed1f6fa2 100644 --- a/r/man/arrow__Array.Rd +++ b/r/man/arrow__Array.Rd @@ -13,7 +13,7 @@ class arrow::Array Array base type. Immutable data array with some logical type and some length. } \section{Usage}{ -\preformatted{a <- array(...) 
+\preformatted{a <- array(x) a$IsNull(i) a$IsValid(i) diff --git a/r/man/arrow__ArrayData.Rd b/r/man/arrow__ArrayData.Rd index bdf996605c5..af48dd334a5 100644 --- a/r/man/arrow__ArrayData.Rd +++ b/r/man/arrow__ArrayData.Rd @@ -9,7 +9,7 @@ class arrow::ArrayData } \section{Usage}{ -\preformatted{data <- array(...)$data() +\preformatted{data <- array(x)$data() data$type() data$length() diff --git a/r/man/chunked_array.Rd b/r/man/chunked_array.Rd index c6973be7210..07dac8a841d 100644 --- a/r/man/chunked_array.Rd +++ b/r/man/chunked_array.Rd @@ -4,7 +4,7 @@ \alias{chunked_array} \title{create an \link[=arrow__ChunkedArray]{arrow::ChunkedArray} from various R vectors} \usage{ -chunked_array(..., type) +chunked_array(..., type = NULL) } \arguments{ \item{\dots}{Vectors to coerce} diff --git a/r/man/type.Rd b/r/man/type.Rd new file mode 100644 index 00000000000..3e2b4f408a2 --- /dev/null +++ b/r/man/type.Rd @@ -0,0 +1,17 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/R6.R +\name{type} +\alias{type} +\title{infer the arrow Array type from an R vector} +\usage{ +type(x) +} +\arguments{ +\item{x}{an R vector} +} +\value{ +an arrow logical type +} +\description{ +infer the arrow Array type from an R vector +} diff --git a/r/src/RcppExports.cpp b/r/src/RcppExports.cpp index e726f470969..6f78fcf0a77 100644 --- a/r/src/RcppExports.cpp +++ b/r/src/RcppExports.cpp @@ -6,17 +6,6 @@ using namespace Rcpp; -// Array__from_vector -std::shared_ptr Array__from_vector(SEXP x); -RcppExport SEXP _arrow_Array__from_vector(SEXP xSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< SEXP >::type x(xSEXP); - rcpp_result_gen = Rcpp::wrap(Array__from_vector(x)); - return rcpp_result_gen; -END_RCPP -} // Array__Slice1 std::shared_ptr Array__Slice1(const std::shared_ptr& array, int offset); RcppExport SEXP _arrow_Array__Slice1(SEXP arraySEXP, SEXP offsetSEXP) { @@ -261,6 +250,41 @@ BEGIN_RCPP 
return rcpp_result_gen; END_RCPP } +// Array__infer_type +std::shared_ptr Array__infer_type(SEXP x); +RcppExport SEXP _arrow_Array__infer_type(SEXP xSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type x(xSEXP); + rcpp_result_gen = Rcpp::wrap(Array__infer_type(x)); + return rcpp_result_gen; +END_RCPP +} +// Array__from_vector +std::shared_ptr Array__from_vector(SEXP x, SEXP s_type); +RcppExport SEXP _arrow_Array__from_vector(SEXP xSEXP, SEXP s_typeSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< SEXP >::type x(xSEXP); + Rcpp::traits::input_parameter< SEXP >::type s_type(s_typeSEXP); + rcpp_result_gen = Rcpp::wrap(Array__from_vector(x, s_type)); + return rcpp_result_gen; +END_RCPP +} +// ChunkedArray__from_list +std::shared_ptr ChunkedArray__from_list(List chunks, SEXP s_type); +RcppExport SEXP _arrow_ChunkedArray__from_list(SEXP chunksSEXP, SEXP s_typeSEXP) { +BEGIN_RCPP + Rcpp::RObject rcpp_result_gen; + Rcpp::RNGScope rcpp_rngScope_gen; + Rcpp::traits::input_parameter< List >::type chunks(chunksSEXP); + Rcpp::traits::input_parameter< SEXP >::type s_type(s_typeSEXP); + rcpp_result_gen = Rcpp::wrap(ChunkedArray__from_list(chunks, s_type)); + return rcpp_result_gen; +END_RCPP +} // ArrayData__get_type std::shared_ptr ArrayData__get_type(const std::shared_ptr& x); RcppExport SEXP _arrow_ArrayData__get_type(SEXP xSEXP) { @@ -462,17 +486,6 @@ BEGIN_RCPP return rcpp_result_gen; END_RCPP } -// ChunkedArray__from_list -std::shared_ptr ChunkedArray__from_list(List chunks); -RcppExport SEXP _arrow_ChunkedArray__from_list(SEXP chunksSEXP) { -BEGIN_RCPP - Rcpp::RObject rcpp_result_gen; - Rcpp::RNGScope rcpp_rngScope_gen; - Rcpp::traits::input_parameter< List >::type chunks(chunksSEXP); - rcpp_result_gen = Rcpp::wrap(ChunkedArray__from_list(chunks)); - return rcpp_result_gen; -END_RCPP -} // Column__length int 
Column__length(const std::shared_ptr& column); RcppExport SEXP _arrow_Column__length(SEXP columnSEXP) { @@ -2300,7 +2313,6 @@ END_RCPP } static const R_CallMethodDef CallEntries[] = { - {"_arrow_Array__from_vector", (DL_FUNC) &_arrow_Array__from_vector, 1}, {"_arrow_Array__Slice1", (DL_FUNC) &_arrow_Array__Slice1, 2}, {"_arrow_Array__Slice2", (DL_FUNC) &_arrow_Array__Slice2, 3}, {"_arrow_Array__IsNull", (DL_FUNC) &_arrow_Array__IsNull, 2}, @@ -2322,6 +2334,9 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 1}, {"_arrow_RecordBatch__to_dataframe", (DL_FUNC) &_arrow_RecordBatch__to_dataframe, 2}, {"_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 2}, + {"_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, + {"_arrow_Array__from_vector", (DL_FUNC) &_arrow_Array__from_vector, 2}, + {"_arrow_ChunkedArray__from_list", (DL_FUNC) &_arrow_ChunkedArray__from_list, 2}, {"_arrow_ArrayData__get_type", (DL_FUNC) &_arrow_ArrayData__get_type, 1}, {"_arrow_ArrayData__get_length", (DL_FUNC) &_arrow_ArrayData__get_length, 1}, {"_arrow_ArrayData__get_null_count", (DL_FUNC) &_arrow_ArrayData__get_null_count, 1}, @@ -2340,7 +2355,6 @@ static const R_CallMethodDef CallEntries[] = { {"_arrow_ChunkedArray__type", (DL_FUNC) &_arrow_ChunkedArray__type, 1}, {"_arrow_ChunkArray__Slice1", (DL_FUNC) &_arrow_ChunkArray__Slice1, 2}, {"_arrow_ChunkArray__Slice2", (DL_FUNC) &_arrow_ChunkArray__Slice2, 3}, - {"_arrow_ChunkedArray__from_list", (DL_FUNC) &_arrow_ChunkedArray__from_list, 1}, {"_arrow_Column__length", (DL_FUNC) &_arrow_Column__length, 1}, {"_arrow_Column__null_count", (DL_FUNC) &_arrow_Column__null_count, 1}, {"_arrow_Column__type", (DL_FUNC) &_arrow_Column__type, 1}, diff --git a/r/src/array.cpp b/r/src/array.cpp index dd0d7e64a20..0749f2f80b4 100644 --- a/r/src/array.cpp +++ b/r/src/array.cpp @@ -20,486 +20,6 @@ using namespace Rcpp; using namespace arrow; -namespace arrow 
{ -namespace r { - -template -inline bool isna(typename Vector::stored_type x) { - return Vector::is_na(x); -} - -template <> -inline bool isna(double x) { - return ISNA(x); -} - -template -std::shared_ptr SimpleArray(SEXP x) { - Rcpp::Vector vec(x); - auto n = vec.size(); - std::vector> buffers{nullptr, - std::make_shared>(vec)}; - - int null_count = 0; - if (RTYPE != RAWSXP) { - std::shared_ptr null_bitmap; - - auto first_na = std::find_if(vec.begin(), vec.end(), Rcpp::Vector::is_na); - if (first_na < vec.end()) { - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_bitmap)); - internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); - - // first loop to clear all the bits before the first NA - auto j = std::distance(vec.begin(), first_na); - int i = 0; - for (; i < j; i++, bitmap_writer.Next()) { - bitmap_writer.Set(); - } - - // then finish - for (; i < n; i++, bitmap_writer.Next()) { - if (isna(vec[i])) { - bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - } - } - - bitmap_writer.Finish(); - buffers[0] = std::move(null_bitmap); - } - } - - auto data = ArrayData::Make( - std::make_shared(), LENGTH(x), std::move(buffers), null_count, 0 /*offset*/ - ); - - // return the right Array class - return std::make_shared::ArrayType>(data); -} - -std::shared_ptr MakeBooleanArray(LogicalVector_ vec) { - R_xlen_t n = vec.size(); - - // allocate a buffer for the data - std::shared_ptr data_bitmap; - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &data_bitmap)); - auto data_bitmap_data = data_bitmap->mutable_data(); - internal::FirstTimeBitmapWriter bitmap_writer(data_bitmap_data, 0, n); - R_xlen_t null_count = 0; - - // loop until the first no null - R_xlen_t i = 0; - for (; i < n; i++, bitmap_writer.Next()) { - if (vec[i] == 0) { - bitmap_writer.Clear(); - } else if (vec[i] == NA_LOGICAL) { - break; - } else { - bitmap_writer.Set(); - } - } - - std::shared_ptr null_bitmap(nullptr); - if (i < n) { - // 
there has been a null before the end, so we need - // to collect that information in a null bitmap - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_bitmap)); - auto null_bitmap_data = null_bitmap->mutable_data(); - internal::FirstTimeBitmapWriter null_bitmap_writer(null_bitmap_data, 0, n); - - // catch up on the initial `i` bits - for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) { - null_bitmap_writer.Set(); - } - - // finish both bitmaps - for (; i < n; i++, bitmap_writer.Next(), null_bitmap_writer.Next()) { - if (vec[i] == 0) { - bitmap_writer.Clear(); - null_bitmap_writer.Set(); - } else if (vec[i] == NA_LOGICAL) { - null_bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - null_bitmap_writer.Set(); - } - } - null_bitmap_writer.Finish(); - } - bitmap_writer.Finish(); - - auto data = - ArrayData::Make(boolean(), n, {std::move(null_bitmap), std::move(data_bitmap)}, - null_count, 0 /*offset*/ - ); - - // return the right Array class - return MakeArray(data); -} - -std::shared_ptr MakeStringArray(StringVector_ vec) { - R_xlen_t n = vec.size(); - - std::shared_ptr null_buffer; - std::shared_ptr offset_buffer; - std::shared_ptr value_buffer; - - // there is always an offset buffer - STOP_IF_NOT_OK(AllocateBuffer((n + 1) * sizeof(int32_t), &offset_buffer)); - - R_xlen_t i = 0; - int current_offset = 0; - int64_t null_count = 0; - auto p_offset = reinterpret_cast(offset_buffer->mutable_data()); - *p_offset = 0; - for (++p_offset; i < n; i++, ++p_offset) { - SEXP s = STRING_ELT(vec, i); - if (s == NA_STRING) { - // break as we are going to need a null_bitmap buffer - break; - } - - *p_offset = current_offset += LENGTH(s); - } - - if (i < n) { - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_buffer)); - internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); - - // catch up - for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) { - null_bitmap_writer.Set(); - } - - // 
resume offset filling - for (; i < n; i++, ++p_offset, null_bitmap_writer.Next()) { - SEXP s = STRING_ELT(vec, i); - if (s == NA_STRING) { - null_bitmap_writer.Clear(); - *p_offset = current_offset; - null_count++; - } else { - null_bitmap_writer.Set(); - *p_offset = current_offset += LENGTH(s); - } - } - - null_bitmap_writer.Finish(); - } - - // ----- data buffer - if (current_offset > 0) { - STOP_IF_NOT_OK(AllocateBuffer(current_offset, &value_buffer)); - p_offset = reinterpret_cast(offset_buffer->mutable_data()); - auto p_data = reinterpret_cast(value_buffer->mutable_data()); - - for (R_xlen_t i = 0; i < n; i++) { - SEXP s = STRING_ELT(vec, i); - if (s != NA_STRING) { - auto ni = LENGTH(s); - std::copy_n(CHAR(s), ni, p_data); - p_data += ni; - } - } - } - - auto data = ArrayData::Make(arrow::utf8(), n, - {null_buffer, offset_buffer, value_buffer}, null_count, 0); - return MakeArray(data); -} - -template -std::shared_ptr MakeFactorArrayImpl(Rcpp::IntegerVector_ factor) { - using value_type = typename arrow::TypeTraits::ArrayType::value_type; - auto dict_values = MakeStringArray(Rf_getAttrib(factor, R_LevelsSymbol)); - auto dict_type = - dictionary(std::make_shared(), dict_values, Rf_inherits(factor, "ordered")); - - auto n = factor.size(); - - std::shared_ptr indices_buffer; - STOP_IF_NOT_OK(AllocateBuffer(n * sizeof(value_type), &indices_buffer)); - - std::vector> buffers{nullptr, indices_buffer}; - - int64_t null_count = 0; - R_xlen_t i = 0; - auto p_factor = factor.begin(); - auto p_indices = reinterpret_cast(indices_buffer->mutable_data()); - for (; i < n; i++, ++p_indices, ++p_factor) { - if (*p_factor == NA_INTEGER) break; - *p_indices = *p_factor - 1; - } - - if (i < n) { - // there are NA's so we need a null buffer - std::shared_ptr null_buffer; - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_buffer)); - internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); - - // catch up - for (R_xlen_t j = 0; j < i; j++, 
null_bitmap_writer.Next()) { - null_bitmap_writer.Set(); - } - - // resume offset filling - for (; i < n; i++, ++p_indices, ++p_factor, null_bitmap_writer.Next()) { - if (*p_factor == NA_INTEGER) { - null_bitmap_writer.Clear(); - null_count++; - } else { - null_bitmap_writer.Set(); - *p_indices = *p_factor - 1; - } - } - - null_bitmap_writer.Finish(); - buffers[0] = std::move(null_buffer); - } - - auto array_indices_data = - ArrayData::Make(std::make_shared(), n, std::move(buffers), null_count, 0); - auto array_indices = MakeArray(array_indices_data); - - std::shared_ptr out; - STOP_IF_NOT_OK(DictionaryArray::FromArrays(dict_type, array_indices, &out)); - return out; -} - -std::shared_ptr MakeFactorArray(Rcpp::IntegerVector_ factor) { - SEXP levels = factor.attr("levels"); - int n = Rf_length(levels); - if (n < 128) { - return MakeFactorArrayImpl(factor); - } else if (n < 32768) { - return MakeFactorArrayImpl(factor); - } else { - return MakeFactorArrayImpl(factor); - } -} - -template -int64_t time_cast(T value); - -template <> -inline int64_t time_cast(int value) { - return static_cast(value) * 1000; -} - -template <> -inline int64_t time_cast(double value) { - return static_cast(value * 1000); -} - -inline int64_t timestamp_cast(int value) { return static_cast(value) * 1000000; } - -template -std::shared_ptr TimeStampArray_From_POSIXct(SEXP x) { - Rcpp::Vector vec(x); - auto p_vec = vec.begin(); - auto n = vec.size(); - - std::shared_ptr values_buffer; - STOP_IF_NOT_OK(AllocateBuffer(n * sizeof(int64_t), &values_buffer)); - auto p_values = reinterpret_cast(values_buffer->mutable_data()); - - std::vector> buffers{nullptr, values_buffer}; - - int null_count = 0; - R_xlen_t i = 0; - for (; i < n; i++, ++p_vec, ++p_values) { - if (Rcpp::Vector::is_na(*p_vec)) break; - *p_values = timestamp_cast(*p_vec); - } - if (i < n) { - std::shared_ptr null_buffer; - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_buffer)); - internal::FirstTimeBitmapWriter 
bitmap_writer(null_buffer->mutable_data(), 0, n); - - // catch up - for (R_xlen_t j = 0; j < i; j++, bitmap_writer.Next()) { - bitmap_writer.Set(); - } - - // finish - for (; i < n; i++, ++p_vec, ++p_values, bitmap_writer.Next()) { - if (Rcpp::Vector::is_na(*p_vec)) { - bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - *p_values = timestamp_cast(*p_vec); - } - } - - bitmap_writer.Finish(); - buffers[0] = std::move(null_buffer); - } - - auto data = ArrayData::Make(std::make_shared(TimeUnit::MICRO, "GMT"), n, - std::move(buffers), null_count, 0); - - return std::make_shared(data); -} - -std::shared_ptr Int64Array(SEXP x) { - auto p_vec_start = reinterpret_cast(REAL(x)); - auto n = Rf_xlength(x); - int64_t null_count = 0; - - std::vector> buffers{nullptr, - std::make_shared>(x)}; - - auto p_vec = std::find(p_vec_start, p_vec_start + n, NA_INT64); - auto first_na = p_vec - p_vec_start; - if (first_na < n) { - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &buffers[0])); - internal::FirstTimeBitmapWriter bitmap_writer(buffers[0]->mutable_data(), 0, n); - - // first loop to clear all the bits before the first NA - int i = 0; - for (; i < first_na; i++, bitmap_writer.Next()) { - bitmap_writer.Set(); - } - - // then finish - for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { - if (*p_vec == NA_INT64) { - bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - } - } - - bitmap_writer.Finish(); - } - - auto data = ArrayData::Make( - std::make_shared(), n, std::move(buffers), null_count, 0 /*offset*/ - ); - - // return the right Array class - return std::make_shared::ArrayType>(data); -} - -inline int difftime_unit_multiplier(SEXP x) { - std::string unit(CHAR(STRING_ELT(Rf_getAttrib(x, symbols::units), 0))); - if (unit == "secs") { - return 1; - } else if (unit == "mins") { - return 60; - } else if (unit == "hours") { - return 3600; - } else if (unit == "days") { - return 86400; - } else if (unit == "weeks") { - return 
604800; - } - Rcpp::stop("unknown difftime unit"); - return 0; -} - -std::shared_ptr Time32Array_From_difftime(SEXP x) { - // number of seconds as a double - auto p_vec_start = REAL(x); - auto n = Rf_xlength(x); - int64_t null_count = 0; - - int multiplier = difftime_unit_multiplier(x); - std::vector> buffers(2); - - STOP_IF_NOT_OK(AllocateBuffer(n * sizeof(int32_t), &buffers[1])); - auto p_values = reinterpret_cast(buffers[1]->mutable_data()); - - R_xlen_t i = 0; - auto p_vec = p_vec_start; - for (; i < n; i++, ++p_vec, ++p_values) { - if (NumericVector::is_na(*p_vec)) { - break; - } - *p_values = static_cast(*p_vec * multiplier); - } - - if (i < n) { - STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &buffers[0])); - internal::FirstTimeBitmapWriter bitmap_writer(buffers[0]->mutable_data(), 0, n); - - // first loop to clear all the bits before the first NA - for (R_xlen_t j = 0; j < i; j++, bitmap_writer.Next()) { - bitmap_writer.Set(); - } - - // then finish - for (; i < n; i++, bitmap_writer.Next(), ++p_vec, ++p_values) { - if (NumericVector::is_na(*p_vec)) { - bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - *p_values = static_cast(*p_vec * multiplier); - } - } - - bitmap_writer.Finish(); - } - - auto data = ArrayData::Make( - time32(TimeUnit::SECOND), n, std::move(buffers), null_count, 0 /*offset*/ - ); - - // return the right Array class - return std::make_shared(data); -} - -} // namespace r -} // namespace arrow - -// [[Rcpp::export]] -std::shared_ptr Array__from_vector(SEXP x) { - switch (TYPEOF(x)) { - case LGLSXP: - return arrow::r::MakeBooleanArray(x); - case INTSXP: - if (Rf_isFactor(x)) { - return arrow::r::MakeFactorArray(x); - } - if (Rf_inherits(x, "Date")) { - return arrow::r::SimpleArray(x); - } - if (Rf_inherits(x, "POSIXct")) { - return arrow::r::TimeStampArray_From_POSIXct(x); - } - return arrow::r::SimpleArray(x); - case REALSXP: - if (Rf_inherits(x, "Date")) { - return arrow::r::SimpleArray(x); - } - if 
(Rf_inherits(x, "POSIXct")) { - return arrow::r::TimeStampArray_From_POSIXct(x); - } - if (Rf_inherits(x, "integer64")) { - return arrow::r::Int64Array(x); - } - if (Rf_inherits(x, "difftime")) { - return arrow::r::Time32Array_From_difftime(x); - } - return arrow::r::SimpleArray(x); - case RAWSXP: - return arrow::r::SimpleArray(x); - case STRSXP: - return arrow::r::MakeStringArray(x); - default: - break; - } - - stop("not handled"); - return nullptr; -} - // [[Rcpp::export]] std::shared_ptr Array__Slice1(const std::shared_ptr& array, int offset) { diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp new file mode 100644 index 00000000000..d02ce643003 --- /dev/null +++ b/r/src/array_from_vector.cpp @@ -0,0 +1,1014 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "arrow_types.h" + +using namespace Rcpp; +using namespace arrow; + +namespace arrow { +namespace r { + +std::shared_ptr MakeStringArray(StringVector_ vec) { + R_xlen_t n = vec.size(); + + std::shared_ptr null_buffer; + std::shared_ptr offset_buffer; + std::shared_ptr value_buffer; + + // there is always an offset buffer + STOP_IF_NOT_OK(AllocateBuffer((n + 1) * sizeof(int32_t), &offset_buffer)); + + R_xlen_t i = 0; + int current_offset = 0; + int64_t null_count = 0; + auto p_offset = reinterpret_cast(offset_buffer->mutable_data()); + *p_offset = 0; + for (++p_offset; i < n; i++, ++p_offset) { + SEXP s = STRING_ELT(vec, i); + if (s == NA_STRING) { + // break as we are going to need a null_bitmap buffer + break; + } + + *p_offset = current_offset += LENGTH(s); + } + + if (i < n) { + STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_buffer)); + internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); + + // catch up + for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) { + null_bitmap_writer.Set(); + } + + // resume offset filling + for (; i < n; i++, ++p_offset, null_bitmap_writer.Next()) { + SEXP s = STRING_ELT(vec, i); + if (s == NA_STRING) { + null_bitmap_writer.Clear(); + *p_offset = current_offset; + null_count++; + } else { + null_bitmap_writer.Set(); + *p_offset = current_offset += LENGTH(s); + } + } + + null_bitmap_writer.Finish(); + } + + // ----- data buffer + if (current_offset > 0) { + STOP_IF_NOT_OK(AllocateBuffer(current_offset, &value_buffer)); + p_offset = reinterpret_cast(offset_buffer->mutable_data()); + auto p_data = reinterpret_cast(value_buffer->mutable_data()); + + for (R_xlen_t i = 0; i < n; i++) { + SEXP s = STRING_ELT(vec, i); + if (s != NA_STRING) { + auto ni = LENGTH(s); + std::copy_n(CHAR(s), ni, p_data); + p_data += ni; + } + } + } + + auto data = ArrayData::Make(arrow::utf8(), n, + {null_buffer, offset_buffer, value_buffer}, null_count, 0); + return MakeArray(data); +} + 
+template +std::shared_ptr MakeFactorArrayImpl(Rcpp::IntegerVector_ factor, const std::shared_ptr& type) { + using value_type = typename arrow::TypeTraits::ArrayType::value_type; + auto n = factor.size(); + + std::shared_ptr indices_buffer; + STOP_IF_NOT_OK(AllocateBuffer(n * sizeof(value_type), &indices_buffer)); + + std::vector> buffers{nullptr, indices_buffer}; + + int64_t null_count = 0; + R_xlen_t i = 0; + auto p_factor = factor.begin(); + auto p_indices = reinterpret_cast(indices_buffer->mutable_data()); + for (; i < n; i++, ++p_indices, ++p_factor) { + if (*p_factor == NA_INTEGER) break; + *p_indices = *p_factor - 1; + } + + if (i < n) { + // there are NA's so we need a null buffer + std::shared_ptr null_buffer; + STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_buffer)); + internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); + + // catch up + for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) { + null_bitmap_writer.Set(); + } + + // resume offset filling + for (; i < n; i++, ++p_indices, ++p_factor, null_bitmap_writer.Next()) { + if (*p_factor == NA_INTEGER) { + null_bitmap_writer.Clear(); + null_count++; + } else { + null_bitmap_writer.Set(); + *p_indices = *p_factor - 1; + } + } + + null_bitmap_writer.Finish(); + buffers[0] = std::move(null_buffer); + } + + auto array_indices_data = + ArrayData::Make(std::make_shared(), n, std::move(buffers), null_count, 0); + auto array_indices = MakeArray(array_indices_data); + + std::shared_ptr out; + STOP_IF_NOT_OK(DictionaryArray::FromArrays(type, array_indices, &out)); + return out; +} + +std::shared_ptr MakeFactorArray(Rcpp::IntegerVector_ factor, const std::shared_ptr& type) { + SEXP levels = factor.attr("levels"); + int n = Rf_length(levels); + if (n < 128) { + return MakeFactorArrayImpl(factor, type); + } else if (n < 32768) { + return MakeFactorArrayImpl(factor, type); + } else { + return MakeFactorArrayImpl(factor, type); + } +} + +template +int64_t 
time_cast(T value); + +template <> +inline int64_t time_cast(int value) { + return static_cast(value) * 1000; +} + +template <> +inline int64_t time_cast(double value) { + return static_cast(value * 1000); +} + +} +} + +// ---------------- new api + + + + + +namespace arrow{ +using internal::checked_cast; + +namespace internal{ + +template +Status int_cast(T x, Target* out) { + if (x < std::numeric_limits::min() || x > std::numeric_limits::max()) { + return Status::Invalid("Value is too large to fit in C integer type"); + } + *out = static_cast(x); + return Status::OK(); +} + +template +Status double_cast(Int x, double* out) { + *out = static_cast(x); + return Status::OK(); +} + +template <> +Status double_cast(int64_t x, double* out) { + constexpr int64_t kDoubleMax = 1LL << 53; + constexpr int64_t kDoubleMin = -(1LL << 53); + + if (x < kDoubleMin || x > kDoubleMax) { + return Status::Invalid("integer value ", x, " is outside of the range exactly", + " representable by a IEEE 754 double precision value"); + } + *out = static_cast(x); + return Status::OK(); +} + +// used for int and int64_t +template +Status float_cast(T x, float* out) { + constexpr int64_t kHalfFloatMax = 1LL << 24; + constexpr int64_t kHalfFloatMin = -(1LL << 24); + + int64_t x64 = static_cast(x); + if (x64 < kHalfFloatMin || x64 > kHalfFloatMax) { + return Status::Invalid("integer value ", x, " is outside of the range exactly", + " representable by a IEEE 754 half precision value"); + } + + *out = static_cast(x); + return Status::OK(); +} + +template <> +Status float_cast(double x, float* out) { + // TODO: is there some sort of floating point overflow ? 
+ *out = static_cast(x); + return Status::OK(); +} + +} + +namespace r{ + +class VectorConverter; + +Status GetConverter(const std::shared_ptr& type, std::unique_ptr* out); + +class VectorConverter { +public: + virtual ~VectorConverter() = default; + + virtual Status Init(ArrayBuilder* builder) = 0; + + virtual Status Ingest(SEXP obj) = 0; + + virtual Status GetResult(std::shared_ptr* result) { + RETURN_NOT_OK(builder_->Finish(result)); + return Status::OK(); + } + + ArrayBuilder* builder() const { return builder_; } + +protected: + ArrayBuilder* builder_; +}; + +template +struct Unbox {}; + +// unboxer for int type +template +struct Unbox> { + using BuilderType = typename TypeTraits::BuilderType; + using ArrayType = typename TypeTraits::ArrayType; + using CType = typename ArrayType::value_type; + + static inline Status Ingest(BuilderType* builder, SEXP obj) { + switch(TYPEOF(obj)) { + case INTSXP: + return IngestRange(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER); + case REALSXP: + if (Rf_inherits(obj, "integer64")) { + return IngestRange(builder, reinterpret_cast(REAL(obj)), XLENGTH(obj), NA_INT64); + } + // TODO: handle aw and logical + default: + break; + } + + return Status::Invalid( + tfm::format("Cannot convert R vector of type %s to integer Arrow array", type2name(obj)) + ); + + } + + template + static inline Status IngestRange(BuilderType* builder, T* p, R_xlen_t n, T na) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + CType value; + RETURN_NOT_OK(internal::int_cast(*p, &value)); + builder->UnsafeAppend(value); + } + } + return Status::OK(); + } +}; + +template<> +struct Unbox { + + static inline Status Ingest(DoubleBuilder* builder, SEXP obj) { + switch(TYPEOF(obj)) { + // TODO: handle RAW + case INTSXP: + return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER); + case REALSXP: + if(Rf_inherits(obj, "integer64")) { + return IngestIntRange(builder, reinterpret_cast(REAL(obj)), XLENGTH(obj), 
NA_INT64); + } + return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); + } + return Status::Invalid("Cannot convert R object to double type"); + } + + template + static inline Status IngestIntRange(DoubleBuilder* builder, T* p, R_xlen_t n, T na) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + double value; + RETURN_NOT_OK(internal::double_cast(*p, &value)); + builder->UnsafeAppend(value); + } + } + return Status::OK(); + } + + static inline Status IngestDoubleRange(DoubleBuilder* builder, double* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(*p); + } + } + return Status::OK(); + } + +}; + +/* unboxer for float: integer input goes through float_cast so values that a float cannot hold exactly are rejected */ +template<> +struct Unbox { + + static inline Status Ingest(FloatBuilder* builder, SEXP obj) { + switch(TYPEOF(obj)) { + // TODO: handle RAW + case INTSXP: + return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER); + case REALSXP: + if(Rf_inherits(obj, "integer64")) { + return IngestIntRange(builder, reinterpret_cast(REAL(obj)), XLENGTH(obj), NA_INT64); + } + return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); + } + return Status::Invalid("Cannot convert R object to float type"); + } + + template + static inline Status IngestIntRange(FloatBuilder* builder, T* p, R_xlen_t n, T na) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + float value; + RETURN_NOT_OK(internal::float_cast(*p, &value)); + builder->UnsafeAppend(value); + } + } + return Status::OK(); + } + + static inline Status IngestDoubleRange(FloatBuilder* builder, double* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + float value; + RETURN_NOT_OK(internal::float_cast(*p, &value)); + builder->UnsafeAppend(value); + } + } + return Status::OK(); + } + +}; + +template <> +struct Unbox { + + static inline Status Ingest(BooleanBuilder* builder, SEXP obj) { + 
switch(TYPEOF(obj)) { + case LGLSXP: + { + R_xlen_t n = XLENGTH(obj); + RETURN_NOT_OK(builder->Resize(n)); + int* p = LOGICAL(obj); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(*p == 1); + } + } + return Status::OK(); + } + + default: break; + } + + // TODO: include more information about the R object and the target type + return Status::Invalid("Cannot convert R object to boolean type"); + } + +}; + +/* unboxer for date32: only INTSXP or REALSXP vectors inheriting from "Date" are accepted, anything else yields Status::Invalid */ +template <> +struct Unbox { + + static inline Status Ingest(Date32Builder* builder, SEXP obj) { + switch(TYPEOF(obj)) { + case INTSXP: + if (Rf_inherits(obj, "Date")) { + return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj)); + } + break; + case REALSXP: + if (Rf_inherits(obj, "Date")) { + return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); + } + break; + default: + break; + } + return Status::Invalid("Cannot convert R object to date32 type"); + } + + static inline Status IngestIntRange(Date32Builder* builder, int* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(*p); + } + } + return Status::OK(); + } + + static inline Status IngestDoubleRange(Date32Builder* builder, double* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(static_cast(*p)); + } + } + return Status::OK(); + } + +}; + +/* unboxer for date64: accepts "Date" vectors (INTSXP or REALSXP) and REALSXP "POSIXct" vectors */ +template <> +struct Unbox { + + constexpr static int64_t kMillisecondsPerDay = 86400000; + + static inline Status Ingest(Date64Builder* builder, SEXP obj) { + switch(TYPEOF(obj)) { + case INTSXP: + // number of days since epoch + if (Rf_inherits(obj, "Date")) { + return IngestDateInt32Range(builder, INTEGER(obj), XLENGTH(obj)); + } + break; + + case REALSXP: + // (fractional number of days since epoch) + if (Rf_inherits(obj, "Date")) { + return IngestDateDoubleRange(builder, REAL(obj), XLENGTH(obj)); + } + + // number of seconds since epoch + if (Rf_inherits(obj, "POSIXct")) { + 
return IngestDateDoubleRange<1000>(builder, REAL(obj), XLENGTH(obj)); + } + } + return Status::Invalid("Cannot convert R object to date64 type"); + } + + // ingest an integer vector that represents number of days since epoch + static inline Status IngestDateInt32Range(Date64Builder* builder, int* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(*p * kMillisecondsPerDay); + } + } + return Status::OK(); + } + + // ingest a numeric vector of (fractional) days or seconds since epoch; each value is scaled by MULTIPLIER before being appended + template + static inline Status IngestDateDoubleRange(Date64Builder* builder, double* p, R_xlen_t n) { + RETURN_NOT_OK(builder->Resize(n)); + + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + builder->UnsafeAppend(static_cast(*p * MULTIPLIER)); + } + } + return Status::OK(); + } + +}; + +template +class TypedVectorConverter : public VectorConverter { +public: + using BuilderType = typename TypeTraits::BuilderType; + + Status Init(ArrayBuilder* builder) override { + builder_ = builder; + typed_builder_ = checked_cast(builder_); + return Status::OK(); + } + + Status Ingest(SEXP obj) override { + return Unbox::Ingest(typed_builder_, obj); + } + +protected: + BuilderType* typed_builder_; +}; + +template +class NumericVectorConverter : public TypedVectorConverter>{}; + +class BooleanVectorConverter : public TypedVectorConverter{}; + +class Date32Converter : public TypedVectorConverter {}; +class Date64Converter : public TypedVectorConverter {}; + +/* number of `unit` ticks in one second */ +inline int64_t get_time_multiplier(TimeUnit::type unit){ + switch(unit){ + case TimeUnit::SECOND: return 1; + case TimeUnit::MILLI: return 1000; + case TimeUnit::MICRO: return 1000000; + case TimeUnit::NANO: return 1000000000; + } + /* not reached for the four units handled above; prevents flowing off the end of a non-void function */ + return 0; +} + +template +class TimeConverter : public VectorConverter { + using BuilderType = typename TypeTraits::BuilderType; + +public: + TimeConverter(TimeUnit::type unit) : unit_(unit), multiplier_(get_time_multiplier(unit)){} + + 
Status Init(ArrayBuilder* builder) override { + builder_ = builder; + typed_builder_ = checked_cast(builder); + return Status::OK(); + } + + Status Ingest(SEXP obj) override { + + if(valid_R_object(obj)) { + int difftime_multiplier; + RETURN_NOT_OK(GetDifftimeMultiplier(obj, &difftime_multiplier)); + return Ingest_POSIXct(REAL(obj), XLENGTH(obj), difftime_multiplier); + } + + return Status::Invalid("Cannot convert R object to timestamp type"); + } + +protected: + TimeUnit::type unit_; + BuilderType* typed_builder_; + int64_t multiplier_; + + Status Ingest_POSIXct(double* p, R_xlen_t n, int difftime_multiplier) { + RETURN_NOT_OK(typed_builder_->Resize(n)); + + for (R_xlen_t i=0; iUnsafeAppendNull(); + } else { + typed_builder_->UnsafeAppend(static_cast(*p * multiplier_ * difftime_multiplier)); + } + } + return Status::OK(); + } + + virtual bool valid_R_object(SEXP obj) = 0 ; + + // only used for Time32 and Time64: maps the "units" attribute of a difftime to the number of seconds per unit (TimestampConverter overrides this to always give 1) + virtual Status GetDifftimeMultiplier(SEXP obj, int* res) { + std::string unit(CHAR(STRING_ELT(Rf_getAttrib(obj, symbols::units), 0))); + if (unit == "secs") { + *res = 1; + } else if (unit == "mins") { + *res = 60; + } else if (unit == "hours") { + *res = 3600; + } else if (unit == "days") { + *res = 86400; + } else if (unit == "weeks") { + *res = 604800; + } else { + return Status::Invalid("unknown difftime unit"); + } + return Status::OK(); + } +}; + +class TimestampConverter : public TimeConverter { +public: + TimestampConverter(TimeUnit::type unit) : TimeConverter(unit){} + +protected: + virtual bool valid_R_object(SEXP obj) override { + return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "POSIXct"); + } + + virtual Status GetDifftimeMultiplier(SEXP obj, int* res) override { + *res = 1; + return Status::OK(); + } + +}; + +class Time32Converter : public TimeConverter { +public: + Time32Converter(TimeUnit::type unit) : TimeConverter(unit){} + +protected: + virtual bool valid_R_object(SEXP obj) override { + return TYPEOF(obj) == REALSXP && Rf_inherits(obj, 
"difftime"); + } +}; + +class Time64Converter : public TimeConverter { +public: + Time64Converter(TimeUnit::type unit) : TimeConverter(unit){} + +protected: + virtual bool valid_R_object(SEXP obj) override { + return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); + } + +}; + + +#define NUMERIC_CONVERTER(TYPE_ENUM, TYPE) \ +case Type::TYPE_ENUM: \ + *out = std::unique_ptr>(new NumericVectorConverter); \ + return Status::OK() + +#define SIMPLE_CONVERTER_CASE(TYPE_ENUM, TYPE) \ +case Type::TYPE_ENUM: \ + *out = std::unique_ptr(new TYPE); \ + return Status::OK() + +#define TIME_CONVERTER_CASE(TYPE_ENUM, DATA_TYPE, TYPE) \ +case Type::TYPE_ENUM: \ + *out = std::unique_ptr(new TYPE(checked_cast(type.get())->unit())); \ + return Status::OK() + +Status GetConverter(const std::shared_ptr& type, std::unique_ptr* out) { + + switch(type->id()){ + SIMPLE_CONVERTER_CASE(BOOL, BooleanVectorConverter); + NUMERIC_CONVERTER(INT8 , Int8Type); + NUMERIC_CONVERTER(INT16 , Int16Type); + NUMERIC_CONVERTER(INT32 , Int32Type); + NUMERIC_CONVERTER(INT64 , Int64Type); + NUMERIC_CONVERTER(UINT8 , UInt8Type); + NUMERIC_CONVERTER(UINT16, UInt16Type); + NUMERIC_CONVERTER(UINT32, UInt32Type); + NUMERIC_CONVERTER(UINT64, UInt64Type); + + // TODO: not sure how to handle half floats + // the python code uses npy_half + // NUMERIC_CONVERTER(HALF_FLOAT, HalfFloatType); + NUMERIC_CONVERTER(FLOAT, FloatType); + NUMERIC_CONVERTER(DOUBLE, DoubleType); + + SIMPLE_CONVERTER_CASE(DATE32, Date32Converter); + SIMPLE_CONVERTER_CASE(DATE64, Date64Converter); + + // TODO: probably after we merge ARROW-3628 + // case Type::DECIMAL: + + case Type::DICTIONARY: + + TIME_CONVERTER_CASE(TIME32, Time32Type, Time32Converter); + TIME_CONVERTER_CASE(TIME64, Time64Type, Time64Converter); + TIME_CONVERTER_CASE(TIMESTAMP, TimestampType, TimestampConverter); + + default: + break; + } + return Status::NotImplemented("type not implemented"); +} + + +template +std::shared_ptr GetFactorTypeImpl(Rcpp::IntegerVector_ 
factor) { + auto dict_values = MakeStringArray(Rf_getAttrib(factor, R_LevelsSymbol)); + auto dict_type = + dictionary(std::make_shared(), dict_values, Rf_inherits(factor, "ordered")); + return dict_type; +} + +/* Chooses the dictionary type of a factor from its number of levels (fewer than 128, fewer than 32768, or more); presumably this selects the index width, but the template arguments are not visible here - TODO confirm */ +std::shared_ptr GetFactorType(SEXP factor) { + SEXP levels = Rf_getAttrib(factor, R_LevelsSymbol); + int n = Rf_length(levels); + if (n < 128) { + return GetFactorTypeImpl(factor); + } else if (n < 32768) { + return GetFactorTypeImpl(factor); + } else { + return GetFactorTypeImpl(factor); + } +} + +/* Returns the Arrow type used for an R vector when no type is supplied; calls Rcpp::stop() for SEXP types that are not handled. Class checks (factor, Date, POSIXct, integer64, difftime) take precedence over the plain storage type. */ +std::shared_ptr InferType(SEXP x) { + switch (TYPEOF(x)) { + case LGLSXP: + return boolean(); + case INTSXP: + if (Rf_isFactor(x)) { + return GetFactorType(x); + } + if (Rf_inherits(x, "Date")) { + return date32(); + } + if (Rf_inherits(x, "POSIXct")) { + return timestamp(TimeUnit::MICRO, "GMT"); + } + return int32(); + case REALSXP: + if (Rf_inherits(x, "Date")) { + return date32(); + } + if (Rf_inherits(x, "POSIXct")) { + return timestamp(TimeUnit::MICRO, "GMT"); + } + if (Rf_inherits(x, "integer64")) { + return int64(); + } + if (Rf_inherits(x, "difftime")) { + return time32(TimeUnit::SECOND); + } + return float64(); + case RAWSXP: + return int8(); + case STRSXP: + return utf8(); + default: + break; + } + + Rcpp::stop("cannot infer type from data"); +} + +// in some situations we can just use the memory of the R object in an RBuffer +// instead of going through ArrayBuilder, etc ... 
+bool can_reuse_memory(SEXP x, const std::shared_ptr& type) { + switch(type->id()) { + case Type::INT32: return TYPEOF(x) == INTSXP && !OBJECT(x); + case Type::DOUBLE: return TYPEOF(x) == REALSXP && !OBJECT(x); + case Type::INT8: return TYPEOF(x) == RAWSXP && !OBJECT(x); + case Type::INT64: return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64"); + default: + break; + } + return false; +} + +template +inline bool is_na(T value) { + return false; +} + +template <> +inline bool is_na(int64_t value){ + return value == NA_INT64; +} + +template <> +inline bool is_na(double value){ + return ISNA(value); +} + +template <> +inline bool is_na(int value){ + return value == NA_INTEGER; +} +// this is only used on some special cases when the arrow Array can just use the memory of the R +// object, via an RBuffer, hence be zero copy; a validity bitmap is only allocated when at least one NA is found +template +std::shared_ptr MakeSimpleArray(SEXP x) { + using value_type = typename arrow::TypeTraits::ArrayType::value_type; + Rcpp::Vector vec(x); + auto n = vec.size(); + auto p_vec_start = reinterpret_cast(vec.begin()); + auto p_vec_end = p_vec_start + n; + std::vector> buffers{nullptr, + std::make_shared>(vec)}; + + int null_count = 0; + std::shared_ptr null_bitmap; + + auto first_na = std::find_if(p_vec_start, p_vec_end, is_na); + if (first_na < p_vec_end) { + STOP_IF_NOT_OK(AllocateBuffer(BitUtil::BytesForBits(n), &null_bitmap)); + internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); + + // first loop to set (mark as valid) all the bits before the first NA + auto j = std::distance(p_vec_start, first_na); + int i = 0; + for (; i < j; i++, bitmap_writer.Next()) { + bitmap_writer.Set(); + } + + auto p_vec = first_na; + // then finish: from the first NA onwards each value is tested, clearing the bit and counting a null for every NA + for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { + if (is_na(*p_vec)) { + bitmap_writer.Clear(); + null_count++; + } else { + bitmap_writer.Set(); + } + } + + bitmap_writer.Finish(); + buffers[0] = std::move(null_bitmap); + } + + auto data = ArrayData::Make( + std::make_shared(), LENGTH(x), 
std::move(buffers), null_count, 0 /*offset*/ + ); + + // return the right Array class + return std::make_shared::ArrayType>(data); +} + +std::shared_ptr Array__from_vector_reuse_memory(SEXP x) { + switch(TYPEOF(x)) { + case INTSXP: + return MakeSimpleArray(x); + case REALSXP: + if (Rf_inherits(x, "integer64")) { + return MakeSimpleArray(x); + } + return MakeSimpleArray(x); + case RAWSXP: + return MakeSimpleArray(x); + default: + break; + } + + Rcpp::stop("not implemented"); +} + +bool CheckCompatibleFactor(SEXP obj, const std::shared_ptr& type) { + if (!Rf_inherits(obj, "factor")) return false; + + arrow::DictionaryType* dict_type = arrow::checked_cast(type.get()); + auto dictionary = dict_type->dictionary(); + if(dictionary->type() != utf8()) return false; + + // then compare levels: same count, and each level equal to the dictionary value at the same position + auto typed_dict = checked_cast(dictionary.get()); + SEXP levels = Rf_getAttrib(obj, R_LevelsSymbol); + + R_xlen_t n = XLENGTH(levels); + if (n != typed_dict->length()) return false; + + for( R_xlen_t i=0; iGetString(i) != CHAR(STRING_ELT(levels, i))) return false; + } + + return true; +} + +std::shared_ptr Array__from_vector(SEXP x, const std::shared_ptr& type, bool type_infered){ + // special case when we can just use the data from the R vector + // directly. 
This still needs to handle the null bitmap + if (arrow::r::can_reuse_memory(x, type)) { + return arrow::r::Array__from_vector_reuse_memory(x); + } + + // treat strings separately for now + if (type->id() == Type::STRING) { + STOP_IF_NOT(TYPEOF(x) == STRSXP, "Cannot convert R object to string array"); + return arrow::r::MakeStringArray(x); + } + + // factors: accepted when the type was inferred from x, or when the supplied dictionary type is compatible with x + if (type->id() == Type::DICTIONARY) { + if (type_infered || arrow::r::CheckCompatibleFactor(x, type)) { + return arrow::r::MakeFactorArray(x, type); + } + + stop("Object incompatible with dictionary type"); + } + + // general conversion with converter and builder + std::unique_ptr converter; + STOP_IF_NOT_OK(arrow::r::GetConverter(type, &converter)); + + // Create ArrayBuilder for type + std::unique_ptr type_builder; + STOP_IF_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder)); + STOP_IF_NOT_OK(converter->Init(type_builder.get())); + + // ingest R data and grab the result array + STOP_IF_NOT_OK(converter->Ingest(x)); + std::shared_ptr result; + STOP_IF_NOT_OK(converter->GetResult(&result)); + + return result; +} + +} // namespace r +} // namespace arrow + +// [[Rcpp::export]] +std::shared_ptr Array__infer_type(SEXP x) { + return arrow::r::InferType(x); +} + +// [[Rcpp::export]] +std::shared_ptr Array__from_vector(SEXP x, SEXP s_type) { + // the type might be NULL, in which case we need to infer it from the data + // we keep track of whether it was inferred or supplied + bool type_infered = Rf_isNull(s_type); + std::shared_ptr type; + if (type_infered) { + type = arrow::r::InferType(x); + } else { + type = arrow::r::extract(s_type); + } + + return arrow::r::Array__from_vector(x, type, type_infered); +} + +// [[Rcpp::export]] +std::shared_ptr ChunkedArray__from_list(List chunks, SEXP s_type) { + std::vector> vec; + + // the type might be NULL, in which case we need to infer it from the data + // we keep track of whether it was inferred or supplied + bool 
type_infered = Rf_isNull(s_type); + R_xlen_t n = XLENGTH(chunks); + + std::shared_ptr type; + if (type_infered) { + if (n == 0) { + stop("type must be specified for empty list"); + } + type = arrow::r::InferType(VECTOR_ELT(chunks, 0)); + } else { + type = arrow::r::extract(s_type); + } + + if (n == 0) { + std::shared_ptr array; + std::unique_ptr type_builder; + STOP_IF_NOT_OK(arrow::MakeBuilder(arrow::default_memory_pool(), type, &type_builder)); + STOP_IF_NOT_OK(type_builder->Finish(&array)); + vec.push_back(array); + } else { + // the first chunk might differ from the rest of the loop + // because we might have inferred the type from the first element of the list + // + // this only really matters for dictionary arrays + vec.push_back(arrow::r::Array__from_vector(VECTOR_ELT(chunks, 0), type, type_infered)); + + for (R_xlen_t i=1; i(std::move(vec)); +} diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 4843f95ace3..454c42a647d 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -32,9 +32,9 @@ #include #include -#define STOP_IF_NOT(TEST, MSG) \ - do { \ - if (!TEST) Rcpp::stop(MSG); \ +#define STOP_IF_NOT(TEST, MSG) \ + do { \ + if (!(TEST)) Rcpp::stop(MSG); \ } while (0) #define STOP_IF_NOT_OK(s) STOP_IF_NOT(s.ok(), s.ToString()) @@ -174,8 +174,9 @@ inline constexpr Rbyte default_value() { SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); SEXP Array__as_vector(const std::shared_ptr& array); -std::shared_ptr Array__from_vector(SEXP x); +std::shared_ptr Array__from_vector(SEXP x, SEXP type); std::shared_ptr RecordBatch__from_dataframe(Rcpp::DataFrame tbl); +std::shared_ptr Array__infer_type(SEXP x); namespace arrow { namespace r { @@ -198,5 +199,10 @@ class RBuffer : public MutableBuffer { Vec vec_; }; +template +inline std::shared_ptr extract(SEXP x) { + return Rcpp::ConstReferenceSmartPtrInputParameter>(x); +} + } // namespace r } // namespace arrow diff --git a/r/src/chunkedarray.cpp b/r/src/chunkedarray.cpp index 
06b32d3454c..6d5126755be 100644 --- a/r/src/chunkedarray.cpp +++ b/r/src/chunkedarray.cpp @@ -63,12 +63,3 @@ std::shared_ptr ChunkArray__Slice2( const std::shared_ptr& chunked_array, int offset, int length) { return chunked_array->Slice(offset, length); } - -// [[Rcpp::export]] -std::shared_ptr ChunkedArray__from_list(List chunks) { - std::vector> vec; - for (SEXP chunk : chunks) { - vec.push_back(Array__from_vector(chunk)); - } - return std::make_shared(std::move(vec)); -} diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index b776d2ae575..1889143397e 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -65,7 +65,8 @@ std::shared_ptr RecordBatch__from_dataframe(DataFrame tbl) { std::vector> arrays; for (int i = 0; i < tbl.size(); i++) { - arrays.push_back(Array__from_vector(tbl[i])); + SEXP x = tbl[i]; + arrays.push_back(Array__from_vector(x, R_NilValue)); fields.push_back( std::make_shared(std::string(names[i]), arrays[i]->type())); } diff --git a/r/tests/testthat/test-Array.R b/r/tests/testthat/test-Array.R index e456fe88654..3a9d079df73 100644 --- a/r/tests/testthat/test-Array.R +++ b/r/tests/testthat/test-Array.R @@ -18,7 +18,7 @@ context("arrow::Array") test_that("Array", { - x <- array(1:10, 1:10, 1:5) + x <- array(c(1:10, 1:10, 1:5)) expect_equal(x$type, int32()) expect_equal(x$length(), 25L) expect_equal(x$as_vector(), c(1:10, 1:10, 1:5)) @@ -35,7 +35,7 @@ test_that("Array", { expect_equal(z$as_vector(), c(1:5)) expect_true(x$RangeEquals(z, 10, 15, 0)) - x_dbl <- array(c(1,2,3), c(4,5,6)) + x_dbl <- array(c(1,2,3,4,5,6)) expect_equal(x_dbl$type, float64()) expect_equal(x_dbl$length(), 6L) expect_equal(x_dbl$as_vector(), as.numeric(1:6)) @@ -152,8 +152,7 @@ test_that("Array supports unordered factors (ARROW-3355)", { # with NA f <- factor(c("itsy", "bitsy", NA, "spider", "spider")) - # TODO: rm the suppressWarnings when https://github.com/r-lib/vctrs/issues/109 - arr_fac <- suppressWarnings(array(f)) + arr_fac <- array(f) 
expect_equal(arr_fac$length(), 5L) expect_equal(arr_fac$type$index_type, int8()) expect_identical(arr_fac$as_vector(), f) @@ -188,8 +187,7 @@ test_that("Array supports ordered factors (ARROW-3355)", { # with NA f <- ordered(c("itsy", "bitsy", NA, "spider", "spider")) - # TODO: rm the suppressWarnings when https://github.com/r-lib/vctrs/issues/109 - arr_fac <- suppressWarnings(array(f)) + arr_fac <- array(f) expect_equal(arr_fac$length(), 5L) expect_equal(arr_fac$type$index_type, int8()) expect_identical(arr_fac$as_vector(), f) @@ -267,12 +265,12 @@ test_that("array$as_vector() correctly handles all NA inte64 (ARROW-3795)", { test_that("array supports difftime", { time <- hms::hms(56, 34, 12) - a <- array(time, time) + a <- array(c(time, time)) expect_equal(a$type, time32(unit = TimeUnit$SECOND)) expect_equal(a$length(), 2L) expect_equal(a$as_vector(), c(time, time)) - a <- array(time, NA) + a <- array(vctrs::vec_c(time, NA)) expect_equal(a$type, time32(unit = TimeUnit$SECOND)) expect_equal(a$length(), 2L) expect_true(a$IsNull(1)) @@ -287,14 +285,8 @@ test_that("support for NaN (ARROW-3615)", { expect_equal(y$null_count, 1L) }) -test_that("array ignores the type argument (ARROW-3784)", { - a <- expect_warning(array(1:10, type = int16())) - b <- array(1:10) - expect_equal(a, b) -}) - test_that("integer types casts (ARROW-3741)", { - a <- array(1:10, NA) + a <- array(c(1:10, NA)) a_int8 <- a$cast(int8()) a_int16 <- a$cast(int16()) a_int32 <- a$cast(int32()) @@ -361,3 +353,49 @@ test_that("cast to half float works", { a_f16 <- a$cast(float16()) expect_equal(a_16$type, float16()) }) + +test_that("array() supports the type= argument. 
conversion from INTSXP and int64 to all int types", { + num_int32 <- 12L + num_int64 <- bit64::as.integer64(10) + + types <- list( + int8(), int16(), int32(), int64(), + uint8(), uint16(), uint32(), uint64(), + float32(), float64() + ) + for(type in types) { + expect_equal(array(num_int32, type = type)$type, type) + expect_equal(array(num_int64, type = type)$type, type) + } +}) + +test_that("array() aborts on overflow", { + expect_error(array(128L, type = int8())$type, "Invalid.*downsize") + expect_error(array(-129L, type = int8())$type, "Invalid.*downsize") + + expect_error(array(256L, type = uint8())$type, "Invalid.*downsize") + expect_error(array(-1L, type = uint8())$type, "Invalid.*downsize") + + expect_error(array(32768L, type = int16())$type, "Invalid.*downsize") + expect_error(array(-32769L, type = int16())$type, "Invalid.*downsize") + + expect_error(array(65536L, type = uint16())$type, "Invalid.*downsize") + expect_error(array(-1L, type = uint16())$type, "Invalid.*downsize") + + expect_error(array(65536L, type = uint16())$type, "Invalid.*downsize") + expect_error(array(-1L, type = uint16())$type, "Invalid.*downsize") + + expect_error(array(bit64::as.integer64(2^31), type = int32()), "Invalid.*downsize") + expect_error(array(bit64::as.integer64(2^32), type = uint32()), "Invalid.*downsize") +}) + +test_that("array() does not convert doubles to integer", { + types <- list( + int8(), int16(), int32(), int64(), + uint8(), uint16(), uint32(), uint64() + ) + for(type in types) { + expect_error(array(10, type = type)$type, "Cannot convert.*REALSXP") + } +}) + diff --git a/r/tests/testthat/test-chunkedarray.R b/r/tests/testthat/test-chunkedarray.R index 11a196d039d..5e22e5cf31e 100644 --- a/r/tests/testthat/test-chunkedarray.R +++ b/r/tests/testthat/test-chunkedarray.R @@ -160,12 +160,6 @@ test_that("ChunkedArray supports difftime", { expect_equal(a$as_vector(), c(time, time)) }) -test_that("chunked_array ignores the type argument (ARROW-3784)", { - a <- 
expect_warning(chunked_array(1:10, type = int16())) - b <- chunked_array(1:10) - expect_equal(a, b) -}) - test_that("integer types casts for ChunkedArray (ARROW-3741)", { a <- chunked_array(1:10, 1:10) a_int8 <- a$cast(int8()) @@ -197,3 +191,77 @@ test_that("integer types casts for ChunkedArray (ARROW-3741)", { expect_equal(a_uint32$type, uint32()) expect_equal(a_uint64$type, uint64()) }) + +test_that("chunked_array() supports the type= argument. conversion from INTSXP and int64 to all int types", { + num_int32 <- 12L + num_int64 <- bit64::as.integer64(10) + + types <- list( + int8(), int16(), int32(), int64(), + uint8(), uint16(), uint32(), uint64(), + float32(), float64() + ) + for(type in types) { + expect_equal(chunked_array(num_int32, type = type)$type, type) + expect_equal(chunked_array(num_int64, type = type)$type, type) + } +}) + +test_that("chunked_array() aborts on overflow", { + expect_error(chunked_array(128L, type = int8())$type, "Invalid.*downsize") + expect_error(chunked_array(-129L, type = int8())$type, "Invalid.*downsize") + + expect_error(chunked_array(256L, type = uint8())$type, "Invalid.*downsize") + expect_error(chunked_array(-1L, type = uint8())$type, "Invalid.*downsize") + + expect_error(chunked_array(32768L, type = int16())$type, "Invalid.*downsize") + expect_error(chunked_array(-32769L, type = int16())$type, "Invalid.*downsize") + + expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*downsize") + expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*downsize") + + expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*downsize") + expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*downsize") + + expect_error(chunked_array(bit64::as.integer64(2^31), type = int32()), "Invalid.*downsize") + expect_error(chunked_array(bit64::as.integer64(2^32), type = uint32()), "Invalid.*downsize") +}) + +test_that("chunked_array() does not convert doubles to integer", { + types <- list( + int8(), int16(), int32(), 
int64(), + uint8(), uint16(), uint32(), uint64() + ) + for(type in types) { + expect_error(chunked_array(10, type = type)$type, "Cannot convert.*REALSXP") + } +}) + +test_that("chunked_array() uses the first ... to infer type", { + a <- chunked_array(10, 10L) + expect_equal(a$type, float64()) +}) + +test_that("chunked_array() fails if need downcast", { + expect_error(chunked_array(10L, 10)) +}) + +test_that("chunked_array() makes chunks of the same type", { + a <- chunked_array(10L, bit64::as.integer64(13), type = int64()) + for(chunk in a$chunks) { + expect_equal(chunk$type, int64()) + } +}) + +test_that("chunked_array() handles 0 chunks if given a type", { + types <- list( + int8(), int16(), int32(), int64(), + uint8(), uint16(), uint32(), uint64(), + float32(), float64() + ) + for(type in types) { + a <- chunked_array(type = type) + expect_equal(a$type, type) + expect_equal(a$length(), 0L) + } +}) diff --git a/r/tests/testthat/test-type.R b/r/tests/testthat/test-type.R new file mode 100644 index 00000000000..a319033a853 --- /dev/null +++ b/r/tests/testthat/test-type.R @@ -0,0 +1,52 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +context("test-type") + +test_that("type() gets the right type for arrow::Array", { + a <- array(1:10) + expect_equal(type(a), a$type) +}) + +test_that("type() gets the right type for arrow::ChunkedArray", { + a <- chunked_array(1:10, 1:10) + expect_equal(type(a), a$type) +}) + +test_that("type() infers from R type", { + expect_equal(type(1:10), int32()) + expect_equal(type(1), float64()) + expect_equal(type(TRUE), boolean()) + expect_equal(type(raw()), int8()) + expect_equal(type(""), utf8()) + expect_equal( + type(iris$Species), + dictionary(int8(), array(levels(iris$Species)), FALSE) + ) + expect_equal( + type(lubridate::ymd_hms("2019-02-14 13:55:05")), + timestamp(TimeUnit$MICRO, "GMT") + ) + expect_equal( + type(hms::hms(56, 34, 12)), + time32(unit = TimeUnit$SECOND) + ) + expect_equal( + type(bit64::integer64()), + int64() + ) +})