diff --git a/cpp/src/arrow/python/python_to_arrow.cc b/cpp/src/arrow/python/python_to_arrow.cc index b136bec9709..b2d9f1cb5a3 100644 --- a/cpp/src/arrow/python/python_to_arrow.cc +++ b/cpp/src/arrow/python/python_to_arrow.cc @@ -388,36 +388,36 @@ class PyValue { } }; -template -Status Extend(T* converter, PyObject* values, int64_t size) { - /// Ensure we've allocated enough space - RETURN_NOT_OK(converter->Reserve(size)); - // Iterate over the items adding each one - return internal::VisitSequence(values, [converter](PyObject* item, bool* /* unused */) { - return converter->Append(item); - }); -} - -// Convert and append a sequence of values masked with a numpy array -template -Status ExtendMasked(T* converter, PyObject* values, PyObject* mask, int64_t size) { - /// Ensure we've allocated enough space - RETURN_NOT_OK(converter->Reserve(size)); - // Iterate over the items adding each one - return internal::VisitSequenceMasked( - values, mask, [converter](PyObject* item, bool is_masked, bool* /* unused */) { - if (is_masked) { - return converter->AppendNull(); - } else { - // This will also apply the null-checking convention in the event - // that the value is not masked - return converter->Append(item); // perhaps use AppendValue instead? - } - }); -} - // The base Converter class is a mixin with predefined behavior and constructors. 
-using PyConverter = Converter; +class PyConverter : public Converter { + public: + // Iterate over the input values and defer the conversion to the Append method + Status Extend(PyObject* values, int64_t size) override { + /// Ensure we've allocated enough space + RETURN_NOT_OK(this->Reserve(size)); + // Iterate over the items adding each one + return internal::VisitSequence(values, [this](PyObject* item, bool* /* unused */) { + return this->Append(item); + }); + } + + // Convert and append a sequence of values masked with a numpy array + Status ExtendMasked(PyObject* values, PyObject* mask, int64_t size) override { + /// Ensure we've allocated enough space + RETURN_NOT_OK(this->Reserve(size)); + // Iterate over the items adding each one + return internal::VisitSequenceMasked( + values, mask, [this](PyObject* item, bool is_masked, bool* /* unused */) { + if (is_masked) { + return this->AppendNull(); + } else { + // This will also apply the null-checking convention in the event + // that the value is not masked + return this->Append(item); // perhaps use AppendValue instead? 
+ } + }); + } +}; template class PyPrimitiveConverter; @@ -669,7 +669,7 @@ class PyListConverter : public ListConverter { Status AppendSequence(PyObject* value) { int64_t size = static_cast(PySequence_Size(value)); RETURN_NOT_OK(this->list_builder_->ValidateOverflow(size)); - return Extend(this->value_converter_.get(), value, size); + return this->value_converter_->Extend(value, size); } Status AppendNdarray(PyObject* value) { @@ -684,12 +684,12 @@ class PyListConverter : public ListConverter { switch (value_type->id()) { // If the value type does not match the expected NumPy dtype, then fall through // to a slower PySequence-based path -#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ - case Type::TYPE_ID: { \ - if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ - return Extend(this->value_converter_.get(), value, size); \ - } \ - return AppendNdarrayTyped(ndarray); \ +#define LIST_FAST_CASE(TYPE_ID, TYPE, NUMPY_TYPE) \ + case Type::TYPE_ID: { \ + if (PyArray_DESCR(ndarray)->type_num != NUMPY_TYPE) { \ + return this->value_converter_->Extend(value, size); \ + } \ + return AppendNdarrayTyped(ndarray); \ } LIST_FAST_CASE(BOOL, BooleanType, NPY_BOOL) LIST_FAST_CASE(UINT8, UInt8Type, NPY_UINT8) @@ -707,7 +707,7 @@ class PyListConverter : public ListConverter { LIST_FAST_CASE(DURATION, DurationType, NPY_TIMEDELTA) #undef LIST_FAST_CASE default: { - return Extend(this->value_converter_.get(), value, size); + return this->value_converter_->Extend(value, size); } } } @@ -1041,18 +1041,18 @@ Result> ConvertPySequence(PyObject* obj, PyObject* // the overflow and automatically creates new chunks. 
ARROW_ASSIGN_OR_RAISE(auto chunked_converter, MakeChunker(std::move(converter))); if (mask != nullptr && mask != Py_None) { - RETURN_NOT_OK(ExtendMasked(chunked_converter.get(), seq, mask, size)); + RETURN_NOT_OK(chunked_converter->ExtendMasked(seq, mask, size)); } else { - RETURN_NOT_OK(Extend(chunked_converter.get(), seq, size)); + RETURN_NOT_OK(chunked_converter->Extend(seq, size)); } return chunked_converter->ToChunkedArray(); } else { // If the converter can't overflow spare the capacity error checking on the hot-path, // this improves the performance roughly by ~10% for primitive types. if (mask != nullptr && mask != Py_None) { - RETURN_NOT_OK(ExtendMasked(converter.get(), seq, mask, size)); + RETURN_NOT_OK(converter->ExtendMasked(seq, mask, size)); } else { - RETURN_NOT_OK(Extend(converter.get(), seq, size)); + RETURN_NOT_OK(converter->Extend(seq, size)); } return converter->ToChunkedArray(); } diff --git a/cpp/src/arrow/util/converter.h b/cpp/src/arrow/util/converter.h index e18f6e350d7..2c40a48726b 100644 --- a/cpp/src/arrow/util/converter.h +++ b/cpp/src/arrow/util/converter.h @@ -52,7 +52,15 @@ class Converter { return Init(pool); } - virtual Status Append(InputType value) = 0; + virtual Status Append(InputType value) { return Status::NotImplemented("Append"); } + + virtual Status Extend(InputType values, int64_t size) { + return Status::NotImplemented("Extend"); + } + + virtual Status ExtendMasked(InputType values, InputType mask, int64_t size) { + return Status::NotImplemented("ExtendMasked"); + } const std::shared_ptr& builder() const { return builder_; } @@ -294,6 +302,34 @@ class Chunker { return status; } + // we could get bit smarter here since the whole batch of appendable values + // will be rejected if a capacity error is raised + Status Extend(InputType values, int64_t size) { + auto status = converter_->Extend(values, size); + if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { + if (converter_->builder()->length() == 0) { + return status; + 
} + ARROW_RETURN_NOT_OK(FinishChunk()); + return Extend(values, size); + } + length_ += size; + return status; + } + + Status ExtendMasked(InputType values, InputType mask, int64_t size) { + auto status = converter_->ExtendMasked(values, mask, size); + if (ARROW_PREDICT_FALSE(status.IsCapacityError())) { + if (converter_->builder()->length() == 0) { + return status; + } + ARROW_RETURN_NOT_OK(FinishChunk()); + return ExtendMasked(values, mask, size); + } + length_ += size; + return status; + } + Status FinishChunk() { ARROW_ASSIGN_OR_RAISE(auto chunk, converter_->ToArray(length_)); chunks_.push_back(chunk); diff --git a/r/R/array.R b/r/R/array.R index ec2b545dfae..090f1d53f57 100644 --- a/r/R/array.R +++ b/r/R/array.R @@ -143,7 +143,7 @@ Array$create <- function(x, type = NULL) { if (!is.null(type)) { type <- as_type(type) } - Array__from_vector(x, type) + vec_to_arrow(x, type) } #' @rdname array diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index d6a5f9356e8..643f2467303 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -132,22 +132,6 @@ LargeListArray__raw_value_offsets <- function(array){ .Call(`_arrow_LargeListArray__raw_value_offsets`, array) } -Array__infer_type <- function(x){ - .Call(`_arrow_Array__infer_type`, x) -} - -Array__from_vector <- function(x, s_type){ - .Call(`_arrow_Array__from_vector`, x, s_type) -} - -ChunkedArray__from_list <- function(chunks, s_type){ - .Call(`_arrow_ChunkedArray__from_list`, chunks, s_type) -} - -DictionaryArray__FromArrays <- function(type, indices, dict){ - .Call(`_arrow_DictionaryArray__FromArrays`, type, indices, dict) -} - Array__as_vector <- function(array){ .Call(`_arrow_Array__as_vector`, array) } @@ -260,6 +244,10 @@ ChunkedArray__ToString <- function(x){ .Call(`_arrow_ChunkedArray__ToString`, x) } +ChunkedArray__from_list <- function(chunks, s_type){ + .Call(`_arrow_ChunkedArray__from_list`, chunks, s_type) +} + util___Codec__Create <- function(codec, compression_level){ 
.Call(`_arrow_util___Codec__Create`, codec, compression_level) } @@ -1268,6 +1256,14 @@ ExportRecordBatch <- function(batch, array_ptr, schema_ptr){ invisible(.Call(`_arrow_ExportRecordBatch`, batch, array_ptr, schema_ptr)) } +vec_to_arrow <- function(x, s_type){ + .Call(`_arrow_vec_to_arrow`, x, s_type) +} + +DictionaryArray__FromArrays <- function(type, indices, dict){ + .Call(`_arrow_DictionaryArray__FromArrays`, type, indices, dict) +} + RecordBatch__num_columns <- function(x){ .Call(`_arrow_RecordBatch__num_columns`, x) } @@ -1588,5 +1584,9 @@ SetCpuThreadPoolCapacity <- function(threads){ invisible(.Call(`_arrow_SetCpuThreadPoolCapacity`, threads)) } +Array__infer_type <- function(x){ + .Call(`_arrow_Array__infer_type`, x) +} + diff --git a/r/src/array_from_vector.cpp b/r/src/array_from_vector.cpp deleted file mode 100644 index c2d22868535..00000000000 --- a/r/src/array_from_vector.cpp +++ /dev/null @@ -1,1599 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -#include - -#include "./arrow_types.h" -#include "./arrow_vctrs.h" - -#if defined(ARROW_R_WITH_ARROW) -#include -#include -#include -#include -#include - -using arrow::internal::checked_cast; - -namespace arrow { -namespace r { - -template -inline bool is_na(T value) { - return false; -} - -template <> -inline bool is_na(int64_t value) { - return value == NA_INT64; -} - -template <> -inline bool is_na(double value) { - return ISNA(value); -} - -template <> -inline bool is_na(int value) { - return value == NA_INTEGER; -} - -struct VectorToArrayConverter { - Status Visit(const arrow::NullType& type) { - auto* null_builder = checked_cast(builder); - return null_builder->AppendNulls(XLENGTH(x)); - } - - Status Visit(const arrow::BooleanType& type) { - ARROW_RETURN_IF(TYPEOF(x) != LGLSXP, Status::RError("Expecting a logical vector")); - R_xlen_t n = XLENGTH(x); - - auto* bool_builder = checked_cast(builder); - auto* p = LOGICAL(x); - - RETURN_NOT_OK(bool_builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - auto value = p[i]; - if (value == NA_LOGICAL) { - bool_builder->UnsafeAppendNull(); - } else { - bool_builder->UnsafeAppend(value == 1); - } - } - return Status::OK(); - } - - Status Visit(const arrow::Int32Type& type) { - ARROW_RETURN_IF(TYPEOF(x) != INTSXP, Status::RError("Expecting an integer vector")); - - auto* int_builder = checked_cast(builder); - - R_xlen_t n = XLENGTH(x); - const auto* data = INTEGER(x); - - RETURN_NOT_OK(int_builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - const auto value = data[i]; - if (value == NA_INTEGER) { - int_builder->UnsafeAppendNull(); - } else { - int_builder->UnsafeAppend(value); - } - } - - return Status::OK(); - } - - Status Visit(const arrow::Int64Type& type) { - ARROW_RETURN_IF(TYPEOF(x) != REALSXP, Status::RError("Expecting a numeric vector")); - ARROW_RETURN_IF(Rf_inherits(x, "integer64"), - Status::RError("Expecting a vector that inherits integer64")); - - auto* int_builder = checked_cast(builder); 
- - R_xlen_t n = XLENGTH(x); - const auto* data = (REAL(x)); - - RETURN_NOT_OK(int_builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - const auto value = arrow::util::SafeCopy(data[i]); - if (value == NA_INT64) { - int_builder->UnsafeAppendNull(); - } else { - int_builder->UnsafeAppend(value); - } - } - - return Status::OK(); - } - - Status Visit(const arrow::DoubleType& type) { - ARROW_RETURN_IF(TYPEOF(x) != REALSXP, Status::RError("Expecting a numeric vector")); - - auto* double_builder = checked_cast(builder); - - R_xlen_t n = XLENGTH(x); - const auto* data = (REAL(x)); - - RETURN_NOT_OK(double_builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - const auto value = data[i]; - if (ISNA(value)) { - double_builder->UnsafeAppendNull(); - } else { - double_builder->UnsafeAppend(value); - } - } - - return Status::OK(); - } - - Status Visit(const arrow::BinaryType& type) { - if (!(Rf_inherits(x, "vctrs_list_of") && - TYPEOF(Rf_getAttrib(x, symbols::ptype)) == RAWSXP)) { - return Status::RError("Expecting a list of raw vectors"); - } - return Status::OK(); - } - - Status Visit(const arrow::FixedSizeBinaryType& type) { - if (!(Rf_inherits(x, "vctrs_list_of") && - TYPEOF(Rf_getAttrib(x, symbols::ptype)) == RAWSXP)) { - return Status::RError("Expecting a list of raw vectors"); - } - - return Status::OK(); - } - - template - arrow::enable_if_base_binary Visit(const T& type) { - using BuilderType = typename TypeTraits::BuilderType; - - ARROW_RETURN_IF(TYPEOF(x) != STRSXP, Status::RError("Expecting a character vector")); - - auto* binary_builder = checked_cast(builder); - - R_xlen_t n = XLENGTH(x); - RETURN_NOT_OK(builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - SEXP si = STRING_ELT(x, i); - if (si == NA_STRING) { - RETURN_NOT_OK(binary_builder->AppendNull()); - continue; - } - std::string s = cpp11::r_string(si); - RETURN_NOT_OK(binary_builder->Append(s.c_str(), s.size())); - } - - return Status::OK(); - } - - template - 
arrow::enable_if_base_list Visit(const T& type) { - using BuilderType = typename TypeTraits::BuilderType; - - ARROW_RETURN_IF(TYPEOF(x) != VECSXP, Status::RError("Expecting a list vector")); - - auto* list_builder = checked_cast(builder); - auto* value_builder = list_builder->value_builder(); - auto value_type = type.value_type(); - - R_xlen_t n = XLENGTH(x); - RETURN_NOT_OK(builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - SEXP vector = VECTOR_ELT(x, i); - if (Rf_isNull(vector)) { - RETURN_NOT_OK(list_builder->AppendNull()); - continue; - } - - RETURN_NOT_OK(list_builder->Append()); - - // Recurse. - VectorToArrayConverter converter{vector, value_builder}; - Status status = arrow::VisitTypeInline(*value_type, &converter); - if (!status.ok()) { - return Status::RError("Cannot convert list element ", (i + 1), - " to an Array of type `", value_type->ToString(), - "` : ", status.message()); - } - } - - return Status::OK(); - } - - Status Visit(const FixedSizeListType& type) { - ARROW_RETURN_IF(TYPEOF(x) != VECSXP, Status::RError("Expecting a list vector")); - - auto* fixed_size_list_builder = checked_cast(builder); - auto* value_builder = fixed_size_list_builder->value_builder(); - auto value_type = type.value_type(); - int list_size = type.list_size(); - - R_xlen_t n = XLENGTH(x); - RETURN_NOT_OK(builder->Reserve(n)); - for (R_xlen_t i = 0; i < n; i++) { - SEXP vector = VECTOR_ELT(x, i); - if (Rf_isNull(vector)) { - RETURN_NOT_OK(fixed_size_list_builder->AppendNull()); - continue; - } - RETURN_NOT_OK(fixed_size_list_builder->Append()); - - auto vect_type = arrow::r::InferArrowType(vector); - if (!value_type->Equals(vect_type)) { - return Status::RError("FixedSizeList vector expecting elements vector of type ", - value_type->ToString(), " but got ", vect_type->ToString()); - } - int vector_size = vctrs::short_vec_size(vector); - if (vector_size != list_size) { - return Status::RError("FixedSizeList vector expecting elements vector of size ", - list_size, ", 
not ", vector_size); - } - - // Recurse. - VectorToArrayConverter converter{vector, value_builder}; - RETURN_NOT_OK(arrow::VisitTypeInline(*value_type, &converter)); - } - - return Status::OK(); - } - - template - arrow::enable_if_t::value, Status> Visit(const T& type) { - using BuilderType = typename TypeTraits::BuilderType; - ARROW_RETURN_IF(!Rf_inherits(x, "data.frame"), - Status::RError("Expecting a data frame")); - - auto* struct_builder = checked_cast(builder); - - int64_t n = vctrs::short_vec_size(x); - RETURN_NOT_OK(struct_builder->Reserve(n)); - RETURN_NOT_OK(struct_builder->AppendValues(n, NULLPTR)); - - int num_fields = struct_builder->num_fields(); - - // Visit each column of the data frame using the associated - // field builder - for (R_xlen_t i = 0; i < num_fields; i++) { - auto column_builder = struct_builder->field_builder(i); - SEXP x_i = VECTOR_ELT(x, i); - int64_t n_i = vctrs::short_vec_size(x_i); - if (n_i != n) { - SEXP name_i = STRING_ELT(Rf_getAttrib(x, R_NamesSymbol), i); - return Status::RError("Degenerated data frame. Column '", CHAR(name_i), - "' has size ", n_i, " instead of the number of rows: ", n); - } - - VectorToArrayConverter converter{x_i, column_builder}; - RETURN_NOT_OK(arrow::VisitTypeInline(*column_builder->type().get(), &converter)); - } - - return Status::OK(); - } - - template - arrow::enable_if_t::value, Status> Visit( - const T& type) { - // TODO: perhaps this replaces MakeFactorArrayImpl ? 
- - ARROW_RETURN_IF(!Rf_isFactor(x), Status::RError("Expecting a factor")); - int64_t n = vctrs::short_vec_size(x); - - auto* dict_builder = checked_cast(builder); - RETURN_NOT_OK(dict_builder->Reserve(n)); - - SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); - auto memo = VectorToArrayConverter::Visit(levels, utf8()); - RETURN_NOT_OK(dict_builder->InsertMemoValues(*memo)); - - int* p_values = INTEGER(x); - for (int64_t i = 0; i < n; i++, ++p_values) { - int v = *p_values; - if (v == NA_INTEGER) { - RETURN_NOT_OK(dict_builder->AppendNull()); - } else { - RETURN_NOT_OK(dict_builder->Append(CHAR(STRING_ELT(levels, v - 1)))); - } - } - - return Status::OK(); - } - - Status Visit(const arrow::DataType& type) { - return Status::NotImplemented("Converting vector to arrow type ", type.ToString(), - " not implemented"); - } - - static std::shared_ptr Visit(SEXP x, const std::shared_ptr& type) { - std::unique_ptr builder; - StopIfNotOk(MakeBuilder(gc_memory_pool(), type, &builder)); - - VectorToArrayConverter converter{x, builder.get()}; - StopIfNotOk(arrow::VisitTypeInline(*type, &converter)); - - std::shared_ptr result; - StopIfNotOk(builder->Finish(&result)); - return result; - } - - SEXP x; - arrow::ArrayBuilder* builder; -}; - -template -std::shared_ptr MakeFactorArrayImpl(cpp11::integers factor, - const std::shared_ptr& type) { - using value_type = typename arrow::TypeTraits::ArrayType::value_type; - auto n = factor.size(); - - std::shared_ptr indices_buffer = - ValueOrStop(AllocateBuffer(n * sizeof(value_type), gc_memory_pool())); - - std::vector> buffers{nullptr, indices_buffer}; - - int64_t null_count = 0; - R_xlen_t i = 0; - auto p_factor = factor.begin(); - auto p_indices = reinterpret_cast(indices_buffer->mutable_data()); - for (; i < n; i++, ++p_indices, ++p_factor) { - if (*p_factor == NA_INTEGER) break; - *p_indices = *p_factor - 1; - } - - if (i < n) { - // there are NA's so we need a null buffer - auto null_buffer = - 
ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); - internal::FirstTimeBitmapWriter null_bitmap_writer(null_buffer->mutable_data(), 0, n); - - // catch up - for (R_xlen_t j = 0; j < i; j++, null_bitmap_writer.Next()) { - null_bitmap_writer.Set(); - } - - // resume offset filling - for (; i < n; i++, ++p_indices, ++p_factor, null_bitmap_writer.Next()) { - if (*p_factor == NA_INTEGER) { - null_bitmap_writer.Clear(); - null_count++; - } else { - null_bitmap_writer.Set(); - *p_indices = *p_factor - 1; - } - } - - null_bitmap_writer.Finish(); - buffers[0] = std::move(null_buffer); - } - - auto array_indices_data = - ArrayData::Make(std::make_shared(), n, std::move(buffers), null_count, 0); - auto array_indices = MakeArray(array_indices_data); - - SEXP levels = Rf_getAttrib(factor, R_LevelsSymbol); - auto dict = VectorToArrayConverter::Visit(levels, utf8()); - - return ValueOrStop(DictionaryArray::FromArrays(type, array_indices, dict)); -} - -std::shared_ptr MakeFactorArray(cpp11::integers factor, - const std::shared_ptr& type) { - const auto& dict_type = checked_cast(*type); - switch (dict_type.index_type()->id()) { - case Type::INT8: - return MakeFactorArrayImpl(factor, type); - case Type::INT16: - return MakeFactorArrayImpl(factor, type); - case Type::INT32: - return MakeFactorArrayImpl(factor, type); - case Type::INT64: - return MakeFactorArrayImpl(factor, type); - default: - break; - } - - cpp11::stop("Cannot convert to dictionary with index_type '%s'", - dict_type.index_type()->ToString().c_str()); -} - -std::shared_ptr MakeStructArray(SEXP df, const std::shared_ptr& type) { - int n = type->num_fields(); - std::vector> children(n); - for (int i = 0; i < n; i++) { - children[i] = Array__from_vector(VECTOR_ELT(df, i), type->field(i)->type(), true); - } - - int64_t rows = n ? 
children[0]->length() : 0; - return std::make_shared(type, rows, children); -} - -template -int64_t time_cast(T value); - -template <> -inline int64_t time_cast(int value) { - return static_cast(value) * 1000; -} - -template <> -inline int64_t time_cast(double value) { - return static_cast(value * 1000); -} - -} // namespace r -} // namespace arrow - -// ---------------- new api - -namespace arrow { - -namespace internal { - -template ::value, Target>::type = 0> -Status int_cast(T x, Target* out) { - if (static_cast(x) < std::numeric_limits::min() || - static_cast(x) > std::numeric_limits::max()) { - return Status::Invalid("Value is too large to fit in C integer type"); - } - *out = static_cast(x); - return Status::OK(); -} - -template -struct usigned_type; - -template ::value, Target>::type = 0> -Status int_cast(T x, Target* out) { - // we need to compare between unsigned integers - uint64_t x64 = x; - if (x64 < 0 || x64 > std::numeric_limits::max()) { - return Status::Invalid("Value is too large to fit in C integer type"); - } - *out = static_cast(x); - return Status::OK(); -} - -template -Status double_cast(Int x, double* out) { - *out = static_cast(x); - return Status::OK(); -} - -template <> -Status double_cast(int64_t x, double* out) { - constexpr int64_t kDoubleMax = 1LL << 53; - constexpr int64_t kDoubleMin = -(1LL << 53); - - if (x < kDoubleMin || x > kDoubleMax) { - return Status::Invalid("integer value ", x, " is outside of the range exactly", - " representable by a IEEE 754 double precision value"); - } - *out = static_cast(x); - return Status::OK(); -} - -// used for int and int64_t -template -Status float_cast(T x, float* out) { - constexpr int64_t kHalfFloatMax = 1LL << 24; - constexpr int64_t kHalfFloatMin = -(1LL << 24); - - int64_t x64 = static_cast(x); - if (x64 < kHalfFloatMin || x64 > kHalfFloatMax) { - return Status::Invalid("integer value ", x, " is outside of the range exactly", - " representable by a IEEE 754 half precision value"); - } - - 
*out = static_cast(x); - return Status::OK(); -} - -template <> -Status float_cast(double x, float* out) { - // TODO: is there some sort of floating point overflow ? - *out = static_cast(x); - return Status::OK(); -} - -} // namespace internal - -namespace r { - -class VectorConverter; - -Status GetConverter(const std::shared_ptr& type, - std::unique_ptr* out); - -class VectorConverter { - public: - virtual ~VectorConverter() = default; - - virtual Status Init(ArrayBuilder* builder) = 0; - - virtual Status Ingest(SEXP obj) = 0; - - virtual Status GetResult(std::shared_ptr* result) { - return builder_->Finish(result); - } - - ArrayBuilder* builder() const { return builder_; } - - protected: - ArrayBuilder* builder_; -}; - -class NullVectorConverter : public VectorConverter { - public: - using BuilderType = NullBuilder; - - ~NullVectorConverter() {} - - Status Init(ArrayBuilder* builder) override { - builder_ = builder; - typed_builder_ = checked_cast(builder_); - return Status::OK(); - } - - Status Ingest(SEXP obj) override { - RETURN_NOT_OK(typed_builder_->AppendNulls(XLENGTH(obj))); - return Status::OK(); - } - - protected: - BuilderType* typed_builder_; -}; - -template -struct Unbox {}; - -// unboxer for int type -template -struct Unbox> { - using BuilderType = typename TypeTraits::BuilderType; - using ArrayType = typename TypeTraits::ArrayType; - using CType = typename ArrayType::value_type; - - static inline Status Ingest(BuilderType* builder, SEXP obj) { - switch (TYPEOF(obj)) { - case INTSXP: - return IngestRange(builder, INTEGER(obj), XLENGTH(obj)); - case REALSXP: - if (Rf_inherits(obj, "integer64")) { - return IngestRange(builder, reinterpret_cast(REAL(obj)), - XLENGTH(obj)); - } - return IngestRange(builder, REAL(obj), XLENGTH(obj)); - - // TODO: handle raw and logical - default: - break; - } - - return Status::Invalid("Cannot convert R vector of type <", Rf_type2char(TYPEOF(obj)), - "> to integer Arrow array"); - } - - template - static inline Status 
IngestRange(BuilderType* builder, T* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (is_na(*p)) { - builder->UnsafeAppendNull(); - } else { - CType value = 0; - RETURN_NOT_OK(internal::int_cast(*p, &value)); - builder->UnsafeAppend(value); - } - } - return Status::OK(); - } -}; - -template <> -struct Unbox { - static inline Status Ingest(DoubleBuilder* builder, SEXP obj) { - switch (TYPEOF(obj)) { - // TODO: handle RAW - case INTSXP: - return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER); - case REALSXP: - if (Rf_inherits(obj, "integer64")) { - return IngestIntRange(builder, reinterpret_cast(REAL(obj)), - XLENGTH(obj), NA_INT64); - } - return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); - } - return Status::Invalid("Cannot convert R object to double type"); - } - - template - static inline Status IngestIntRange(DoubleBuilder* builder, T* p, R_xlen_t n, T na) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (*p == NA_INTEGER) { - builder->UnsafeAppendNull(); - } else { - double value = 0; - RETURN_NOT_OK(internal::double_cast(*p, &value)); - builder->UnsafeAppend(value); - } - } - return Status::OK(); - } - - static inline Status IngestDoubleRange(DoubleBuilder* builder, double* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (ISNA(*p)) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(*p); - } - } - return Status::OK(); - } -}; - -template <> -struct Unbox { - static inline Status Ingest(FloatBuilder* builder, SEXP obj) { - switch (TYPEOF(obj)) { - // TODO: handle RAW - case INTSXP: - return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj), NA_INTEGER); - case REALSXP: - if (Rf_inherits(obj, "integer64")) { - return IngestIntRange(builder, reinterpret_cast(REAL(obj)), - XLENGTH(obj), NA_INT64); - } - return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); - } - return 
Status::Invalid("Cannot convert R object to double type"); - } - - template - static inline Status IngestIntRange(FloatBuilder* builder, T* p, R_xlen_t n, T na) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (*p == NA_INTEGER) { - builder->UnsafeAppendNull(); - } else { - float value = 0; - RETURN_NOT_OK(internal::float_cast(*p, &value)); - builder->UnsafeAppend(value); - } - } - return Status::OK(); - } - - static inline Status IngestDoubleRange(FloatBuilder* builder, double* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (ISNA(*p)) { - builder->UnsafeAppendNull(); - } else { - float value; - RETURN_NOT_OK(internal::float_cast(*p, &value)); - builder->UnsafeAppend(value); - } - } - return Status::OK(); - } -}; - -template <> -struct Unbox { - static inline Status Ingest(BooleanBuilder* builder, SEXP obj) { - switch (TYPEOF(obj)) { - case LGLSXP: { - R_xlen_t n = XLENGTH(obj); - RETURN_NOT_OK(builder->Resize(n)); - int* p = LOGICAL(obj); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (*p == NA_LOGICAL) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(*p == 1); - } - } - return Status::OK(); - } - - default: - break; - } - - // TODO: include more information about the R object and the target type - return Status::Invalid("Cannot convert R object to boolean type"); - } -}; - -template <> -struct Unbox { - static inline Status Ingest(Date32Builder* builder, SEXP obj) { - switch (TYPEOF(obj)) { - case INTSXP: - if (Rf_inherits(obj, "Date")) { - return IngestIntRange(builder, INTEGER(obj), XLENGTH(obj)); - } - break; - case REALSXP: - if (Rf_inherits(obj, "Date")) { - return IngestDoubleRange(builder, REAL(obj), XLENGTH(obj)); - } - break; - default: - break; - } - return Status::Invalid("Cannot convert R object to date32 type"); - } - - static inline Status IngestIntRange(Date32Builder* builder, int* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for 
(R_xlen_t i = 0; i < n; i++, ++p) { - if (*p == NA_INTEGER) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(*p); - } - } - return Status::OK(); - } - - static inline Status IngestDoubleRange(Date32Builder* builder, double* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (ISNA(*p)) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(static_cast(*p)); - } - } - return Status::OK(); - } -}; - -template <> -struct Unbox { - constexpr static int64_t kMillisecondsPerDay = 86400000; - - static inline Status Ingest(Date64Builder* builder, SEXP obj) { - switch (TYPEOF(obj)) { - case INTSXP: - // number of days since epoch - if (Rf_inherits(obj, "Date")) { - return IngestDateInt32Range(builder, INTEGER(obj), XLENGTH(obj)); - } - break; - - case REALSXP: - // (fractional number of days since epoch) - if (Rf_inherits(obj, "Date")) { - return IngestDateDoubleRange(builder, REAL(obj), - XLENGTH(obj)); - } - - // number of seconds since epoch - if (Rf_inherits(obj, "POSIXct")) { - return IngestDateDoubleRange<1000>(builder, REAL(obj), XLENGTH(obj)); - } - } - return Status::Invalid("Cannot convert R object to date64 type"); - } - - // ingest a integer vector that represents number of days since epoch - static inline Status IngestDateInt32Range(Date64Builder* builder, int* p, R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (*p == NA_INTEGER) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(*p * kMillisecondsPerDay); - } - } - return Status::OK(); - } - - // ingest a numeric vector that represents (fractional) number of days since epoch - template - static inline Status IngestDateDoubleRange(Date64Builder* builder, double* p, - R_xlen_t n) { - RETURN_NOT_OK(builder->Resize(n)); - - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (ISNA(*p)) { - builder->UnsafeAppendNull(); - } else { - builder->UnsafeAppend(static_cast(*p * 
MULTIPLIER)); - } - } - return Status::OK(); - } -}; - -template -class TypedVectorConverter : public VectorConverter { - public: - using BuilderType = typename TypeTraits::BuilderType; - - Status Init(ArrayBuilder* builder) override { - builder_ = builder; - typed_builder_ = checked_cast(builder_); - return Status::OK(); - } - - Status Ingest(SEXP obj) override { return Unbox::Ingest(typed_builder_, obj); } - - protected: - BuilderType* typed_builder_; -}; - -template -class NumericVectorConverter - : public TypedVectorConverter> {}; - -class BooleanVectorConverter - : public TypedVectorConverter {}; - -class Date32Converter : public TypedVectorConverter {}; -class Date64Converter : public TypedVectorConverter {}; - -inline int64_t get_time_multiplier(TimeUnit::type unit) { - switch (unit) { - case TimeUnit::SECOND: - return 1; - case TimeUnit::MILLI: - return 1000; - case TimeUnit::MICRO: - return 1000000; - case TimeUnit::NANO: - return 1000000000; - default: - return 0; - } -} - -template -class TimeConverter : public VectorConverter { - using BuilderType = typename TypeTraits::BuilderType; - - public: - explicit TimeConverter(TimeUnit::type unit) - : unit_(unit), multiplier_(get_time_multiplier(unit)) {} - - Status Init(ArrayBuilder* builder) override { - builder_ = builder; - typed_builder_ = checked_cast(builder); - return Status::OK(); - } - - Status Ingest(SEXP obj) override { - if (valid_R_object(obj)) { - int difftime_multiplier; - RETURN_NOT_OK(GetDifftimeMultiplier(obj, &difftime_multiplier)); - return Ingest_POSIXct(REAL(obj), XLENGTH(obj), difftime_multiplier); - } - - return Status::Invalid("Cannot convert R object to timestamp type"); - } - - protected: - TimeUnit::type unit_; - BuilderType* typed_builder_; - int64_t multiplier_; - - Status Ingest_POSIXct(double* p, R_xlen_t n, int difftime_multiplier) { - RETURN_NOT_OK(typed_builder_->Resize(n)); - - for (R_xlen_t i = 0; i < n; i++, ++p) { - if (ISNA(*p)) { - typed_builder_->UnsafeAppendNull(); - 
} else { - typed_builder_->UnsafeAppend( - static_cast(*p * multiplier_ * difftime_multiplier)); - } - } - return Status::OK(); - } - - virtual bool valid_R_object(SEXP obj) = 0; - - // only used for Time32 and Time64 - virtual Status GetDifftimeMultiplier(SEXP obj, int* res) { - std::string unit(CHAR(STRING_ELT(Rf_getAttrib(obj, symbols::units), 0))); - if (unit == "secs") { - *res = 1; - } else if (unit == "mins") { - *res = 60; - } else if (unit == "hours") { - *res = 3600; - } else if (unit == "days") { - *res = 86400; - } else if (unit == "weeks") { - *res = 604800; - } else { - return Status::Invalid("unknown difftime unit"); - } - return Status::OK(); - } -}; - -class TimestampConverter : public TimeConverter { - public: - explicit TimestampConverter(TimeUnit::type unit) : TimeConverter(unit) {} - - protected: - bool valid_R_object(SEXP obj) override { - return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "POSIXct"); - } - - Status GetDifftimeMultiplier(SEXP obj, int* res) override { - *res = 1; - return Status::OK(); - } -}; - -class Time32Converter : public TimeConverter { - public: - explicit Time32Converter(TimeUnit::type unit) : TimeConverter(unit) {} - - protected: - bool valid_R_object(SEXP obj) override { - return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); - } -}; - -class Time64Converter : public TimeConverter { - public: - explicit Time64Converter(TimeUnit::type unit) : TimeConverter(unit) {} - - protected: - bool valid_R_object(SEXP obj) override { - return TYPEOF(obj) == REALSXP && Rf_inherits(obj, "difftime"); - } -}; - -template -class BinaryVectorConverter : public VectorConverter { - public: - ~BinaryVectorConverter() {} - - Status Init(ArrayBuilder* builder) { - typed_builder_ = checked_cast(builder); - return Status::OK(); - } - - Status Ingest(SEXP obj) { - ARROW_RETURN_IF(TYPEOF(obj) != VECSXP, Status::RError("Expecting a list")); - R_xlen_t n = XLENGTH(obj); - - // Reserve enough space before appending - int64_t size = 0; - 
for (R_xlen_t i = 0; i < n; i++) { - SEXP obj_i = VECTOR_ELT(obj, i); - if (!Rf_isNull(obj_i)) { - ARROW_RETURN_IF(TYPEOF(obj_i) != RAWSXP, - Status::RError("Expecting a raw vector")); - size += XLENGTH(obj_i); - } - } - RETURN_NOT_OK(typed_builder_->Reserve(size)); - - // append - for (R_xlen_t i = 0; i < n; i++) { - SEXP obj_i = VECTOR_ELT(obj, i); - if (Rf_isNull(obj_i)) { - RETURN_NOT_OK(typed_builder_->AppendNull()); - } else { - RETURN_NOT_OK(typed_builder_->Append(RAW(obj_i), XLENGTH(obj_i))); - } - } - return Status::OK(); - } - - Status GetResult(std::shared_ptr* result) { - return typed_builder_->Finish(result); - } - - private: - Builder* typed_builder_; -}; - -class FixedSizeBinaryVectorConverter : public VectorConverter { - public: - ~FixedSizeBinaryVectorConverter() {} - - Status Init(ArrayBuilder* builder) { - typed_builder_ = checked_cast(builder); - return Status::OK(); - } - - Status Ingest(SEXP obj) { - ARROW_RETURN_IF(TYPEOF(obj) != VECSXP, Status::RError("Expecting a list")); - R_xlen_t n = XLENGTH(obj); - - // Reserve enough space before appending - int32_t byte_width = typed_builder_->byte_width(); - for (R_xlen_t i = 0; i < n; i++) { - SEXP obj_i = VECTOR_ELT(obj, i); - if (!Rf_isNull(obj_i)) { - ARROW_RETURN_IF(TYPEOF(obj_i) != RAWSXP, - Status::RError("Expecting a raw vector")); - ARROW_RETURN_IF(XLENGTH(obj_i) != byte_width, - Status::RError("Expecting a raw vector of ", byte_width, - " bytes, not ", XLENGTH(obj_i))); - } - } - RETURN_NOT_OK(typed_builder_->Reserve(n * byte_width)); - - // append - for (R_xlen_t i = 0; i < n; i++) { - SEXP obj_i = VECTOR_ELT(obj, i); - if (Rf_isNull(obj_i)) { - RETURN_NOT_OK(typed_builder_->AppendNull()); - } else { - RETURN_NOT_OK(typed_builder_->Append(RAW(obj_i))); - } - } - return Status::OK(); - } - - Status GetResult(std::shared_ptr* result) { - return typed_builder_->Finish(result); - } - - private: - FixedSizeBinaryBuilder* typed_builder_; -}; - -template -class StringVectorConverter : public 
VectorConverter { - public: - ~StringVectorConverter() {} - - Status Init(ArrayBuilder* builder) { - typed_builder_ = checked_cast(builder); - return Status::OK(); - } - - Status Ingest(SEXP obj) { - ARROW_RETURN_IF(TYPEOF(obj) != STRSXP, - Status::RError("Expecting a character vector")); - - cpp11::strings s(arrow::r::utf8_strings(obj)); - RETURN_NOT_OK(typed_builder_->Reserve(s.size())); - - // we know all the R strings are utf8 already, so we can get - // a definite size and then use UnsafeAppend*() - int64_t total_length = 0; - for (cpp11::r_string si : s) { - total_length += cpp11::is_na(si) ? 0 : si.size(); - } - RETURN_NOT_OK(typed_builder_->ReserveData(total_length)); - - // append - for (cpp11::r_string si : s) { - if (si == NA_STRING) { - typed_builder_->UnsafeAppendNull(); - } else { - typed_builder_->UnsafeAppend(CHAR(si), si.size()); - } - } - - return Status::OK(); - } - - Status GetResult(std::shared_ptr* result) { - return typed_builder_->Finish(result); - } - - private: - StringBuilder* typed_builder_; -}; - -#define NUMERIC_CONVERTER(TYPE_ENUM, TYPE) \ - case Type::TYPE_ENUM: \ - *out = \ - std::unique_ptr>(new NumericVectorConverter); \ - return Status::OK() - -#define SIMPLE_CONVERTER_CASE(TYPE_ENUM, TYPE) \ - case Type::TYPE_ENUM: \ - *out = std::unique_ptr(new TYPE); \ - return Status::OK() - -#define TIME_CONVERTER_CASE(TYPE_ENUM, DATA_TYPE, TYPE) \ - case Type::TYPE_ENUM: \ - *out = \ - std::unique_ptr(new TYPE(checked_cast(type.get())->unit())); \ - return Status::OK() - -Status GetConverter(const std::shared_ptr& type, - std::unique_ptr* out) { - switch (type->id()) { - SIMPLE_CONVERTER_CASE(BINARY, BinaryVectorConverter); - SIMPLE_CONVERTER_CASE(LARGE_BINARY, BinaryVectorConverter); - SIMPLE_CONVERTER_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryVectorConverter); - SIMPLE_CONVERTER_CASE(BOOL, BooleanVectorConverter); - SIMPLE_CONVERTER_CASE(STRING, StringVectorConverter); - SIMPLE_CONVERTER_CASE(LARGE_STRING, StringVectorConverter); - 
NUMERIC_CONVERTER(INT8, Int8Type); - NUMERIC_CONVERTER(INT16, Int16Type); - NUMERIC_CONVERTER(INT32, Int32Type); - NUMERIC_CONVERTER(INT64, Int64Type); - NUMERIC_CONVERTER(UINT8, UInt8Type); - NUMERIC_CONVERTER(UINT16, UInt16Type); - NUMERIC_CONVERTER(UINT32, UInt32Type); - NUMERIC_CONVERTER(UINT64, UInt64Type); - - // TODO: not sure how to handle half floats - // the python code uses npy_half - // NUMERIC_CONVERTER(HALF_FLOAT, HalfFloatType); - NUMERIC_CONVERTER(FLOAT, FloatType); - NUMERIC_CONVERTER(DOUBLE, DoubleType); - - SIMPLE_CONVERTER_CASE(DATE32, Date32Converter); - SIMPLE_CONVERTER_CASE(DATE64, Date64Converter); - - // TODO: probably after we merge ARROW-3628 - // case Type::DECIMAL: - - TIME_CONVERTER_CASE(TIME32, Time32Type, Time32Converter); - TIME_CONVERTER_CASE(TIME64, Time64Type, Time64Converter); - TIME_CONVERTER_CASE(TIMESTAMP, TimestampType, TimestampConverter); - - case Type::NA: - *out = std::unique_ptr(new NullVectorConverter); - return Status::OK(); - - default: - break; - } - return Status::NotImplemented("type not implemented"); -} - -static inline std::shared_ptr IndexTypeForFactors(int n_factors) { - if (n_factors < INT8_MAX) { - return arrow::int8(); - } else if (n_factors < INT16_MAX) { - return arrow::int16(); - } else { - return arrow::int32(); - } -} - -std::shared_ptr InferArrowTypeFromFactor(SEXP factor) { - SEXP factors = Rf_getAttrib(factor, R_LevelsSymbol); - auto index_type = IndexTypeForFactors(Rf_length(factors)); - bool is_ordered = Rf_inherits(factor, "ordered"); - return dictionary(index_type, arrow::utf8(), is_ordered); -} - -template -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - cpp11::stop("Unknown vector type: ", VectorType); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - if (Rf_inherits(x, "Array")) { - return cpp11::as_cpp>(x)->type(); - } - - cpp11::stop("Unrecognized vector instance for type ENVSXP"); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - return 
Rf_inherits(x, "vctrs_unspecified") ? null() : boolean(); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - if (Rf_isFactor(x)) { - return InferArrowTypeFromFactor(x); - } else if (Rf_inherits(x, "Date")) { - return date32(); - } else if (Rf_inherits(x, "POSIXct")) { - auto tzone_sexp = Rf_getAttrib(x, symbols::tzone); - if (Rf_isNull(tzone_sexp)) { - return timestamp(TimeUnit::MICRO); - } else { - return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0))); - } - } - return int32(); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - if (Rf_inherits(x, "Date")) { - return date32(); - } - if (Rf_inherits(x, "POSIXct")) { - auto tzone_sexp = Rf_getAttrib(x, symbols::tzone); - if (Rf_isNull(tzone_sexp)) { - return timestamp(TimeUnit::MICRO); - } else { - return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0))); - } - } - if (Rf_inherits(x, "integer64")) { - return int64(); - } - if (Rf_inherits(x, "difftime")) { - return time32(TimeUnit::SECOND); - } - return float64(); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - return cpp11::unwind_protect([&] { - R_xlen_t n = XLENGTH(x); - - int64_t size = 0; - - for (R_xlen_t i = 0; i < n; i++) { - size += arrow::r::unsafe::r_string_size(STRING_ELT(x, i)); - if (size > arrow::kBinaryMemoryLimit) { - // Exceeds 2GB capacity of utf8 type, so use large - return large_utf8(); - } - } - - return utf8(); - }); -} - -static inline std::shared_ptr InferArrowTypeFromDataFrame( - cpp11::list x) { - R_xlen_t n = x.size(); - cpp11::strings names(x.attr(R_NamesSymbol)); - std::vector> fields(n); - for (R_xlen_t i = 0; i < n; i++) { - fields[i] = arrow::field(names[i], InferArrowType(x[i])); - } - return arrow::struct_(std::move(fields)); -} - -template <> -std::shared_ptr InferArrowTypeFromVector(SEXP x) { - if (Rf_inherits(x, "data.frame") || Rf_inherits(x, "POSIXlt")) { - return InferArrowTypeFromDataFrame(x); - } else { - // some known special cases - if 
(Rf_inherits(x, "arrow_fixed_size_binary")) { - SEXP byte_width = Rf_getAttrib(x, symbols::byte_width); - if (Rf_isNull(byte_width) || TYPEOF(byte_width) != INTSXP || - XLENGTH(byte_width) != 1) { - cpp11::stop("malformed arrow_fixed_size_binary object"); - } - return arrow::fixed_size_binary(INTEGER(byte_width)[0]); - } - - if (Rf_inherits(x, "arrow_binary")) { - return arrow::binary(); - } - - if (Rf_inherits(x, "arrow_large_binary")) { - return arrow::large_binary(); - } - - SEXP ptype = Rf_getAttrib(x, symbols::ptype); - if (Rf_isNull(ptype)) { - if (XLENGTH(x) == 0) { - cpp11::stop( - "Requires at least one element to infer the values' type of a list vector"); - } - - ptype = VECTOR_ELT(x, 0); - } - - return arrow::list(InferArrowType(ptype)); - } -} - -std::shared_ptr InferArrowType(SEXP x) { - switch (TYPEOF(x)) { - case ENVSXP: - return InferArrowTypeFromVector(x); - case LGLSXP: - return InferArrowTypeFromVector(x); - case INTSXP: - return InferArrowTypeFromVector(x); - case REALSXP: - return InferArrowTypeFromVector(x); - case RAWSXP: - return int8(); - case STRSXP: - return InferArrowTypeFromVector(x); - case VECSXP: - return InferArrowTypeFromVector(x); - default: - break; - } - - cpp11::stop("Cannot infer type from vector"); -} - -// in some situations we can just use the memory of the R object in an RBuffer -// instead of going through ArrayBuilder, etc ... 
-bool can_reuse_memory(SEXP x, const std::shared_ptr& type) { - switch (type->id()) { - case Type::INT32: - return TYPEOF(x) == INTSXP && !OBJECT(x); - case Type::DOUBLE: - return TYPEOF(x) == REALSXP && !OBJECT(x); - case Type::INT8: - return TYPEOF(x) == RAWSXP && !OBJECT(x); - case Type::INT64: - return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64"); - default: - break; - } - return false; -} - -// this is only used on some special cases when the arrow Array can just use the memory of -// the R object, via an RBuffer, hence be zero copy -template -std::shared_ptr MakeSimpleArray(SEXP x) { - using value_type = typename arrow::TypeTraits::ArrayType::value_type; - RVector vec(x); - auto n = vec.size(); - auto p_vec_start = reinterpret_cast(DATAPTR(vec)); - auto p_vec_end = p_vec_start + n; - std::vector> buffers{nullptr, - std::make_shared>(vec)}; - - int null_count = 0; - - auto first_na = std::find_if(p_vec_start, p_vec_end, is_na); - if (first_na < p_vec_end) { - auto null_bitmap = - ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); - internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); - - // first loop to clear all the bits before the first NA - auto j = std::distance(p_vec_start, first_na); - int i = 0; - for (; i < j; i++, bitmap_writer.Next()) { - bitmap_writer.Set(); - } - - auto p_vec = first_na; - // then finish - for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { - if (is_na(*p_vec)) { - bitmap_writer.Clear(); - null_count++; - } else { - bitmap_writer.Set(); - } - } - - bitmap_writer.Finish(); - buffers[0] = std::move(null_bitmap); - } - - auto data = ArrayData::Make(std::make_shared(), LENGTH(x), std::move(buffers), - null_count, 0 /*offset*/); - - // return the right Array class - return std::make_shared::ArrayType>(data); -} - -std::shared_ptr Array__from_vector_reuse_memory(SEXP x) { - auto type = TYPEOF(x); - - if (type == INTSXP) { - return MakeSimpleArray(x); - } else if (type == REALSXP 
&& Rf_inherits(x, "integer64")) { - return MakeSimpleArray(x); - } else if (type == REALSXP) { - return MakeSimpleArray(x); - } else if (type == RAWSXP) { - return MakeSimpleArray(x); - } - - cpp11::stop("Unreachable: you might need to fix can_reuse_memory()"); -} - -bool CheckCompatibleFactor(SEXP obj, const std::shared_ptr& type) { - if (!Rf_inherits(obj, "factor")) { - return false; - } - - const auto& dict_type = checked_cast(*type); - return dict_type.value_type()->Equals(utf8()); -} - -arrow::Status CheckCompatibleStruct(SEXP obj, - const std::shared_ptr& type) { - if (!Rf_inherits(obj, "data.frame")) { - return Status::RError("Conversion to struct arrays requires a data.frame"); - } - - // check the number of columns - int num_fields = type->num_fields(); - if (XLENGTH(obj) != num_fields) { - return Status::RError("Number of fields in struct (", num_fields, - ") incompatible with number of columns in the data frame (", - XLENGTH(obj), ")"); - } - - // check the names of each column - // - // the columns themselves are not checked against the - // types of the fields, because Array__from_vector will error - // when not compatible. - cpp11::strings names = Rf_getAttrib(obj, R_NamesSymbol); - - return cpp11::unwind_protect([&] { - for (int i = 0; i < num_fields; i++) { - const char* name_i = arrow::r::unsafe::utf8_string(names[i]); - auto field_name = type->field(i)->name(); - if (field_name != name_i) { - return Status::RError( - "Field name in position ", i, " (", field_name, - ") does not match the name of the column of the data frame (", name_i, ")"); - } - } - - return Status::OK(); - }); -} - -std::shared_ptr Array__from_vector( - SEXP x, const std::shared_ptr& type, bool type_inferred) { - // short circuit if `x` is already an Array - if (Rf_inherits(x, "Array")) { - return cpp11::as_cpp>(x); - } - - // special case when we can just use the data from the R vector - // directly. 
This still needs to handle the null bitmap - if (arrow::r::can_reuse_memory(x, type)) { - return arrow::r::Array__from_vector_reuse_memory(x); - } - - // factors only when type has been inferred - if (type->id() == Type::DICTIONARY) { - if (type_inferred || arrow::r::CheckCompatibleFactor(x, type)) { - // TODO: use VectorToArrayConverter instead, but it does not appear to work - // correctly with ordered dictionary yet - // - // return VectorToArrayConverter::Visit(x, type); - return arrow::r::MakeFactorArray(x, type); - } - - cpp11::stop("Object incompatible with dictionary type"); - } - - if (type->id() == Type::LIST || type->id() == Type::LARGE_LIST || - type->id() == Type::FIXED_SIZE_LIST) { - return VectorToArrayConverter::Visit(x, type); - } - - // struct types - if (type->id() == Type::STRUCT) { - if (!type_inferred) { - StopIfNotOk(arrow::r::CheckCompatibleStruct(x, type)); - } - // TODO: when the type has been infered, we could go through - // VectorToArrayConverter: - // - // else { - // return VectorToArrayConverter::Visit(df, type); - // } - - return arrow::r::MakeStructArray(x, type); - } - - // general conversion with converter and builder - std::unique_ptr converter; - StopIfNotOk(arrow::r::GetConverter(type, &converter)); - - // Create ArrayBuilder for type - std::unique_ptr type_builder; - StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder)); - StopIfNotOk(converter->Init(type_builder.get())); - - // ingest R data and grab the result array - StopIfNotOk(converter->Ingest(x)); - std::shared_ptr result; - StopIfNotOk(converter->GetResult(&result)); - return result; -} - -} // namespace r -} // namespace arrow - -// [[arrow::export]] -std::shared_ptr Array__infer_type(SEXP x) { - return arrow::r::InferArrowType(x); -} - -// [[arrow::export]] -std::shared_ptr Array__from_vector(SEXP x, SEXP s_type) { - // the type might be NULL, in which case we need to infer it from the data - // we keep track of whether it was inferred or supplied - 
bool type_inferred = Rf_isNull(s_type); - std::shared_ptr type; - if (type_inferred) { - type = arrow::r::InferArrowType(x); - } else { - type = cpp11::as_cpp>(s_type); - } - - return arrow::r::Array__from_vector(x, type, type_inferred); -} - -// [[arrow::export]] -std::shared_ptr ChunkedArray__from_list(cpp11::list chunks, - SEXP s_type) { - std::vector> vec; - - // the type might be NULL, in which case we need to infer it from the data - // we keep track of whether it was inferred or supplied - bool type_inferred = Rf_isNull(s_type); - R_xlen_t n = XLENGTH(chunks); - - std::shared_ptr type; - if (type_inferred) { - if (n == 0) { - cpp11::stop("type must be specified for empty list"); - } - type = arrow::r::InferArrowType(VECTOR_ELT(chunks, 0)); - } else { - type = cpp11::as_cpp>(s_type); - } - - if (n == 0) { - std::shared_ptr array; - std::unique_ptr type_builder; - StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder)); - StopIfNotOk(type_builder->Finish(&array)); - vec.push_back(array); - } else { - // the first - might differ from the rest of the loop - // because we might have inferred the type from the first element of the list - // - // this only really matters for dictionary arrays - vec.push_back(arrow::r::Array__from_vector(chunks[0], type, type_inferred)); - - for (R_xlen_t i = 1; i < n; i++) { - vec.push_back(arrow::r::Array__from_vector(chunks[i], type, false)); - } - } - - return std::make_shared(std::move(vec)); -} - -// [[arrow::export]] -std::shared_ptr DictionaryArray__FromArrays( - const std::shared_ptr& type, - const std::shared_ptr& indices, - const std::shared_ptr& dict) { - return ValueOrStop(arrow::DictionaryArray::FromArrays(type, indices, dict)); -} - -#endif diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index c9a1f6cf9ee..ddcb7494697 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -18,8 +18,6 @@ #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) -#include - #include 
#include #include @@ -30,6 +28,8 @@ #include #include +#include + namespace arrow { using internal::checked_cast; diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index ae90abd5adf..732004067dd 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -289,42 +289,6 @@ BEGIN_CPP11 return cpp11::as_sexp(LargeListArray__raw_value_offsets(array)); END_CPP11 } -// array_from_vector.cpp -std::shared_ptr Array__infer_type(SEXP x); -extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ -BEGIN_CPP11 - arrow::r::Input::type x(x_sexp); - return cpp11::as_sexp(Array__infer_type(x)); -END_CPP11 -} -// array_from_vector.cpp -std::shared_ptr Array__from_vector(SEXP x, SEXP s_type); -extern "C" SEXP _arrow_Array__from_vector(SEXP x_sexp, SEXP s_type_sexp){ -BEGIN_CPP11 - arrow::r::Input::type x(x_sexp); - arrow::r::Input::type s_type(s_type_sexp); - return cpp11::as_sexp(Array__from_vector(x, s_type)); -END_CPP11 -} -// array_from_vector.cpp -std::shared_ptr ChunkedArray__from_list(cpp11::list chunks, SEXP s_type); -extern "C" SEXP _arrow_ChunkedArray__from_list(SEXP chunks_sexp, SEXP s_type_sexp){ -BEGIN_CPP11 - arrow::r::Input::type chunks(chunks_sexp); - arrow::r::Input::type s_type(s_type_sexp); - return cpp11::as_sexp(ChunkedArray__from_list(chunks, s_type)); -END_CPP11 -} -// array_from_vector.cpp -std::shared_ptr DictionaryArray__FromArrays(const std::shared_ptr& type, const std::shared_ptr& indices, const std::shared_ptr& dict); -extern "C" SEXP _arrow_DictionaryArray__FromArrays(SEXP type_sexp, SEXP indices_sexp, SEXP dict_sexp){ -BEGIN_CPP11 - arrow::r::Input&>::type type(type_sexp); - arrow::r::Input&>::type indices(indices_sexp); - arrow::r::Input&>::type dict(dict_sexp); - return cpp11::as_sexp(DictionaryArray__FromArrays(type, indices, dict)); -END_CPP11 -} // array_to_vector.cpp SEXP Array__as_vector(const std::shared_ptr& array); extern "C" SEXP _arrow_Array__as_vector(SEXP array_sexp){ @@ -560,6 +524,15 @@ BEGIN_CPP11 return 
cpp11::as_sexp(ChunkedArray__ToString(x)); END_CPP11 } +// chunkedarray.cpp +std::shared_ptr ChunkedArray__from_list(cpp11::list chunks, SEXP s_type); +extern "C" SEXP _arrow_ChunkedArray__from_list(SEXP chunks_sexp, SEXP s_type_sexp){ +BEGIN_CPP11 + arrow::r::Input::type chunks(chunks_sexp); + arrow::r::Input::type s_type(s_type_sexp); + return cpp11::as_sexp(ChunkedArray__from_list(chunks, s_type)); +END_CPP11 +} // compression.cpp std::shared_ptr util___Codec__Create(arrow::Compression::type codec, R_xlen_t compression_level); extern "C" SEXP _arrow_util___Codec__Create(SEXP codec_sexp, SEXP compression_level_sexp){ @@ -2764,6 +2737,25 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// r_to_arrow.cpp +SEXP vec_to_arrow(SEXP x, SEXP s_type); +extern "C" SEXP _arrow_vec_to_arrow(SEXP x_sexp, SEXP s_type_sexp){ +BEGIN_CPP11 + arrow::r::Input::type x(x_sexp); + arrow::r::Input::type s_type(s_type_sexp); + return cpp11::as_sexp(vec_to_arrow(x, s_type)); +END_CPP11 +} +// r_to_arrow.cpp +std::shared_ptr DictionaryArray__FromArrays(const std::shared_ptr& type, const std::shared_ptr& indices, const std::shared_ptr& dict); +extern "C" SEXP _arrow_DictionaryArray__FromArrays(SEXP type_sexp, SEXP indices_sexp, SEXP dict_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type type(type_sexp); + arrow::r::Input&>::type indices(indices_sexp); + arrow::r::Input&>::type dict(dict_sexp); + return cpp11::as_sexp(DictionaryArray__FromArrays(type, indices, dict)); +END_CPP11 +} // recordbatch.cpp int RecordBatch__num_columns(const std::shared_ptr& x); extern "C" SEXP _arrow_RecordBatch__num_columns(SEXP x_sexp){ @@ -3464,6 +3456,14 @@ BEGIN_CPP11 return R_NilValue; END_CPP11 } +// type_infer.cpp +std::shared_ptr Array__infer_type(SEXP x); +extern "C" SEXP _arrow_Array__infer_type(SEXP x_sexp){ +BEGIN_CPP11 + arrow::r::Input::type x(x_sexp); + return cpp11::as_sexp(Array__infer_type(x)); +END_CPP11 +} extern "C" SEXP _arrow_Table__Reset(SEXP r6) { BEGIN_CPP11 arrow::r::r6_reset_pointer(r6); @@ 
-3533,10 +3533,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_FixedSizeListArray__value_offset", (DL_FUNC) &_arrow_FixedSizeListArray__value_offset, 2}, { "_arrow_ListArray__raw_value_offsets", (DL_FUNC) &_arrow_ListArray__raw_value_offsets, 1}, { "_arrow_LargeListArray__raw_value_offsets", (DL_FUNC) &_arrow_LargeListArray__raw_value_offsets, 1}, - { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, - { "_arrow_Array__from_vector", (DL_FUNC) &_arrow_Array__from_vector, 2}, - { "_arrow_ChunkedArray__from_list", (DL_FUNC) &_arrow_ChunkedArray__from_list, 2}, - { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { "_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1}, { "_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 1}, { "_arrow_RecordBatch__to_dataframe", (DL_FUNC) &_arrow_RecordBatch__to_dataframe, 2}, @@ -3565,6 +3561,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ChunkedArray__Validate", (DL_FUNC) &_arrow_ChunkedArray__Validate, 1}, { "_arrow_ChunkedArray__Equals", (DL_FUNC) &_arrow_ChunkedArray__Equals, 2}, { "_arrow_ChunkedArray__ToString", (DL_FUNC) &_arrow_ChunkedArray__ToString, 1}, + { "_arrow_ChunkedArray__from_list", (DL_FUNC) &_arrow_ChunkedArray__from_list, 2}, { "_arrow_util___Codec__Create", (DL_FUNC) &_arrow_util___Codec__Create, 2}, { "_arrow_util___Codec__name", (DL_FUNC) &_arrow_util___Codec__name, 1}, { "_arrow_util___Codec__IsAvailable", (DL_FUNC) &_arrow_util___Codec__IsAvailable, 1}, @@ -3817,6 +3814,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExportSchema", (DL_FUNC) &_arrow_ExportSchema, 2}, { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, { "_arrow_ExportRecordBatch", (DL_FUNC) &_arrow_ExportRecordBatch, 3}, + { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, + { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { 
"_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, { "_arrow_RecordBatch__num_rows", (DL_FUNC) &_arrow_RecordBatch__num_rows, 1}, { "_arrow_RecordBatch__schema", (DL_FUNC) &_arrow_RecordBatch__schema, 1}, @@ -3897,6 +3896,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, { "_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, + { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, { "_arrow_Table__Reset", (DL_FUNC) &_arrow_Table__Reset, 1}, { "_arrow_RecordBatch__Reset", (DL_FUNC) &_arrow_RecordBatch__Reset, 1}, {NULL, NULL, 0} diff --git a/r/src/arrow_cpp11.h b/r/src/arrow_cpp11.h index 2329db11e99..1d0e26e1a38 100644 --- a/r/src/arrow_cpp11.h +++ b/r/src/arrow_cpp11.h @@ -23,6 +23,7 @@ #undef Free #include +#include #include "./nameof.h" diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 909ccfb217a..b37c01c7621 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -23,14 +23,14 @@ #if defined(ARROW_R_WITH_ARROW) -#include -#include -#include - #include // for RBuffer definition below #include #include +#include +#include +#include + // forward declaration-only headers #include #include @@ -49,7 +49,6 @@ namespace fs = ::arrow::fs; SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); SEXP Array__as_vector(const std::shared_ptr& array); -std::shared_ptr Array__from_vector(SEXP x, SEXP type); std::shared_ptr RecordBatch__from_arrays(SEXP, SEXP); arrow::MemoryPool* gc_memory_pool(); @@ -64,6 +63,8 @@ arrow::MemoryPool* gc_memory_pool(); #define DATAPTR(x) (void*)STRING_PTR(x) #endif +#define VECTOR_PTR_RO(x) ((const SEXP*)DATAPTR_RO(x)) + namespace arrow { static inline void StopIfNotOk(const Status& status) { @@ -81,13 +82,15 @@ auto ValueOrStop(R&& result) -> decltype(std::forward(result).ValueOrDie()) 
{ namespace r { std::shared_ptr InferArrowType(SEXP x); +std::shared_ptr vec_to_arrow__reuse_memory(SEXP x); +bool can_reuse_memory(SEXP x, const std::shared_ptr& type); Status count_fields(SEXP lst, int* out); -std::shared_ptr Array__from_vector( - SEXP x, const std::shared_ptr& type, bool type_inferred); - void inspect(SEXP obj); +std::shared_ptr vec_to_arrow(SEXP x, + const std::shared_ptr& type, + bool type_inferred); // the integer64 sentinel constexpr int64_t NA_INT64 = std::numeric_limits::min(); diff --git a/r/src/chunkedarray.cpp b/r/src/chunkedarray.cpp index 52ceff7d914..10c6e84b3bb 100644 --- a/r/src/chunkedarray.cpp +++ b/r/src/chunkedarray.cpp @@ -19,6 +19,7 @@ #if defined(ARROW_R_WITH_ARROW) +#include #include // [[arrow::export]] @@ -94,4 +95,45 @@ std::string ChunkedArray__ToString(const std::shared_ptr& x return x->ToString(); } +// [[arrow::export]] +std::shared_ptr ChunkedArray__from_list(cpp11::list chunks, + SEXP s_type) { + std::vector> vec; + + // the type might be NULL, in which case we need to infer it from the data + // we keep track of whether it was inferred or supplied + bool type_inferred = Rf_isNull(s_type); + R_xlen_t n = XLENGTH(chunks); + + std::shared_ptr type; + if (type_inferred) { + if (n == 0) { + cpp11::stop("type must be specified for empty list"); + } + type = arrow::r::InferArrowType(VECTOR_ELT(chunks, 0)); + } else { + type = cpp11::as_cpp>(s_type); + } + + if (n == 0) { + std::shared_ptr array; + std::unique_ptr type_builder; + StopIfNotOk(arrow::MakeBuilder(gc_memory_pool(), type, &type_builder)); + StopIfNotOk(type_builder->Finish(&array)); + vec.push_back(array); + } else { + // the first - might differ from the rest of the loop + // because we might have inferred the type from the first element of the list + // + // this only really matters for dictionary arrays + vec.push_back(arrow::r::vec_to_arrow(chunks[0], type, type_inferred)); + + for (R_xlen_t i = 1; i < n; i++) { + 
vec.push_back(arrow::r::vec_to_arrow(chunks[i], type, false)); + } + } + + return std::make_shared(std::move(vec)); +} + #endif diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp new file mode 100644 index 00000000000..2137e0f2ad9 --- /dev/null +++ b/r/src/r_to_arrow.cpp @@ -0,0 +1,1054 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#include "./arrow_types.h"
+#include "./arrow_vctrs.h"
+
+#if defined(ARROW_R_WITH_ARROW)
+// NOTE(review): the ten #include directives below carry no header name, and
+// template parameter/argument lists are absent throughout this section --
+// presumably angle-bracket content was lost when this text was extracted.
+// They are left exactly as found; restore them from the upstream file before
+// building. TODO confirm.
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+namespace arrow {
+
+using internal::checked_cast;
+using internal::checked_pointer_cast;
+
+using internal::Converter;
+using internal::DictionaryConverter;
+using internal::ListConverter;
+using internal::PrimitiveConverter;
+using internal::StructConverter;
+
+using internal::MakeChunker;
+using internal::MakeConverter;
+
+namespace r {
+
+// Options bundle handed to the converters.
+//
+// NOTE: `strict` and `size` have no default member initializer, so a
+// default-constructed object leaves them indeterminate; callers must assign
+// every field before the options are read.
+struct RConversionOptions {
+  RConversionOptions() = default;
+
+  std::shared_ptr type;
+  bool strict;
+  int64_t size;
+};
+
+// Coarse classification of an R vector, as produced by GetVectorType().
+// OTHER is the fallback for anything GetVectorType() does not recognise.
+enum RVectorType {
+  BOOLEAN,
+  UINT8,
+  INT32,
+  FLOAT64,
+  INT64,
+  COMPLEX,
+  STRING,
+  DATAFRAME,
+  DATE_INT,
+  DATE_DBL,
+  TIME,
+  POSIXCT,
+  POSIXLT,
+  BINARY,
+  LIST,
+  FACTOR,
+
+  OTHER
+};
+
+// this flattens out a logical type of what an R object is
+// because TYPEOF() is not detailed enough
+// we can't use arrow types though as there is no 1-1 mapping
+//
+// The result depends on TYPEOF(x) first and then on the classes reported by
+// Rf_inherits(); within one storage type the class checks are tried in the
+// order written, so the first match wins.
+RVectorType GetVectorType(SEXP x) {
+  switch (TYPEOF(x)) {
+    case LGLSXP:
+      return BOOLEAN;
+    case RAWSXP:
+      return UINT8;
+    case INTSXP:
+      // integer storage: factor, then Date, otherwise a plain integer vector
+      if (Rf_inherits(x, "factor")) {
+        return FACTOR;
+      } else if (Rf_inherits(x, "Date")) {
+        return DATE_INT;
+      }
+      return INT32;
+    case STRSXP:
+      return STRING;
+    case CPLXSXP:
+      return COMPLEX;
+    case REALSXP: {
+      // double storage: Date, integer64, POSIXct and difftime are
+      // distinguished by class; anything else is a plain double vector
+      if (Rf_inherits(x, "Date")) {
+        return DATE_DBL;
+      } else if (Rf_inherits(x, "integer64")) {
+        return INT64;
+      } else if (Rf_inherits(x, "POSIXct")) {
+        return POSIXCT;
+      } else if (Rf_inherits(x, "difftime")) {
+        return TIME;
+      } else {
+        return FLOAT64;
+      }
+    }
+    case VECSXP: {
+      // list storage: data.frame, POSIXlt and arrow_binary are special-cased
+      if (Rf_inherits(x, "data.frame")) {
+        return DATAFRAME;
+      }
+
+      if (Rf_inherits(x, "POSIXlt")) {
+        return POSIXLT;
+      }
+
+      if (Rf_inherits(x, "arrow_binary")) {
+        return BINARY;
+      }
+
+      return LIST;
+    }
+    default:
+      break;
+  }
+  return OTHER;
+}
+
+// Missing-value predicate: returns true when `value` is to be treated as
+// missing. Only the specializations below provide a definition.
+template
+bool is_NA(T value);
+
+// int: compared against NA_INTEGER
+template <>
+bool is_NA(int value) {
+  return value == NA_INTEGER;
+}
+
+// double: delegated to ISNA()
+template <>
+bool is_NA(double value) {
+  return ISNA(value);
+}
+
+// uint8_t: never reported as missing
+template <>
+bool is_NA(uint8_t value) {
+  return false;
+}
+
+// cpp11::r_bool: compared against NA_LOGICAL
+template <>
+bool is_NA(cpp11::r_bool value) {
+  return value == NA_LOGICAL;
+}
+
+// cpp11::r_string: compared against NA_STRING
+template <>
+bool is_NA(cpp11::r_string value) {
+  return value == NA_STRING;
+}
+
+// SEXP: an R NULL is treated as missing
+template <>
+bool is_NA(SEXP value) {
+  return Rf_isNull(value);
+}
+
+// int64_t: compared against NA_INT64
+template <>
+bool is_NA(int64_t value) {
+  return value == NA_INT64;
+}
+
+// Walks an R vector element by element and dispatches each element to one of
+// two callbacks.
+//
+// NOTE(review): the `data_type` alias below has lost its template arguments;
+// the remaining `double, T` together with the GetValue(double) specialization
+// further down suggests that 64-bit integers are read from double storage --
+// confirm once the arguments are restored.
+template
+struct RVectorVisitor {
+  using data_type =
+      typename std::conditional::value, double, T>::type;
+  using r_vector_type = cpp11::r_vector;
+
+  // Visits the first `size` elements of `x`.
+  //
+  // For each element, GetValue() is applied and the result is tested with
+  // is_NA(): `append_null()` is called for missing values and
+  // `append_value(value)` otherwise. Iteration stops at, and returns, the
+  // first non-OK Status produced by a callback; Status::OK() is returned when
+  // every element was handled.
+  template
+  static Status Visit(SEXP x, int64_t size, AppendNull&& append_null,
+                      AppendValue&& append_value) {
+    r_vector_type values(x);
+    auto it = values.begin();
+
+    for (R_xlen_t i = 0; i < size; i++, ++it) {
+      auto value = GetValue(*it);
+
+      if (is_NA(value)) {
+        RETURN_NOT_OK(append_null());
+      } else {
+        RETURN_NOT_OK(append_value(value));
+      }
+    }
+
+    return Status::OK();
+  }
+
+  // Generic case: the stored element is returned as is.
+  static T GetValue(data_type x) { return x; }
+};
+
+// Specialization taking a double and returning int64_t: the bytes of the
+// double are copied into an int64_t (a bit-for-bit reinterpretation, not a
+// numeric conversion).
+template <>
+int64_t RVectorVisitor::GetValue(double x) {
+  int64_t value;
+  memcpy(&value, &x, sizeof(int64_t));
+  return value;
+}
+
+// Base class of the R converters. Every entry point has a default
+// implementation that returns Status::NotImplemented carrying the name of the
+// method that was called; concrete converters override what they support.
+class RConverter : public Converter {
+ public:
+  // Append a single R value.
+  virtual Status Append(SEXP) { return Status::NotImplemented("Append"); }
+
+  // Append `size` elements taken from `values`.
+  virtual Status Extend(SEXP values, int64_t size) {
+    return Status::NotImplemented("Extend");
+  }
+
+  // Masked variant of Extend(), taking an additional `mask` argument.
+  virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) {
+    return Status::NotImplemented("ExtendMasked");
+  }
+};
+
+template
+class RPrimitiveConverter;
+
+// Range-checked narrowing of a 64-bit integer: returns Status::Invalid when
+// `value` lies outside the [min, max] range given by std::numeric_limits,
+// otherwise the value after static_cast.
+template
+Result CIntFromRScalarImpl(int64_t value) {
+  if (value < std::numeric_limits::min() || value > std::numeric_limits::max()) {
+    return Status::Invalid("value outside of range");
+  }
+  return static_cast(value);
+}
+
+// Specialization that only rejects negative input.
+// NOTE(review): the target type was stripped from this text; presumably the
+// unsigned 64-bit case, where no upper bound check is possible on an int64_t
+// input -- confirm.
+template <>
+Result CIntFromRScalarImpl(int64_t value) {
+  if (value < 0) {
+    return Status::Invalid("value outside of range");
+  }
+  return static_cast(value);
+}
+
+// utility to convert R single values from (int, raw, double and int64) vectors +// to arrow integers and floating point +struct RConvert { + // ---- convert to an arrow integer + template + static enable_if_integer> Convert(Type*, + From from) { + return CIntFromRScalarImpl(from); + } + + // ---- convert R integer types to double + template + static enable_if_t::value && + !std::is_same::value, + Result> + Convert(Type*, From from) { + constexpr int64_t kDoubleMax = 1LL << 53; + constexpr int64_t kDoubleMin = -(1LL << 53); + + if (from < kDoubleMin || from > kDoubleMax) { + return Status::Invalid("Integer value ", from, " is outside of the range exactly", + " representable by a IEEE 754 double precision value"); + } + return static_cast(from); + } + + // ---- convert double to double + template + static enable_if_t::value && + std::is_same::value, + Result> + Convert(Type*, From from) { + return from; + } + + // ---- convert R integer types to float + template + static enable_if_t::value && + !std::is_same::value, + Result> + Convert(Type*, From from) { + constexpr int64_t kFloatMax = 1LL << 24; + constexpr int64_t kFloatMin = -(1LL << 24); + + if (from < kFloatMin || from > kFloatMax) { + return Status::Invalid("Integer value ", from, " is outside of the range exactly", + " representable by a IEEE 754 single precision value"); + } + return static_cast(from); + } + + // ---- convert double to float + template + static enable_if_t::value && + std::is_same::value, + Result> + Convert(Type*, From from) { + return static_cast(from); + } + + // ---- convert to half float: not implemented + template + static enable_if_t::value, + Result> + Convert(Type*, From from) { + return Status::Invalid("Cannot convert to Half Float"); + } +}; + +template +class RPrimitiveConverter> + : public PrimitiveConverter { + public: + Status Extend(SEXP, int64_t size) override { + return this->primitive_builder_->AppendNulls(size); + } +}; + +template +class RPrimitiveConverter< + T, 
enable_if_t::value || is_floating_type::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + auto rtype = GetVectorType(x); + switch (rtype) { + case UINT8: + return AppendRangeDispatch(x, size); + case INT32: + return AppendRangeDispatch(x, size); + case FLOAT64: + return AppendRangeDispatch(x, size); + case INT64: + return AppendRangeDispatch(x, size); + + default: + break; + } + // TODO: mention T in the error + return Status::Invalid("cannot convert"); + } + + private: + template + Status AppendRangeLoopDifferentType(SEXP x, int64_t size) { + RETURN_NOT_OK(this->Reserve(size)); + + auto append_value = [this](r_value_type value) { + ARROW_ASSIGN_OR_RAISE(auto converted, + RConvert::Convert(this->primitive_type_, value)); + this->primitive_builder_->UnsafeAppend(converted); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } + + template + Status AppendRangeSameTypeNotALTREP(SEXP x, int64_t size) { + auto p = reinterpret_cast(DATAPTR_RO(x)); + auto p_end = p + size; + + auto first_na = std::find_if(p, p_end, is_NA); + + if (first_na == p_end) { + // no nulls, so we can use AppendValues() directly + return this->primitive_builder_->AppendValues(p, p_end); + } + + // Append all values up until the first NULL + RETURN_NOT_OK(this->primitive_builder_->AppendValues(p, first_na)); + + // loop for the remaining + RETURN_NOT_OK(this->primitive_builder_->Reserve(p_end - first_na)); + p = first_na; + for (; p < p_end; ++p) { + r_value_type value = *p; + if (is_NA(value)) { + this->primitive_builder_->UnsafeAppendNull(); + } else { + this->primitive_builder_->UnsafeAppend(value); + } + } + return Status::OK(); + } + + template + Status AppendRangeSameTypeALTREP(SEXP x, int64_t size) { + // if it is altrep, then we use cpp11 looping + // without needing to convert + 
RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); + typename RVectorVisitor::r_vector_type vec(x); + auto it = vec.begin(); + for (R_xlen_t i = 0; i < size; i++, ++it) { + r_value_type value = RVectorVisitor::GetValue(*it); + if (is_NA(value)) { + this->primitive_builder_->UnsafeAppendNull(); + } else { + this->primitive_builder_->UnsafeAppend(value); + } + } + return Status::OK(); + } + + template + Status AppendRangeDispatch(SEXP x, int64_t size) { + if (std::is_same::value) { + if (!ALTREP(x)) { + return AppendRangeSameTypeNotALTREP(x, size); + } else { + return AppendRangeSameTypeALTREP(x, size); + } + } + + // here if underlying types differ so going + return AppendRangeLoopDifferentType(x, size); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + auto rtype = GetVectorType(x); + if (rtype != BOOLEAN) { + return Status::Invalid("Expecting a logical vector"); + } + RETURN_NOT_OK(this->Reserve(size)); + + auto append_value = [this](cpp11::r_bool value) { + this->primitive_builder_->UnsafeAppend(value == 1); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + + switch (GetVectorType(x)) { + case DATE_INT: + return AppendRange_Date(x, size); + + case DATE_DBL: + return AppendRange_Date(x, size); + + case POSIXCT: + return AppendRange_Posixct(x, size); + + default: + break; + } + + return Status::Invalid("cannot convert to date type "); + } + + private: + template + Status AppendRange_Date(SEXP x, int64_t size) { + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto 
append_value = [this](r_value_type value) { + this->primitive_builder_->UnsafeAppend(FromRDate(this->primitive_type_, value)); + return Status::OK(); + }; + + return RVectorVisitor::Visit(x, size, append_null, append_value); + } + + Status AppendRange_Posixct(SEXP x, int64_t size) { + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](double value) { + this->primitive_builder_->UnsafeAppend(FromPosixct(this->primitive_type_, value)); + return Status::OK(); + }; + + return RVectorVisitor::Visit(x, size, append_null, append_value); + } + + static int FromRDate(const Date32Type*, int from) { return from; } + + static int64_t FromRDate(const Date64Type*, int from) { + constexpr int64_t kMilliSecondsPerDay = 86400000; + return from * kMilliSecondsPerDay; + } + + static int FromPosixct(const Date32Type*, double from) { + constexpr int64_t kSecondsPerDay = 86400; + return from / kSecondsPerDay; + } + + static int64_t FromPosixct(const Date64Type*, double from) { return from * 1000; } +}; + +int64_t get_TimeUnit_multiplier(TimeUnit::type unit) { + switch (unit) { + case TimeUnit::SECOND: + return 1; + case TimeUnit::MILLI: + return 1000; + case TimeUnit::MICRO: + return 1000000; + case TimeUnit::NANO: + return 1000000000; + default: + return 0; + } +} + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + auto rtype = GetVectorType(x); + if (rtype != TIME) { + return Status::Invalid("Invalid conversion to time"); + } + + // multiplier to get the number of seconds from the value stored in the R vector + int difftime_multiplier; + std::string unit(CHAR(STRING_ELT(Rf_getAttrib(x, symbols::units), 0))); + if (unit == "secs") { + difftime_multiplier = 1; + } else if (unit == "mins") { + difftime_multiplier = 60; + } else if (unit == "hours") { + difftime_multiplier = 3600; 
+ } else if (unit == "days") { + difftime_multiplier = 86400; + } else if (unit == "weeks") { + difftime_multiplier = 604800; + } else { + return Status::Invalid("unknown difftime unit"); + } + + // then multiply the seconds by this to match the time unit + auto multiplier = + get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier; + + auto append_value = [this, multiplier](double value) { + auto converted = static_cast(value * multiplier); + this->primitive_builder_->UnsafeAppend(converted); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + + RVectorType rtype = GetVectorType(x); + if (rtype != POSIXCT) { + return Status::Invalid("Invalid conversion to timestamp"); + } + + int64_t multiplier = get_TimeUnit_multiplier(this->primitive_type_->unit()); + + auto append_value = [this, multiplier](double value) { + auto converted = static_cast(value * multiplier); + this->primitive_builder_->UnsafeAppend(converted); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + return Status::NotImplemented("Extend"); + } +}; + +Status check_binary(SEXP x, int64_t size) { + RVectorType rtype = GetVectorType(x); + switch (rtype) { + case BINARY: + break; + case LIST: { + // check this is a list of raw vectors + const SEXP* p_x = VECTOR_PTR_RO(x); + for (R_xlen_t i = 0; i < size; i++, ++p_x) { + if (TYPEOF(*p_x) != RAWSXP) { + 
return Status::Invalid("invalid R type to convert to binary"); + } + } + break; + } + default: + return Status::Invalid("invalid R type to convert to binary"); + } + return Status::OK(); +} + +template +class RPrimitiveConverter> + : public PrimitiveConverter { + public: + using OffsetType = typename T::offset_type; + + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(check_binary(x, size)); + + auto append_value = [this](SEXP raw) { + R_xlen_t n = XLENGTH(raw); + ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); + this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + RETURN_NOT_OK(check_binary(x, size)); + + auto append_value = [this](SEXP raw) { + R_xlen_t n = XLENGTH(raw); + + if (n != this->primitive_builder_->byte_width()) { + return Status::Invalid("invalid size"); + } + ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); + this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); + return Status::OK(); + }; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +template +class RPrimitiveConverter> + : public PrimitiveConverter { + public: + using OffsetType = typename T::offset_type; + + Status Extend(SEXP x, int64_t size) override { + int64_t start = 0; + RVectorType rtype = GetVectorType(x); + if (rtype != STRING) { + return Status::Invalid("Expecting a character vector"); + } + + cpp11::strings s(arrow::r::utf8_strings(x)); + 
RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); + auto it = s.begin() + start; + + // we know all the R strings are utf8 already, so we can get + // a definite size and then use UnsafeAppend*() + int64_t total_length = 0; + for (R_xlen_t i = 0; i < size; i++, ++it) { + cpp11::r_string si = *it; + total_length += cpp11::is_na(si) ? 0 : si.size(); + } + RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); + + // append + it = s.begin() + start; + for (R_xlen_t i = 0; i < size; i++, ++it) { + cpp11::r_string si = *it; + if (si == NA_STRING) { + this->primitive_builder_->UnsafeAppendNull(); + } else { + this->primitive_builder_->UnsafeAppend(CHAR(si), si.size()); + } + } + + return Status::OK(); + } +}; + +template +class RPrimitiveConverter::value>> + : public PrimitiveConverter { + public: + Status Extend(SEXP x, int64_t size) override { + // TODO: look in lubridate + return Status::NotImplemented("Extend"); + } +}; + +template +class RListConverter; + +template +class RDictionaryConverter; + +template +class RDictionaryConverter> + : public DictionaryConverter { + public: + Status Extend(SEXP x, int64_t size) override { + return Status::NotImplemented("Extend"); + } +}; + +template +class RDictionaryConverter> + : public DictionaryConverter { + public: + using BuilderType = DictionaryBuilder; + + Status Extend(SEXP x, int64_t size) override { + // first we need to handle the levels + cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol)); + auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false); + RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); + + // then we can proceed + RETURN_NOT_OK(this->Reserve(size)); + + RVectorType rtype = GetVectorType(x); + if (rtype != FACTOR) { + return Status::Invalid("invalid R type to convert to dictionary"); + } + + auto append_value = [this, levels](int value) { + SEXP s = STRING_ELT(levels, value - 1); + return this->value_builder_->Append(CHAR(s)); + }; + auto append_null = 
[this]() { return this->value_builder_->AppendNull(); }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } + + Result> ToArray() override { + ARROW_ASSIGN_OR_RAISE(auto result, this->builder_->Finish()); + + auto result_type = checked_cast(result->type().get()); + if (this->dict_type_->ordered() && !result_type->ordered()) { + // TODO: we should not have to do that, there is probably something wrong + // in the DictionaryBuilder code + result->data()->type = + arrow::dictionary(result_type->index_type(), result_type->value_type(), true); + } + + return result; + } +}; + +template +struct RConverterTrait; + +template +struct RConverterTrait< + T, enable_if_t::value && !is_interval_type::value && + !is_extension_type::value>> { + using type = RPrimitiveConverter; +}; + +template +struct RConverterTrait> { + using type = RListConverter; +}; + +template +class RListConverter : public ListConverter { + public: + Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(this->Reserve(size)); + + RVectorType rtype = GetVectorType(x); + if (rtype != LIST) { + return Status::Invalid("Cannot convert to list type"); + } + + auto append_value = [this](SEXP value) { + // TODO: this should always use vctrs::short_vec_size + // but that introduced a regression: + // https://github.com/apache/arrow/pull/8650#issuecomment-786940734 + int n; + if (TYPEOF(value) == VECSXP && !Rf_inherits(value, "data.frame")) { + n = Rf_length(value); + } else { + n = vctrs::short_vec_size(value); + } + + RETURN_NOT_OK(this->list_builder_->ValidateOverflow(n)); + RETURN_NOT_OK(this->list_builder_->Append()); + return this->value_converter_.get()->Extend(value, n); + }; + auto append_null = [this]() { return this->list_builder_->AppendNull(); }; + return RVectorVisitor::Visit(x, size, append_null, append_value); + } +}; + +class RStructConverter; + +template <> +struct RConverterTrait { + using type = RStructConverter; +}; + +class RStructConverter : public StructConverter { 
+ public: + Status Extend(SEXP x, int64_t size) override { + // check that x is compatible + R_xlen_t n_columns = XLENGTH(x); + + if (!Rf_inherits(x, "data.frame") && !Rf_inherits(x, "POSIXlt")) { + return Status::Invalid("Can only convert data frames to Struct type"); + } + + auto fields = this->struct_type_->fields(); + if (n_columns != static_cast(fields.size())) { + return Status::RError("Number of fields in struct (", fields.size(), + ") incompatible with number of columns in the data frame (", + n_columns, ")"); + } + + cpp11::strings x_names = Rf_getAttrib(x, R_NamesSymbol); + + RETURN_NOT_OK(cpp11::unwind_protect([&] { + for (int i = 0; i < n_columns; i++) { + const char* name_i = arrow::r::unsafe::utf8_string(x_names[i]); + auto field_name = fields[i]->name(); + if (field_name != name_i) { + return Status::RError( + "Field name in position ", i, " (", field_name, + ") does not match the name of the column of the data frame (", name_i, ")"); + } + } + + return Status::OK(); + })); + + for (R_xlen_t i = 0; i < n_columns; i++) { + std::string name(x_names[i]); + if (name != fields[i]->name()) { + return Status::RError( + "Field name in position ", i, " (", fields[i]->name(), + ") does not match the name of the column of the data frame (", name, ")"); + } + } + + for (R_xlen_t i = 0; i < n_columns; i++) { + SEXP x_i = VECTOR_ELT(x, i); + if (vctrs::short_vec_size(x_i) < size) { + return Status::RError("Degenerated data frame"); + } + } + + RETURN_NOT_OK(this->Reserve(size)); + + for (R_xlen_t i = 0; i < size; i++) { + RETURN_NOT_OK(struct_builder_->Append()); + } + + for (R_xlen_t i = 0; i < n_columns; i++) { + auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); + if (!status.ok()) { + return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), + "): ", status.ToString()); + } + } + + return Status::OK(); + } + + protected: + Status Init(MemoryPool* pool) override { + return StructConverter::Init(pool); + } +}; + +template <> 
+struct RConverterTrait { + template + using dictionary_type = RDictionaryConverter; +}; + +// ---- short circuit the Converter api entirely when we can do zero-copy + +// in some situations we can just use the memory of the R object in an RBuffer +// instead of going through ArrayBuilder, etc ... +bool can_reuse_memory(SEXP x, const std::shared_ptr& type) { + // TODO: this probably should be disabled when x is an ALTREP object + // because MakeSimpleArray below will force materialization + switch (type->id()) { + case Type::INT32: + return TYPEOF(x) == INTSXP && !OBJECT(x); + case Type::DOUBLE: + return TYPEOF(x) == REALSXP && !OBJECT(x); + case Type::INT8: + return TYPEOF(x) == RAWSXP && !OBJECT(x); + case Type::INT64: + return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64"); + default: + break; + } + return false; +} + +// this is only used on some special cases when the arrow Array can just use the memory of +// the R object, via an RBuffer, hence be zero copy +template +std::shared_ptr MakeSimpleArray(SEXP x) { + using value_type = typename arrow::TypeTraits::ArrayType::value_type; + RVector vec(x); + auto n = vec.size(); + auto p_vec_start = reinterpret_cast(DATAPTR_RO(vec)); + auto p_vec_end = p_vec_start + n; + std::vector> buffers{nullptr, + std::make_shared>(vec)}; + + int null_count = 0; + + auto first_na = std::find_if(p_vec_start, p_vec_end, is_NA); + if (first_na < p_vec_end) { + auto null_bitmap = + ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); + internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); + + // first loop to clear all the bits before the first NA + auto j = std::distance(p_vec_start, first_na); + int i = 0; + for (; i < j; i++, bitmap_writer.Next()) { + bitmap_writer.Set(); + } + + auto p_vec = first_na; + // then finish + for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { + if (is_NA(*p_vec)) { + bitmap_writer.Clear(); + null_count++; + } else { + bitmap_writer.Set(); + } + } + 
+ bitmap_writer.Finish(); + buffers[0] = std::move(null_bitmap); + } + + auto data = ArrayData::Make(std::make_shared(), LENGTH(x), std::move(buffers), + null_count, 0 /*offset*/); + + // return the right Array class + return std::make_shared::ArrayType>(data); +} + +std::shared_ptr vec_to_arrow__reuse_memory(SEXP x) { + auto type = TYPEOF(x); + + if (type == INTSXP) { + return MakeSimpleArray(x); + } else if (type == REALSXP && Rf_inherits(x, "integer64")) { + return MakeSimpleArray(x); + } else if (type == REALSXP) { + return MakeSimpleArray(x); + } else if (type == RAWSXP) { + return MakeSimpleArray(x); + } + + cpp11::stop("Unreachable: you might need to fix can_reuse_memory()"); +} + +std::shared_ptr vec_to_arrow(SEXP x, + const std::shared_ptr& type, + bool type_inferred) { + // short circuit if `x` is already an Array + if (Rf_inherits(x, "Array")) { + return cpp11::as_cpp>(x); + } + + RConversionOptions options; + options.strict = !type_inferred; + options.type = type; + options.size = vctrs::short_vec_size(x); + + // maybe short circuit when zero-copy is possible + if (can_reuse_memory(x, options.type)) { + return vec_to_arrow__reuse_memory(x); + } + + // otherwise go through the converter api + auto converter = ValueOrStop(MakeConverter( + options.type, options, gc_memory_pool())); + + StopIfNotOk(converter->Extend(x, options.size)); + return ValueOrStop(converter->ToArray()); +} + +} // namespace r +} // namespace arrow + +// [[arrow::export]] +SEXP vec_to_arrow(SEXP x, SEXP s_type) { + if (Rf_inherits(x, "Array")) return x; + bool type_inferred = Rf_isNull(s_type); + std::shared_ptr type; + + if (type_inferred) { + type = arrow::r::InferArrowType(x); + } else { + type = cpp11::as_cpp>(s_type); + } + return cpp11::to_r6(arrow::r::vec_to_arrow(x, type, type_inferred)); +} + +// [[arrow::export]] +std::shared_ptr DictionaryArray__FromArrays( + const std::shared_ptr& type, + const std::shared_ptr& indices, + const std::shared_ptr& dict) { + return 
ValueOrStop(arrow::DictionaryArray::FromArrays(type, indices, dict)); +} + +#endif diff --git a/r/src/recordbatch.cpp b/r/src/recordbatch.cpp index 715bf8ac65b..74bc9ec92f1 100644 --- a/r/src/recordbatch.cpp +++ b/r/src/recordbatch.cpp @@ -264,7 +264,7 @@ std::shared_ptr RecordBatch__from_arrays__known_schema( cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, schema->field(j)->name().c_str(), name.c_str()); } - arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), false); + arrays[j] = arrow::r::vec_to_arrow(x, schema->field(j)->type(), false); }; arrow::r::TraverseDots(lst, num_fields, fill_array); @@ -281,7 +281,7 @@ arrow::Status CollectRecordBatchArrays( SEXP lst, const std::shared_ptr& schema, int num_fields, bool inferred, std::vector>& arrays) { auto extract_one_array = [&arrays, &schema, inferred](int j, SEXP x, cpp11::r_string) { - arrays[j] = arrow::r::Array__from_vector(x, schema->field(j)->type(), inferred); + arrays[j] = arrow::r::vec_to_arrow(x, schema->field(j)->type(), inferred); }; arrow::r::TraverseDots(lst, num_fields, extract_one_array); return arrow::Status::OK(); diff --git a/r/src/table.cpp b/r/src/table.cpp index 14e5f4e92b5..081d14a4c5c 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -265,7 +265,7 @@ arrow::Status CollectTableColumns( columns[j] = std::make_shared( cpp11::as_cpp>(x)); } else { - auto array = arrow::r::Array__from_vector(x, schema->field(j)->type(), inferred); + auto array = arrow::r::vec_to_arrow(x, schema->field(j)->type(), inferred); columns[j] = std::make_shared(array); } }; diff --git a/r/src/type_infer.cpp b/r/src/type_infer.cpp new file mode 100644 index 00000000000..93e51be6462 --- /dev/null +++ b/r/src/type_infer.cpp @@ -0,0 +1,202 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "./arrow_types.h" +#include "./arrow_vctrs.h" + +#if defined(ARROW_R_WITH_ARROW) +#include + +namespace arrow { +namespace r { + +static inline std::shared_ptr IndexTypeForFactors(int n_factors) { + if (n_factors < INT8_MAX) { + return arrow::int8(); + } else if (n_factors < INT16_MAX) { + return arrow::int16(); + } else { + return arrow::int32(); + } +} + +std::shared_ptr InferArrowTypeFromFactor(SEXP factor) { + SEXP factors = Rf_getAttrib(factor, R_LevelsSymbol); + auto index_type = IndexTypeForFactors(Rf_length(factors)); + bool is_ordered = Rf_inherits(factor, "ordered"); + return dictionary(index_type, arrow::utf8(), is_ordered); +} + +template +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + cpp11::stop("Unknown vector type: ", VectorType); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + if (Rf_inherits(x, "Array")) { + return cpp11::as_cpp>(x)->type(); + } + + cpp11::stop("Unrecognized vector instance for type ENVSXP"); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + return Rf_inherits(x, "vctrs_unspecified") ? 
null() : boolean(); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + if (Rf_isFactor(x)) { + return InferArrowTypeFromFactor(x); + } else if (Rf_inherits(x, "Date")) { + return date32(); + } else if (Rf_inherits(x, "POSIXct")) { + auto tzone_sexp = Rf_getAttrib(x, symbols::tzone); + if (Rf_isNull(tzone_sexp)) { + return timestamp(TimeUnit::MICRO); + } else { + return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0))); + } + } + return int32(); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + if (Rf_inherits(x, "Date")) { + return date32(); + } + if (Rf_inherits(x, "POSIXct")) { + auto tzone_sexp = Rf_getAttrib(x, symbols::tzone); + if (Rf_isNull(tzone_sexp)) { + return timestamp(TimeUnit::MICRO); + } else { + return timestamp(TimeUnit::MICRO, CHAR(STRING_ELT(tzone_sexp, 0))); + } + } + if (Rf_inherits(x, "integer64")) { + return int64(); + } + if (Rf_inherits(x, "difftime")) { + return time32(TimeUnit::SECOND); + } + return float64(); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + return cpp11::unwind_protect([&] { + R_xlen_t n = XLENGTH(x); + + int64_t size = 0; + + for (R_xlen_t i = 0; i < n; i++) { + size += arrow::r::unsafe::r_string_size(STRING_ELT(x, i)); + if (size > arrow::kBinaryMemoryLimit) { + // Exceeds 2GB capacity of utf8 type, so use large + return large_utf8(); + } + } + + return utf8(); + }); +} + +static inline std::shared_ptr InferArrowTypeFromDataFrame( + cpp11::list x) { + R_xlen_t n = x.size(); + cpp11::strings names(x.attr(R_NamesSymbol)); + std::vector> fields(n); + for (R_xlen_t i = 0; i < n; i++) { + fields[i] = arrow::field(names[i], InferArrowType(x[i])); + } + return arrow::struct_(std::move(fields)); +} + +template <> +std::shared_ptr InferArrowTypeFromVector(SEXP x) { + if (Rf_inherits(x, "data.frame") || Rf_inherits(x, "POSIXlt")) { + return InferArrowTypeFromDataFrame(x); + } else { + // some known special cases + if (Rf_inherits(x, 
"arrow_fixed_size_binary")) { + SEXP byte_width = Rf_getAttrib(x, symbols::byte_width); + if (Rf_isNull(byte_width) || TYPEOF(byte_width) != INTSXP || + XLENGTH(byte_width) != 1) { + cpp11::stop("malformed arrow_fixed_size_binary object"); + } + return arrow::fixed_size_binary(INTEGER(byte_width)[0]); + } + + if (Rf_inherits(x, "arrow_binary")) { + return arrow::binary(); + } + + if (Rf_inherits(x, "arrow_large_binary")) { + return arrow::large_binary(); + } + + SEXP ptype = Rf_getAttrib(x, symbols::ptype); + if (Rf_isNull(ptype)) { + if (XLENGTH(x) == 0) { + cpp11::stop( + "Requires at least one element to infer the values' type of a list vector"); + } + + ptype = VECTOR_ELT(x, 0); + } + + return arrow::list(InferArrowType(ptype)); + } +} + +std::shared_ptr InferArrowType(SEXP x) { + switch (TYPEOF(x)) { + case ENVSXP: + return InferArrowTypeFromVector(x); + case LGLSXP: + return InferArrowTypeFromVector(x); + case INTSXP: + return InferArrowTypeFromVector(x); + case REALSXP: + return InferArrowTypeFromVector(x); + case RAWSXP: + return int8(); + case STRSXP: + return InferArrowTypeFromVector(x); + case VECSXP: + return InferArrowTypeFromVector(x); + default: + break; + } + + cpp11::stop("Cannot infer type from vector"); +} + +} // namespace r +} // namespace arrow + +// [[arrow::export]] +std::shared_ptr Array__infer_type(SEXP x) { + return arrow::r::InferArrowType(x); +} + +#endif diff --git a/r/tests/testthat/test-Array.R b/r/tests/testthat/test-Array.R index 03c4f379fd1..5cf0b0dad0f 100644 --- a/r/tests/testthat/test-Array.R +++ b/r/tests/testthat/test-Array.R @@ -385,24 +385,23 @@ test_that("Array$create() supports the type= argument. 
conversion from INTSXP an }) test_that("Array$create() aborts on overflow", { - msg <- "Invalid.*Value is too large" - expect_error(Array$create(128L, type = int8()), msg) - expect_error(Array$create(-129L, type = int8()), msg) + expect_error(Array$create(128L, type = int8())) + expect_error(Array$create(-129L, type = int8())) - expect_error(Array$create(256L, type = uint8()), msg) - expect_error(Array$create(-1L, type = uint8()), msg) + expect_error(Array$create(256L, type = uint8())) + expect_error(Array$create(-1L, type = uint8())) - expect_error(Array$create(32768L, type = int16()), msg) - expect_error(Array$create(-32769L, type = int16()), msg) + expect_error(Array$create(32768L, type = int16())) + expect_error(Array$create(-32769L, type = int16())) - expect_error(Array$create(65536L, type = uint16()), msg) - expect_error(Array$create(-1L, type = uint16()), msg) + expect_error(Array$create(65536L, type = uint16())) + expect_error(Array$create(-1L, type = uint16())) - expect_error(Array$create(65536L, type = uint16()), msg) - expect_error(Array$create(-1L, type = uint16()), msg) + expect_error(Array$create(65536L, type = uint16())) + expect_error(Array$create(-1L, type = uint16())) - expect_error(Array$create(bit64::as.integer64(2^31), type = int32()), msg) - expect_error(Array$create(bit64::as.integer64(2^32), type = uint32()), msg) + expect_error(Array$create(bit64::as.integer64(2^31), type = int32())) + expect_error(Array$create(bit64::as.integer64(2^32), type = uint32())) }) test_that("Array$create() does not convert doubles to integer", { @@ -483,7 +482,7 @@ test_that("Array$create() can handle data frame with custom struct type (not inf expect_error(Array$create(df, type = type), regexp = "Field name in position.*does not match the name of the column of the data frame") type <- struct(x = float64(), y = utf8()) - expect_error(Array$create(df, type = type), regexp = "Expecting a character vector") + expect_error(Array$create(df, type = type), regexp = 
"Invalid") }) test_that("Array$create() supports tibble with no columns (ARROW-8354)", { @@ -651,8 +650,6 @@ test_that("Handling string data with embedded nuls", { test_that("Array$create() should have helpful error", { expect_error(Array$create(list(numeric(0)), list_of(bool())), "Expecting a logical vector") - expect_error(Array$create(list(numeric(0)), list_of(int32())), "Expecting an integer vector") - expect_error(Array$create(list(integer(0)), list_of(float64())), "Expecting a numeric vector") lgl <- logical(0) int <- integer(0) @@ -661,7 +658,6 @@ test_that("Array$create() should have helpful error", { expect_error(Array$create(list()), "Requires at least one element to infer") expect_error(Array$create(list(lgl, lgl, int)), "Expecting a logical vector") expect_error(Array$create(list(char, num, char)), "Expecting a character vector") - expect_error(Array$create(list(int, int, num)), "Expecting an integer vector") }) test_that("Array$View() (ARROW-6542)", { diff --git a/r/tests/testthat/test-chunked-array.R b/r/tests/testthat/test-chunked-array.R index 792e140c137..a5ff6ef4812 100644 --- a/r/tests/testthat/test-chunked-array.R +++ b/r/tests/testthat/test-chunked-array.R @@ -220,23 +220,23 @@ test_that("chunked_array() supports the type= argument. 
conversion from INTSXP a }) test_that("ChunkedArray$create() aborts on overflow", { - expect_error(chunked_array(128L, type = int8())$type, "Invalid.*Value is too large") - expect_error(chunked_array(-129L, type = int8())$type, "Invalid.*Value is too large") + expect_error(chunked_array(128L, type = int8())$type) + expect_error(chunked_array(-129L, type = int8())$type) - expect_error(chunked_array(256L, type = uint8())$type, "Invalid.*Value is too large") - expect_error(chunked_array(-1L, type = uint8())$type, "Invalid.*Value is too large") + expect_error(chunked_array(256L, type = uint8())$type) + expect_error(chunked_array(-1L, type = uint8())$type) - expect_error(chunked_array(32768L, type = int16())$type, "Invalid.*Value is too large") - expect_error(chunked_array(-32769L, type = int16())$type, "Invalid.*Value is too large") + expect_error(chunked_array(32768L, type = int16())$type) + expect_error(chunked_array(-32769L, type = int16())$type) - expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*Value is too large") - expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*Value is too large") + expect_error(chunked_array(65536L, type = uint16())$type) + expect_error(chunked_array(-1L, type = uint16())$type) - expect_error(chunked_array(65536L, type = uint16())$type, "Invalid.*Value is too large") - expect_error(chunked_array(-1L, type = uint16())$type, "Invalid.*Value is too large") + expect_error(chunked_array(65536L, type = uint16())$type) + expect_error(chunked_array(-1L, type = uint16())$type) - expect_error(chunked_array(bit64::as.integer64(2^31), type = int32()), "Invalid.*Value is too large") - expect_error(chunked_array(bit64::as.integer64(2^32), type = uint32()), "Invalid.*Value is too large") + expect_error(chunked_array(bit64::as.integer64(2^31), type = int32())) + expect_error(chunked_array(bit64::as.integer64(2^32), type = uint32())) }) test_that("chunked_array() convert doubles to integers", {