diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 8477f949f1b..b59aa12dac7 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1364,6 +1364,10 @@ ExportRecordBatchReader <- function(reader, stream_ptr){ invisible(.Call(`_arrow_ExportRecordBatchReader`, reader, stream_ptr)) } +Table__from_dots <- function(lst, schema_sxp, use_threads){ + .Call(`_arrow_Table__from_dots`, lst, schema_sxp, use_threads) +} + vec_to_arrow <- function(x, s_type){ .Call(`_arrow_vec_to_arrow`, x, s_type) } @@ -1704,10 +1708,6 @@ Table__from_record_batches <- function(batches, schema_sxp){ .Call(`_arrow_Table__from_record_batches`, batches, schema_sxp) } -Table__from_dots <- function(lst, schema_sxp){ - .Call(`_arrow_Table__from_dots`, lst, schema_sxp) -} - GetCpuThreadPoolCapacity <- function(){ .Call(`_arrow_GetCpuThreadPoolCapacity`) } diff --git a/r/R/table.R b/r/R/table.R index 2c432ac8983..8f9fee065f2 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -168,17 +168,17 @@ Table$create <- function(..., schema = NULL) { names(dots) <- rep_len("", length(dots)) } stopifnot(length(dots) > 0) - + # Preserve any grouping if (length(dots) == 1 && inherits(dots[[1]], "grouped_df")) { - out <- Table__from_dots(dots, schema) + out <- Table__from_dots(dots, schema, option_use_threads()) return(dplyr::group_by(out, !!!dplyr::groups(dots[[1]]))) } - + if (all_record_batches(dots)) { Table__from_record_batches(dots, schema) } else { - Table__from_dots(dots, schema) + Table__from_dots(dots, schema, option_use_threads()) } } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 1ccfd593d2c..b25dd5a2a0d 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5352,6 +5352,23 @@ extern "C" SEXP _arrow_ExportRecordBatchReader(SEXP reader_sexp, SEXP stream_ptr } #endif +// r_to_arrow.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, bool use_threads); +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp, SEXP use_threads_sexp){ +BEGIN_CPP11 + arrow::r::Input::type lst(lst_sexp); + arrow::r::Input::type schema_sxp(schema_sxp_sexp); + arrow::r::Input::type use_threads(use_threads_sexp); + return cpp11::as_sexp(Table__from_dots(lst, schema_sxp, use_threads)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp, SEXP use_threads_sexp){ + Rf_error("Cannot call Table__from_dots(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // r_to_arrow.cpp #if defined(ARROW_R_WITH_ARROW) SEXP vec_to_arrow(SEXP x, SEXP s_type); @@ -6696,22 +6713,6 @@ extern "C" SEXP _arrow_Table__from_record_batches(SEXP batches_sexp, SEXP schema } #endif -// table.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ -BEGIN_CPP11 - arrow::r::Input::type lst(lst_sexp); - arrow::r::Input::type schema_sxp(schema_sxp_sexp); - return cpp11::as_sexp(Table__from_dots(lst, schema_sxp)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ - Rf_error("Cannot call Table__from_dots(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - // threadpool.cpp #if defined(ARROW_R_WITH_ARROW) int GetCpuThreadPoolCapacity(); @@ -7165,6 +7166,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, { "_arrow_ExportRecordBatch", (DL_FUNC) &_arrow_ExportRecordBatch, 3}, { "_arrow_ExportRecordBatchReader", (DL_FUNC) &_arrow_ExportRecordBatchReader, 2}, + { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 3}, { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, @@ -7250,7 +7252,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_Table__SelectColumns", (DL_FUNC) &_arrow_Table__SelectColumns, 2}, { "_arrow_all_record_batches", (DL_FUNC) &_arrow_all_record_batches, 1}, { "_arrow_Table__from_record_batches", (DL_FUNC) &_arrow_Table__from_record_batches, 2}, - { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, { "_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 5aa26eebd71..ca4ca9519c3 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -149,6 +149,14 @@ void TraverseDots(cpp11::list dots, int num_fields, Lambda lambda) { } } +inline cpp11::writable::list FlattenDots(cpp11::list dots, int num_fields) { + std::vector out(num_fields); + auto set = [&](int j, SEXP x, cpp11::r_string) { out[j] = x; }; + TraverseDots(dots, num_fields, set); + + return cpp11::writable::list(out.begin(), out.end()); +} + arrow::Status InferSchemaFromDots(SEXP lst, SEXP schema_sxp, int num_fields, std::shared_ptr& schema); diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 3df5db87efa..a8d2256cfe3 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -22,7 +22,6 @@ #include #include #include - #include // [[arrow::export]] diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 0ab9718da26..d0f4f3a6def 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -25,10 +25,14 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include namespace arrow { @@ -46,6 +50,90 @@ using internal::MakeConverter; namespace r { +class RTasks { + public: + using Task = internal::FnOnce; + + explicit RTasks(bool use_threads) + : use_threads_(use_threads), + stop_source_(), + parallel_tasks_( + use_threads ? arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()) + : nullptr) {} + + // This Finish() method must never be called from a thread pool thread + // as this would deadlock. + // + // Usage is to : + // - create an RTasks instance on the main thread + // - add some tasks with .Append() + // - and then call .Finish() so that the parallel tasks are finished + Status Finish() { + Status status = Status::OK(); + + // run the delayed tasks now + for (auto& task : delayed_serial_tasks_) { + status &= std::move(task)(); + } + + // then wait for the parallel tasks to finish + if (use_threads_) { + status &= parallel_tasks_->Finish(); + } + + return status; + } + + void Append(bool parallel, Task&& task) { + StoppingTask stopping_task(stop_source_, std::move(task)); + if (parallel && use_threads_) { + parallel_tasks_->Append(std::move(stopping_task)); + } else { + delayed_serial_tasks_.push_back(std::move(stopping_task)); + } + } + + void Reset() { + delayed_serial_tasks_.clear(); + + stop_source_.Reset(); + if (use_threads_) { + parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()); + } + } + + bool use_threads_; + StopSource stop_source_; + std::shared_ptr parallel_tasks_; + std::vector delayed_serial_tasks_; + + private: + class StoppingTask { + public: + StoppingTask(StopSource stop_source, Task&& task) : task_(std::move(task)) {} + + Status operator()() { + Status status; + StopToken token = stop_source_.token(); + if (token.IsStopRequested()) { + status &= token.Poll(); + } else { + Status status = std::move(task_)(); + if (!status.ok()) { + stop_source_.RequestStop(); + } + } + return status; + } + + private: + StopSource stop_source_; + Task task_; + }; +}; + struct RConversionOptions { RConversionOptions() = default; @@ -168,46 +256,85 @@ bool is_NA(int64_t value) { } template -struct RVectorVisitor { +class RVectorIterator { + public: + using value_type = T; + RVectorIterator(SEXP x, int64_t start) + : ptr_x_(reinterpret_cast(DATAPTR_RO(x)) + start) {} + + RVectorIterator& operator++() { + ++ptr_x_; + return *this; + } + + const T operator*() const { return *ptr_x_; } + + private: + const T* ptr_x_; +}; + +template +class RVectorIterator_ALTREP { + public: + using value_type = T; using data_type = typename std::conditional::value, double, T>::type; using r_vector_type = cpp11::r_vector; + using r_vector_iterator = typename r_vector_type::const_iterator; - template - static Status Visit(SEXP x, int64_t size, AppendNull&& append_null, - AppendValue&& append_value) { - r_vector_type values(x); - auto it = values.begin(); - - for (R_xlen_t i = 0; i < size; i++, ++it) { - auto value = GetValue(*it); + RVectorIterator_ALTREP(SEXP x, int64_t start) + : vector_(x), it_(vector_.begin() + start) {} - if (is_NA(value)) { - RETURN_NOT_OK(append_null()); - } else { - RETURN_NOT_OK(append_value(value)); - } - } - - return Status::OK(); + RVectorIterator_ALTREP& operator++() { + ++it_; + return *this; } + const T operator*() const { return GetValue(*it_); } + static T GetValue(data_type x) { return x; } + + private: + r_vector_type vector_; + r_vector_iterator it_; }; template <> -int64_t RVectorVisitor::GetValue(double x) { +int64_t RVectorIterator_ALTREP::GetValue(double x) { int64_t value; memcpy(&value, &x, sizeof(int64_t)); return value; } +template +Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null, + AppendValue&& append_value) { + for (R_xlen_t i = 0; i < n; i++, ++it) { + auto value = *it; + + if (is_NA(value)) { + RETURN_NOT_OK(append_null()); + } else { + RETURN_NOT_OK(append_value(value)); + } + } + + return Status::OK(); +} + class RConverter : public Converter { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } virtual Status Extend(SEXP values, int64_t size) { - return Status::NotImplemented("ExtendMasked"); + return Status::NotImplemented("Extend"); + } + + // by default, just delay the ->Extend(), i.e. not run in parallel + // implementations might redefine so that ->Extend() is run in parallel + virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(false, task); } virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { @@ -312,6 +439,7 @@ class RPrimitiveConverter> } }; +// TODO: extend this to BooleanType, but this needs some work in RConvert template class RPrimitiveConverter< T, enable_if_t::value || is_floating_type::value>> @@ -321,13 +449,13 @@ class RPrimitiveConverter< auto rtype = GetVectorType(x); switch (rtype) { case UINT8: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case INT32: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case FLOAT64: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case INT64: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); default: break; @@ -336,83 +464,49 @@ class RPrimitiveConverter< return Status::Invalid("cannot convert"); } - private: - template - Status AppendRangeLoopDifferentType(SEXP x, int64_t size) { - RETURN_NOT_OK(this->Reserve(size)); - - auto append_value = [this](r_value_type value) { - ARROW_ASSIGN_OR_RAISE(auto converted, - RConvert::Convert(this->primitive_type_, value)); - this->primitive_builder_->UnsafeAppend(converted); - return Status::OK(); - }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); } + private: template - Status AppendRangeSameTypeNotALTREP(SEXP x, int64_t size) { - auto p = reinterpret_cast(DATAPTR_RO(x)); - auto p_end = p + size; - - auto first_na = std::find_if(p, p_end, is_NA); - - if (first_na == p_end) { - // no nulls, so we can use AppendValues() directly - return this->primitive_builder_->AppendValues(p, p_end); - } - - // Append all values up until the first NULL - RETURN_NOT_OK(this->primitive_builder_->AppendValues(p, first_na)); - - // loop for the remaining - RETURN_NOT_OK(this->primitive_builder_->Reserve(p_end - first_na)); - p = first_na; - for (; p < p_end; ++p) { - r_value_type value = *p; - if (is_NA(value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - this->primitive_builder_->UnsafeAppend(value); - } + Status ExtendDispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { + // `x` is an ALTREP R vector storing `r_value_type` + // and that type matches exactly the type of the array this is building + return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + } else { + // `x` is not an ALTREP vector so we have direct access to a range of values + return Extend_impl(RVectorIterator(x, 0), size); } - return Status::OK(); } - template - Status AppendRangeSameTypeALTREP(SEXP x, int64_t size) { - // if it is altrep, then we use cpp11 looping - // without needing to convert + template + Status Extend_impl(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); - typename RVectorVisitor::r_vector_type vec(x); - auto it = vec.begin(); - for (R_xlen_t i = 0; i < size; i++, ++it) { - r_value_type value = RVectorVisitor::GetValue(*it); - if (is_NA(value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - this->primitive_builder_->UnsafeAppend(value); - } - } - return Status::OK(); - } - template - Status AppendRangeDispatch(SEXP x, int64_t size) { + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + if (std::is_same::value) { - if (!ALTREP(x)) { - return AppendRangeSameTypeNotALTREP(x, size); - } else { - return AppendRangeSameTypeALTREP(x, size); - } + auto append_value = [this](r_value_type value) { + this->primitive_builder_->UnsafeAppend(value); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); + } else { + auto append_value = [this](r_value_type value) { + ARROW_ASSIGN_OR_RAISE(auto converted, + RConvert::Convert(this->primitive_type_, value)); + this->primitive_builder_->UnsafeAppend(converted); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); } - - // here if underlying types differ so going - return AppendRangeLoopDifferentType(x, size); } }; @@ -425,17 +519,33 @@ class RPrimitiveConverter::value>> if (rtype != BOOLEAN) { return Status::Invalid("Expecting a logical vector"); } + + if (ALTREP(x)) { + return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + } else { + return Extend_impl(RVectorIterator(x, 0), size); + } + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } + + private: + template + Status Extend_impl(Iterator it, int64_t size) { RETURN_NOT_OK(this->Reserve(size)); - auto append_value = [this](cpp11::r_bool value) { - this->primitive_builder_->UnsafeAppend(value == 1); - return Status::OK(); - }; auto append_null = [this]() { this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + auto append_value = [this](cpp11::r_bool value) { + this->primitive_builder_->UnsafeAppend(value == 1); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); } }; @@ -444,17 +554,15 @@ class RPrimitiveConverter::value>> : public PrimitiveConverter { public: Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); - switch (GetVectorType(x)) { case DATE_INT: - return AppendRange_Date(x, size); + return AppendRange_Date_dispatch(x, size); case DATE_DBL: - return AppendRange_Date(x, size); + return AppendRange_Date_dispatch(x, size); case POSIXCT: - return AppendRange_Posixct(x, size); + return AppendRange_Posixct_dispatch(x, size); default: break; @@ -463,9 +571,26 @@ class RPrimitiveConverter::value>> return Status::Invalid("cannot convert to date type "); } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } + private: template - Status AppendRange_Date(SEXP x, int64_t size) { + Status AppendRange_Date_dispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { + return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); + } else { + return AppendRange_Date(RVectorIterator(x, 0), size); + } + } + + template + Status AppendRange_Date(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; + RETURN_NOT_OK(this->Reserve(size)); + auto append_null = [this]() { this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); @@ -474,21 +599,31 @@ class RPrimitiveConverter::value>> this->primitive_builder_->UnsafeAppend(FromRDate(this->primitive_type_, value)); return Status::OK(); }; + return VisitVector(it, size, append_null, append_value); + } - return RVectorVisitor::Visit(x, size, append_null, append_value); + Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { + return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); + } else { + return AppendRange_Posixct(RVectorIterator(x, 0), size); + } } - Status AppendRange_Posixct(SEXP x, int64_t size) { + template + Status AppendRange_Posixct(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; + RETURN_NOT_OK(this->Reserve(size)); + auto append_null = [this]() { this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); }; - auto append_value = [this](double value) { + auto append_value = [this](r_value_type value) { this->primitive_builder_->UnsafeAppend(FromPosixct(this->primitive_type_, value)); return Status::OK(); }; - - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(it, size, append_null, append_value); } static int FromRDate(const Date32Type*, int from) { return from; } @@ -553,16 +688,27 @@ class RPrimitiveConverter::value>> auto multiplier = get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; auto append_value = [this, multiplier](double value) { auto converted = static_cast(value * multiplier); this->primitive_builder_->UnsafeAppend(converted); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + if (ALTREP(x)) { + return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + append_value); + } else { + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -589,7 +735,18 @@ class RPrimitiveConverter::value>> this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + if (ALTREP(x)) { + return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + append_value); + } else { + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -633,17 +790,23 @@ class RPrimitiveConverter> RETURN_NOT_OK(this->Reserve(size)); RETURN_NOT_OK(check_binary(x, size)); + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](SEXP raw) { R_xlen_t n = XLENGTH(raw); ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -655,6 +818,11 @@ class RPrimitiveConverter::v RETURN_NOT_OK(this->Reserve(size)); RETURN_NOT_OK(check_binary(x, size)); + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](SEXP raw) { R_xlen_t n = XLENGTH(raw); @@ -665,11 +833,12 @@ class RPrimitiveConverter::v this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -680,33 +849,41 @@ class RPrimitiveConverter> using OffsetType = typename T::offset_type; Status Extend(SEXP x, int64_t size) override { - int64_t start = 0; RVectorType rtype = GetVectorType(x); if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine this., e.g. extract setup from Extend() + tasks.Append(false, std::move(task)); + } - cpp11::strings s(arrow::r::utf8_strings(x)); + private: + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); - auto it = s.begin() + start; + const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get // a definite size and then use UnsafeAppend*() int64_t total_length = 0; - for (R_xlen_t i = 0; i < size; i++, ++it) { - cpp11::r_string si = *it; - total_length += cpp11::is_na(si) ? 0 : si.size(); + for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + SEXP si = *p_strings; + total_length += si == NA_STRING ? 0 : LENGTH(si); } RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); // append - it = s.begin() + start; - for (R_xlen_t i = 0; i < size; i++, ++it) { - cpp11::r_string si = *it; + p_strings = reinterpret_cast(DATAPTR_RO(s)); + for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + SEXP si = *p_strings; if (si == NA_STRING) { this->primitive_builder_->UnsafeAppendNull(); } else { - this->primitive_builder_->UnsafeAppend(CHAR(si), si.size()); + this->primitive_builder_->UnsafeAppend(CHAR(si), LENGTH(si)); } } @@ -746,25 +923,24 @@ class RDictionaryConverter> using BuilderType = DictionaryBuilder; Status Extend(SEXP x, int64_t size) override { - // first we need to handle the levels - cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol)); - auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false); - RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); + RETURN_NOT_OK(ExtendSetup(x, size)); + return ExtendImpl(x, size, GetCharLevels(x)); + } - // then we can proceed - RETURN_NOT_OK(this->Reserve(size)); + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + // the setup runs synchronously first + Status setup = ExtendSetup(values, size); - RVectorType rtype = GetVectorType(x); - if (rtype != FACTOR) { - return Status::Invalid("invalid R type to convert to dictionary"); - } + if (!setup.ok()) { + // if that fails, propagate the error + tasks.Append(false, [setup]() { return setup; }); + } else { + auto char_levels = GetCharLevels(values); - auto append_value = [this, levels](int value) { - SEXP s = STRING_ELT(levels, value - 1); - return this->value_builder_->Append(CHAR(s)); - }; - auto append_null = [this]() { return this->value_builder_->AppendNull(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + tasks.Append(true, [this, values, size, char_levels]() { + return this->ExtendImpl(values, size, char_levels); + }); + } } Result> ToArray() override { @@ -780,6 +956,44 @@ class RDictionaryConverter> return std::make_shared(result->data()); } + + private: + std::vector GetCharLevels(SEXP x) { + SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); + R_xlen_t n_levels = XLENGTH(levels); + std::vector char_levels(XLENGTH(levels)); + const SEXP* p_levels = reinterpret_cast(DATAPTR_RO(levels)); + for (R_xlen_t i = 0; i < n_levels; i++, ++p_levels) { + char_levels[i] = CHAR(*p_levels); + } + + return char_levels; + } + + Status ExtendSetup(SEXP x, int64_t size) { + RVectorType rtype = GetVectorType(x); + if (rtype != FACTOR) { + return Status::Invalid("invalid R type to convert to dictionary"); + } + + // first we need to handle the levels + SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); + auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false); + RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); + + // then we can proceed + return this->Reserve(size); + } + + Status ExtendImpl(SEXP values, int64_t size, + const std::vector& char_levels) { + auto append_null = [this]() { return this->value_builder_->AppendNull(); }; + auto append_value = [this, &char_levels](int value) { + return this->value_builder_->Append(char_levels[value - 1]); + }; + + return VisitVector(RVectorIterator(values, 0), size, append_null, append_value); + } }; template @@ -808,15 +1022,28 @@ class RListConverter : public ListConverter { return Status::Invalid("Cannot convert to list type"); } + auto append_null = [this]() { return this->list_builder_->AppendNull(); }; + auto append_value = [this](SEXP value) { + // TODO: if we decide that this can be run concurrently + // we'll have to do vec_size() upfront int n = vctrs::vec_size(value); RETURN_NOT_OK(this->list_builder_->ValidateOverflow(n)); RETURN_NOT_OK(this->list_builder_->Append()); return this->value_converter_.get()->Extend(value, n); }; - auto append_null = [this]() { return this->list_builder_->AppendNull(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + // NOTE: because Extend::[]append_value() calls Extend() on the + // value converter, which might require a setup step, it feels + // complicated to run this task concurrently. + // + // TODO: perhaps allow running concurrently in some cases, e.g. list(int32(!altrep)) + tasks.Append(false, [this, values, size]() { return this->Extend(values, size); }); } }; @@ -830,6 +1057,45 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(ExtendSetup(x, size)); + + auto fields = this->struct_type_->fields(); + R_xlen_t n_columns = XLENGTH(x); + for (R_xlen_t i = 0; i < n_columns; i++) { + auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); + if (!status.ok()) { + return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), + "): ", status.ToString()); + } + } + + return Status::OK(); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + // the setup runs synchronously first + Status setup = ExtendSetup(values, size); + + if (!setup.ok()) { + // if that fails, propagate the error + tasks.Append(false, [setup]() { return setup; }); + } else { + // otherwise deal with each column, maybe concurrently + auto fields = this->struct_type_->fields(); + R_xlen_t n_columns = XLENGTH(values); + + for (R_xlen_t i = 0; i < n_columns; i++) { + children_[i]->DelayedExtend(VECTOR_ELT(values, i), size, tasks); + } + } + } + + protected: + Status Init(MemoryPool* pool) override { + return StructConverter::Init(pool); + } + + Status ExtendSetup(SEXP x, int64_t size) { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -860,15 +1126,6 @@ class RStructConverter : public StructConverter { return Status::OK(); })); - for (R_xlen_t i = 0; i < n_columns; i++) { - std::string name(x_names[i]); - if (name != fields[i]->name()) { - return Status::RError( - "Field name in position ", i, " (", fields[i]->name(), - ") does not match the name of the column of the data frame (", name, ")"); - } - } - for (R_xlen_t i = 0; i < n_columns; i++) { SEXP x_i = VECTOR_ELT(x, i); if (vctrs::vec_size(x_i) < size) { @@ -882,21 +1139,8 @@ class RStructConverter : public StructConverter { RETURN_NOT_OK(struct_builder_->Append()); } - for (R_xlen_t i = 0; i < n_columns; i++) { - auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); - if (!status.ok()) { - return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), - "): ", status.ToString()); - } - } - return Status::OK(); } - - protected: - Status Init(MemoryPool* pool) override { - return StructConverter::Init(pool); - } }; template <> @@ -992,6 +1236,37 @@ std::shared_ptr vec_to_arrow__reuse_memory(SEXP x) { cpp11::stop("Unreachable: you might need to fix can_reuse_memory()"); } +Status vector_to_Array(SEXP x, const std::shared_ptr& type, + bool type_inferred, + std::shared_ptr& task_group, + std::shared_ptr& out) { + // short circuit if `x` is already an Array + if (Rf_inherits(x, "Array")) { + out = cpp11::as_cpp>(x); + return Status::OK(); + } + + RConversionOptions options; + options.strict = !type_inferred; + options.type = type; + options.size = vctrs::vec_size(x); + + // maybe short circuit when zero-copy is possible + if (can_reuse_memory(x, options.type)) { + out = vec_to_arrow__reuse_memory(x); + return Status::OK(); + } + + // otherwise go through the converter api + auto converter = ValueOrStop(MakeConverter( + options.type, options, gc_memory_pool())); + + RETURN_NOT_OK(converter->Extend(x, options.size)); + ARROW_ASSIGN_OR_RAISE(out, converter->ToArray()); + + return Status::OK(); +} + std::shared_ptr vec_to_arrow(SEXP x, const std::shared_ptr& type, bool type_inferred) { @@ -1015,12 +1290,230 @@ std::shared_ptr vec_to_arrow(SEXP x, options.type, options, gc_memory_pool())); StopIfNotOk(converter->Extend(x, options.size)); + return ValueOrStop(converter->ToArray()); } +// TODO: most of this is very similar to MakeSimpleArray, just adapted to +// leverage concurrency. Maybe some refactoring needed. +template +bool vector_from_r_memory_impl(SEXP x, const std::shared_ptr& type, + std::vector>& columns, + int j, RTasks& tasks) { + RVector vec(x); + using value_type = typename arrow::TypeTraits::ArrayType::value_type; + auto buffer = std::make_shared>(vec); + + tasks.Append(true, [buffer, x, &columns, j]() { + std::vector> buffers{nullptr, buffer}; + + auto n = XLENGTH(x); + auto p_x_start = reinterpret_cast(DATAPTR_RO(x)); + auto p_x_end = p_x_start + n; + + int null_count = 0; + auto first_na = std::find_if(p_x_start, p_x_end, is_NA); + + if (first_na < p_x_end) { + auto null_bitmap = + ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); + internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); + + // first loop to clear all the bits before the first NA + auto k = std::distance(p_x_start, first_na); + int i = 0; + for (; i < k; i++, bitmap_writer.Next()) { + bitmap_writer.Set(); + } + + auto p_vec = first_na; + // then finish + for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { + if (is_NA(*p_vec)) { + bitmap_writer.Clear(); + null_count++; + } else { + bitmap_writer.Set(); + } + } + + bitmap_writer.Finish(); + buffers[0] = std::move(null_bitmap); + } + + auto data = ArrayData::Make(std::make_shared(), n, std::move(buffers), + null_count, 0 /*offset*/); + auto array = std::make_shared::ArrayType>(data); + columns[j] = std::make_shared(array); + + return Status::OK(); + }); + + return true; +} + +bool vector_from_r_memory(SEXP x, const std::shared_ptr& type, + std::vector>& columns, + int j, RTasks& tasks) { + if (ALTREP(x)) return false; + + switch (type->id()) { + case Type::INT32: + return TYPEOF(x) == INTSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::DOUBLE: + return TYPEOF(x) == REALSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::UINT8: + return TYPEOF(x) == RAWSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::INT64: + return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64") && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + default: + break; + } + + return false; +} + } // namespace r } // namespace arrow +arrow::Status check_consistent_column_length( + const std::vector>& columns) { + if (columns.size()) { + int64_t num_rows = columns[0]->length(); + + for (const auto& column : columns) { + if (column->length() != num_rows) { + return arrow::Status::Invalid("All columns must have the same length"); + } + } + } + + return arrow::Status::OK(); +} + +// [[arrow::export]] +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, + bool use_threads) { + bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); + + int num_fields; + StopIfNotOk(arrow::r::count_fields(lst, &num_fields)); + + // schema + metadata + std::shared_ptr schema; + StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); + StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); + + if (!infer_schema && schema->num_fields() != num_fields) { + cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied", + schema->num_fields(), num_fields); + } + + // table + std::vector> columns(num_fields); + + if (!infer_schema) { + auto check_name = [&](int j, SEXP, cpp11::r_string name) { + std::string cpp_name(name); + if (schema->field(j)->name() != cpp_name) { + cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, + schema->field(j)->name().c_str(), cpp_name.c_str()); + } + }; + arrow::r::TraverseDots(lst, num_fields, check_name); + } + + // must be careful to avoid R stop() until the tasks + // are finished, i.e. after tasks.Finish() + arrow::r::RTasks tasks(use_threads); + + arrow::Status status = arrow::Status::OK(); + + auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); + std::vector> converters(num_fields); + + // init converters + for (int j = 0; j < num_fields && status.ok(); j++) { + SEXP x = flatten_lst[j]; + + if (Rf_inherits(x, "ChunkedArray")) { + columns[j] = cpp11::as_cpp>(x); + } else if (Rf_inherits(x, "Array")) { + columns[j] = std::make_shared( + cpp11::as_cpp>(x)); + } else { + arrow::r::RConversionOptions options; + options.strict = !infer_schema; + options.type = schema->field(j)->type(); + options.size = vctrs::vec_size(x); + + // first try to add a task to do a zero copy in parallel + if (arrow::r::vector_from_r_memory(x, options.type, columns, j, tasks)) { + continue; + } + + // if unsuccessful: use RConverter api + auto converter_result = + arrow::MakeConverter( + options.type, options, gc_memory_pool()); + if (!converter_result.ok()) { + status = converter_result.status(); + break; + } + converters[j] = std::move(converter_result.ValueUnsafe()); + } + } + + // if the previous loop didn't break early, spawn + // tasks to Extend, maybe in parallel + if (status.ok()) { + for (int j = 0; j < num_fields; j++) { + auto& converter = converters[j]; + if (converter != nullptr) { + converter->DelayedExtend(flatten_lst[j], converter->options().size, tasks); + } + } + } + + // in any case, this needs to wait until all tasks are finished + status &= tasks.Finish(); + + // nothing is running in parallel here, so we have an opportunity to stop + StopIfNotOk(status); + + // then finally convert to chunked arrays in parallel + tasks.Reset(); + + for (int j = 0; j < num_fields; j++) { + tasks.Append(true, [&columns, j, &converters]() { + auto& converter = converters[j]; + if (converter != nullptr) { + ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); + columns[j] = std::make_shared(array); + } + return arrow::Status::OK(); + }); + } + status &= tasks.Finish(); + StopIfNotOk(status); + + status &= check_consistent_column_length(columns); + StopIfNotOk(status); + + return arrow::Table::Make(schema, columns); +} + // [[arrow::export]] SEXP vec_to_arrow(SEXP x, SEXP s_type) { if (Rf_inherits(x, "Array")) return x; diff --git a/r/src/table.cpp b/r/src/table.cpp index 997d8f137cb..68adefcfd4a 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -150,21 +150,6 @@ std::shared_ptr Table__SelectColumns( namespace arrow { namespace r { -arrow::Status check_consistent_column_length( - const std::vector>& columns) { - if (columns.size()) { - int64_t num_rows = columns[0]->length(); - - for (const auto& column : columns) { - if (column->length() != num_rows) { - return arrow::Status::Invalid("All columns must have the same length"); - } - } - } - - return arrow::Status::OK(); -} - arrow::Status InferSchemaFromDots(SEXP lst, SEXP schema_sxp, int num_fields, std::shared_ptr& schema) { // maybe a schema was given @@ -269,33 +254,6 @@ arrow::Status AddMetadataFromDots(SEXP lst, int num_fields, return arrow::Status::OK(); } -arrow::Status CollectTableColumns( - SEXP lst, const std::shared_ptr& schema, int num_fields, bool inferred, - std::vector>& columns) { - if (!inferred && schema->num_fields() != num_fields) { - cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied", - schema->num_fields(), num_fields); - } - auto extract_one_column = [&columns, &schema, inferred](int j, SEXP x, - std::string name) { - if (!inferred && schema->field(j)->name() != name) { - cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, - schema->field(j)->name().c_str(), name.c_str()); - } - if (Rf_inherits(x, "ChunkedArray")) { - columns[j] = cpp11::as_cpp>(x); - } else if (Rf_inherits(x, "Array")) { - columns[j] = std::make_shared( - cpp11::as_cpp>(x)); - } else { - auto array = arrow::r::vec_to_arrow(x, schema->field(j)->type(), inferred); - columns[j] = std::make_shared(array); - } - }; - arrow::r::TraverseDots(lst, num_fields, extract_one_column); - return arrow::Status::OK(); -} - } // namespace r } // namespace arrow @@ -325,26 +283,4 @@ std::shared_ptr Table__from_record_batches( return tab; } -// [[arrow::export]] -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { - bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); - - int num_fields; - StopIfNotOk(arrow::r::count_fields(lst, &num_fields)); - - // schema + metadata - std::shared_ptr schema; - StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); - StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); - - // table - std::vector> columns(num_fields); - StopIfNotOk( - arrow::r::CollectTableColumns(lst, schema, num_fields, infer_schema, columns)); - - StopIfNotOk(arrow::r::check_consistent_column_length(columns)); - - return arrow::Table::Make(schema, columns); -} - #endif diff --git a/r/src/type_infer.cpp b/r/src/type_infer.cpp index 93e51be6462..022a29ea5b2 100644 --- a/r/src/type_infer.cpp +++ b/r/src/type_infer.cpp @@ -179,7 +179,7 @@ std::shared_ptr InferArrowType(SEXP x) { case REALSXP: return InferArrowTypeFromVector(x); case RAWSXP: - return int8(); + return uint8(); case STRSXP: return InferArrowTypeFromVector(x); case VECSXP: diff --git a/r/tests/testthat/test-type.R b/r/tests/testthat/test-type.R index 56cef722556..343170edb82 100644 --- a/r/tests/testthat/test-type.R +++ b/r/tests/testthat/test-type.R @@ -31,7 +31,7 @@ test_that("type() infers from R type", { expect_type_equal(type(1:10), int32()) expect_type_equal(type(1), float64()) expect_type_equal(type(TRUE), boolean()) - expect_type_equal(type(raw()), int8()) + expect_type_equal(type(raw()), uint8()) expect_type_equal(type(""), utf8()) expect_type_equal( type(example_data$fct),