From 76dc1eb75d8375224b0b2d9691b97003444b8bab Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 3 Jun 2021 15:24:57 +0200 Subject: [PATCH 01/12] simplify RTasks as hinted by @westonpace in https://github.com/apache/arrow/pull/9615#discussion_r644193980 --- r/src/r_to_arrow.cpp | 33 ++++++--------------------------- 1 file changed, 6 insertions(+), 27 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 683e8f278e8..aa40c600521 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -75,6 +75,10 @@ class RTasks { // run the delayed tasks now for (auto& task : delayed_serial_tasks_) { status &= std::move(task)(); + if (!status.ok()) { + stop_source_.RequestStop(); + break; + } } // then wait for the parallel tasks to finish @@ -86,11 +90,10 @@ class RTasks { } void Append(bool parallel, Task&& task) { - StoppingTask stopping_task(stop_source_, std::move(task)); if (parallel && use_threads_) { - parallel_tasks_->Append(std::move(stopping_task)); + parallel_tasks_->Append(std::move(task)); } else { - delayed_serial_tasks_.push_back(std::move(stopping_task)); + delayed_serial_tasks_.push_back(std::move(task)); } } @@ -108,30 +111,6 @@ class RTasks { StopSource stop_source_; std::shared_ptr parallel_tasks_; std::vector delayed_serial_tasks_; - - private: - class StoppingTask { - public: - StoppingTask(StopSource stop_source, Task&& task) : task_(std::move(task)) {} - - Status operator()() { - Status status; - StopToken token = stop_source_.token(); - if (token.IsStopRequested()) { - status &= token.Poll(); - } else { - Status status = std::move(task_)(); - if (!status.ok()) { - stop_source_.RequestStop(); - } - } - return status; - } - - private: - StopSource stop_source_; - Task task_; - }; }; struct RConversionOptions { From f81109aa199c98df0e0437db8572693e795ce17d Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 21 Jun 2021 10:36:49 +0200 Subject: [PATCH 02/12] Move declaration of RTasks to arrow-types.h --- r/src/arrow_types.h | 26 +++++++++++++ r/src/r_to_arrow.cpp | 91 ++++++++++++++++++-------------------------- 2 files changed, 63 insertions(+), 54 deletions(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 68e1c8659c4..a9f3ad0ae3c 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -45,6 +45,8 @@ #include #include #include +#include +#include #include #if defined(ARROW_R_WITH_PARQUET) @@ -171,6 +173,30 @@ SEXP MakeInt32ArrayNoNull(const std::shared_ptr& array); SEXP MakeDoubleArrayNoNull(const std::shared_ptr& array); #endif +class RTasks { + public: + using Task = internal::FnOnce; + + explicit RTasks(bool use_threads); + + // This Finish() method must never be called from a thread pool thread + // as this would deadlock. + // + // Usage is to : + // - create an RTasks instance on the main thread + // - add some tasks with .Append() + // - and then call .Finish() so that the parallel tasks are finished + Status Finish(); + void Append(bool parallel, Task&& task); + + void Reset(); + + bool use_threads_; + StopSource stop_source_; + std::shared_ptr parallel_tasks_; + std::vector delayed_serial_tasks_; +}; + } // namespace r } // namespace arrow diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index aa40c600521..52fdedbefc8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -50,68 +50,51 @@ using internal::MakeConverter; namespace r { -class RTasks { - public: - using Task = internal::FnOnce; - - explicit RTasks(bool use_threads) - : use_threads_(use_threads), - stop_source_(), - parallel_tasks_( - use_threads ? arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool(), stop_source_.token()) - : nullptr) {} - - // This Finish() method must never be called from a thread pool thread - // as this would deadlock. - // - // Usage is to : - // - create an RTasks instance on the main thread - // - add some tasks with .Append() - // - and then call .Finish() so that the parallel tasks are finished - Status Finish() { - Status status = Status::OK(); - - // run the delayed tasks now - for (auto& task : delayed_serial_tasks_) { - status &= std::move(task)(); - if (!status.ok()) { - stop_source_.RequestStop(); - break; - } - } - - // then wait for the parallel tasks to finish - if (use_threads_) { - status &= parallel_tasks_->Finish(); +RTasks::RTasks(bool use_threads) + : use_threads_(use_threads), + stop_source_(), + parallel_tasks_(use_threads + ? arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()) + : nullptr) {} + +Status RTasks::Finish() { + Status status = Status::OK(); + + // run the delayed tasks now + for (auto& task : delayed_serial_tasks_) { + status &= std::move(task)(); + if (!status.ok()) { + stop_source_.RequestStop(); + break; } - - return status; } - void Append(bool parallel, Task&& task) { - if (parallel && use_threads_) { - parallel_tasks_->Append(std::move(task)); - } else { - delayed_serial_tasks_.push_back(std::move(task)); - } + // then wait for the parallel tasks to finish + if (use_threads_) { + status &= parallel_tasks_->Finish(); } - void Reset() { - delayed_serial_tasks_.clear(); + return status; +} - stop_source_.Reset(); - if (use_threads_) { - parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool(), stop_source_.token()); - } +void RTasks::Append(bool parallel, RTasks::Task&& task) { + if (parallel && use_threads_) { + parallel_tasks_->Append(std::move(task)); + } else { + delayed_serial_tasks_.push_back(std::move(task)); } +} - bool use_threads_; - StopSource stop_source_; - std::shared_ptr parallel_tasks_; - std::vector delayed_serial_tasks_; -}; +void RTasks::Reset() { + delayed_serial_tasks_.clear(); + + stop_source_.Reset(); + if (use_threads_) { + parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()); + } +} struct RConversionOptions { RConversionOptions() = default; From 810dd22338c2057ca96d44c26eeba2ca2c4513ce Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 21 Jun 2021 10:36:49 +0200 Subject: [PATCH 03/12] Move declaration of RTasks to arrow-types.h From 55b74f95b6239c1477ffec914ff4c4db1f751666 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 21 Jun 2021 10:36:49 +0200 Subject: [PATCH 04/12] Move declaration of RTasks to arrow-types.h From 56e78fecfaa193f73e073df53badcc729d5d9aa9 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 21 Jun 2021 10:53:55 +0200 Subject: [PATCH 05/12] factor out RTasks from r_to_arrow.cpp as it will be also used elsewhere --- r/src/RTasks.cpp | 74 ++++++++++++++++++++++++++++++++++++++++++++ r/src/arrow_types.h | 26 ---------------- r/src/r_task_group.h | 51 ++++++++++++++++++++++++++++++ r/src/r_to_arrow.cpp | 50 ++---------------------------- 4 files changed, 127 insertions(+), 74 deletions(-) create mode 100644 r/src/RTasks.cpp create mode 100644 r/src/r_task_group.h diff --git a/r/src/RTasks.cpp b/r/src/RTasks.cpp new file mode 100644 index 00000000000..25bd944cc62 --- /dev/null +++ b/r/src/RTasks.cpp @@ -0,0 +1,74 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "./r_task_group.h" + +#if defined(ARROW_R_WITH_ARROW) + +namespace arrow { +namespace r { + +RTasks::RTasks(bool use_threads) + : use_threads_(use_threads), + stop_source_(), + parallel_tasks_(use_threads + ? arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()) + : nullptr) {} + +Status RTasks::Finish() { + Status status = Status::OK(); + + // run the delayed tasks now + for (auto& task : delayed_serial_tasks_) { + status &= std::move(task)(); + if (!status.ok()) { + stop_source_.RequestStop(); + break; + } + } + + // then wait for the parallel tasks to finish + if (use_threads_) { + status &= parallel_tasks_->Finish(); + } + + return status; +} + +void RTasks::Append(bool parallel, RTasks::Task&& task) { + if (parallel && use_threads_) { + parallel_tasks_->Append(std::move(task)); + } else { + delayed_serial_tasks_.push_back(std::move(task)); + } +} + +void RTasks::Reset() { + delayed_serial_tasks_.clear(); + + stop_source_.Reset(); + if (use_threads_) { + parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()); + } +} + +} // namespace r +} // namespace arrow + +#endif diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index a9f3ad0ae3c..68e1c8659c4 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -45,8 +45,6 @@ #include #include #include -#include -#include #include #if defined(ARROW_R_WITH_PARQUET) @@ -173,30 +171,6 @@ SEXP MakeInt32ArrayNoNull(const std::shared_ptr& array); SEXP MakeDoubleArrayNoNull(const std::shared_ptr& array); #endif -class RTasks { - public: - using Task = internal::FnOnce; - - explicit RTasks(bool use_threads); - - // This Finish() method must never be called from a thread pool thread - // as this would deadlock. - // - // Usage is to : - // - create an RTasks instance on the main thread - // - add some tasks with .Append() - // - and then call .Finish() so that the parallel tasks are finished - Status Finish(); - void Append(bool parallel, Task&& task); - - void Reset(); - - bool use_threads_; - StopSource stop_source_; - std::shared_ptr parallel_tasks_; - std::vector delayed_serial_tasks_; -}; - } // namespace r } // namespace arrow diff --git a/r/src/r_task_group.h b/r/src/r_task_group.h new file mode 100644 index 00000000000..e1c298b27fc --- /dev/null +++ b/r/src/r_task_group.h @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +namespace arrow { +namespace r { + +class RTasks { + public: + using Task = internal::FnOnce; + + explicit RTasks(bool use_threads); + + // This Finish() method must never be called from a thread pool thread + // as this would deadlock. + // + // Usage is to : + // - create an RTasks instance on the main thread + // - add some tasks with .Append() + // - and then call .Finish() so that the parallel tasks are finished + Status Finish(); + void Append(bool parallel, Task&& task); + + void Reset(); + + bool use_threads_; + StopSource stop_source_; + std::shared_ptr parallel_tasks_; + std::vector delayed_serial_tasks_; +}; + +} // namespace r +} // namespace arrow diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 52fdedbefc8..b4c16211fb5 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -31,8 +31,8 @@ #include #include #include -#include -#include + +#include "./r_task_group.h" namespace arrow { @@ -50,52 +50,6 @@ using internal::MakeConverter; namespace r { -RTasks::RTasks(bool use_threads) - : use_threads_(use_threads), - stop_source_(), - parallel_tasks_(use_threads - ? arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool(), stop_source_.token()) - : nullptr) {} - -Status RTasks::Finish() { - Status status = Status::OK(); - - // run the delayed tasks now - for (auto& task : delayed_serial_tasks_) { - status &= std::move(task)(); - if (!status.ok()) { - stop_source_.RequestStop(); - break; - } - } - - // then wait for the parallel tasks to finish - if (use_threads_) { - status &= parallel_tasks_->Finish(); - } - - return status; -} - -void RTasks::Append(bool parallel, RTasks::Task&& task) { - if (parallel && use_threads_) { - parallel_tasks_->Append(std::move(task)); - } else { - delayed_serial_tasks_.push_back(std::move(task)); - } -} - -void RTasks::Reset() { - delayed_serial_tasks_.clear(); - - stop_source_.Reset(); - if (use_threads_) { - parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool(), stop_source_.token()); - } -} - struct RConversionOptions { RConversionOptions() = default; From c4ea09e906fb8362136182a5f5380e66095ed6c8 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 21 Jun 2021 14:43:17 +0200 Subject: [PATCH 06/12] use RTasks --- r/src/array_to_vector.cpp | 134 ++++++++++++-------------------------- 1 file changed, 40 insertions(+), 94 deletions(-) diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index a8f7191bf18..5f7f47138ef 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -1,7 +1,7 @@ // Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file // distributed with this work for additional information // regarding copyright ownership. The ASF licenses this file +// or more contributor license agreements. See the NOTICE file // to you under the Apache License, Version 2.0 (the // "License"); you may not use this file except in compliance // with the License. You may obtain a copy of the License at @@ -25,12 +25,12 @@ #include #include #include -#include -#include #include #include +#include "./r_task_group.h" + namespace arrow { using internal::checked_cast; @@ -59,44 +59,29 @@ class Converter { R_xlen_t start, R_xlen_t n, size_t chunk_index) const = 0; - // ingest one array - Status IngestOne(SEXP data, const std::shared_ptr& array, R_xlen_t start, - R_xlen_t n, size_t chunk_index) const { - if (array->null_count() == n) { - return Ingest_all_nulls(data, start, n); - } else { - return Ingest_some_nulls(data, array, start, n, chunk_index); - } - } - // can this run in parallel ? virtual bool Parallel() const { return true; } - // Ingest all the arrays serially - Status IngestSerial(SEXP data) { - R_xlen_t k = 0, i = 0; - for (const auto& array : arrays_) { - auto n_chunk = array->length(); - RETURN_NOT_OK(IngestOne(data, array, k, n_chunk, i)); - k += n_chunk; - i++; - } - return Status::OK(); + SEXP LazyConvert(R_xlen_t n, RTasks& tasks) { + SEXP out = PROTECT(Allocate(n)); + IngestAll(out, tasks); + UNPROTECT(1); + return out; } - // ingest the arrays in parallel - // - // for each array, add a task to the task group - // - // The task group is Finish() in the caller - // The converter itself is passed as `self` so that if one of the parallel ops - // hits `stop()`, we don't bail before `tg` is destroyed, which would cause a crash - void IngestParallel(SEXP data, const std::shared_ptr& tg, - std::shared_ptr self) { + void IngestAll(SEXP data, RTasks& tasks) { R_xlen_t k = 0, i = 0; for (const auto& array : arrays_) { auto n_chunk = array->length(); - tg->Append([=] { return self->IngestOne(data, array, k, n_chunk, i); }); + + tasks.Append(Parallel(), [=] { + if (array->null_count() == n_chunk) { + return Ingest_all_nulls(data, k, n_chunk); + } else { + return Ingest_some_nulls(data, array, k, n_chunk, i); + } + }); + k += n_chunk; i++; } @@ -162,9 +147,11 @@ SEXP ArrayVector__as_vector(R_xlen_t n, const std::shared_ptr& type, } #endif + RTasks tasks(false); auto converter = Converter::Make(type, arrays); - SEXP data = PROTECT(converter->Allocate(n)); - StopIfNotOk(converter->IngestSerial(data)); + SEXP data = PROTECT(converter->LazyConvert(n, tasks)); + StopIfNotOk(tasks.Finish()); + UNPROTECT(1); return data; } @@ -1233,54 +1220,23 @@ std::shared_ptr Converter::Make(const std::shared_ptr& type cpp11::stop("cannot handle Array of type ", type->name().c_str()); } -cpp11::writable::list to_dataframe_serial( - int64_t nr, int64_t nc, const cpp11::writable::strings& names, - const std::vector>& converters) { - cpp11::writable::list tbl(nc); - for (int i = 0; i < nc; i++) { - SEXP column = tbl[i] = converters[i]->Allocate(nr); - StopIfNotOk(converters[i]->IngestSerial(column)); - } - tbl.attr(R_NamesSymbol) = names; - tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; - tbl.attr(R_RowNamesSymbol) = arrow::r::short_row_names(nr); - return tbl; -} - -cpp11::writable::list to_dataframe_parallel( +cpp11::writable::list to_dataframe( int64_t nr, int64_t nc, const cpp11::writable::strings& names, - const std::vector>& converters) { + const std::vector& columns, + const std::vector>& types, + bool use_threads) { cpp11::writable::list tbl(nc); - // task group to ingest data in parallel - auto tg = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); + arrow::r::RTasks tasks(use_threads); + std::vector> converters(nc); - // allocate and start ingesting immediately the columns that - // can be ingested in parallel, i.e. when ingestion no longer - // need to happen on the main thread for (int i = 0; i < nc; i++) { - // allocate data for column i - SEXP column = tbl[i] = converters[i]->Allocate(nr); - - // add a task to ingest data of that column if that can be done in parallel - if (converters[i]->Parallel()) { - converters[i]->IngestParallel(column, tg, converters[i]); - } - } + converters[i] = Converter::Make(types[i], columns[i]); - arrow::Status status = arrow::Status::OK(); - - // ingest the columns that cannot be dealt with in parallel - for (int i = 0; i < nc; i++) { - if (!converters[i]->Parallel()) { - status &= converters[i]->IngestSerial(tbl[i]); - } + tbl[i] = converters[i]->LazyConvert(nr, tasks); } - // wait for the ingestion to be finished - status &= tg->Finish(); - - StopIfNotOk(status); + StopIfNotOk(tasks.Finish()); tbl.attr(R_NamesSymbol) = names; tbl.attr(R_ClassSymbol) = arrow::r::data::classes_tbl_df; @@ -1299,10 +1255,6 @@ SEXP Array__as_vector(const std::shared_ptr& array) { // [[arrow::export]] SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array) { - if (chunked_array->num_chunks() == 1) { - return Array__as_vector(chunked_array->chunk(0)); - } - return arrow::r::ArrayVector__as_vector(chunked_array->length(), chunked_array->type(), chunked_array->chunks()); } @@ -1314,19 +1266,15 @@ cpp11::writable::list RecordBatch__to_dataframe( int64_t nr = batch->num_rows(); cpp11::writable::strings names(nc); std::vector arrays(nc); - std::vector> converters(nc); + std::vector> types(nc); for (R_xlen_t i = 0; i < nc; i++) { names[i] = batch->column_name(i); arrays[i] = {batch->column(i)}; - converters[i] = arrow::r::Converter::Make(batch->column(i)->type(), arrays[i]); + types[i] = batch->column(i)->type(); } - if (use_threads) { - return arrow::r::to_dataframe_parallel(nr, nc, names, converters); - } else { - return arrow::r::to_dataframe_serial(nr, nc, names, converters); - } + return arrow::r::to_dataframe(nr, nc, names, arrays, types, use_threads); } // [[arrow::export]] @@ -1335,19 +1283,17 @@ cpp11::writable::list Table__to_dataframe(const std::shared_ptr& t int64_t nc = table->num_columns(); int64_t nr = table->num_rows(); cpp11::writable::strings names(nc); - std::vector> converters(nc); + + std::vector arrays(nc); + std::vector> types(nc); for (R_xlen_t i = 0; i < nc; i++) { - converters[i] = - arrow::r::Converter::Make(table->column(i)->type(), table->column(i)->chunks()); + arrays[i] = table->column(i)->chunks(); names[i] = table->field(i)->name(); + types[i] = table->field(i)->type(); } - if (use_threads) { - return arrow::r::to_dataframe_parallel(nr, nc, names, converters); - } else { - return arrow::r::to_dataframe_serial(nr, nc, names, converters); - } + return arrow::r::to_dataframe(nr, nc, names, arrays, types, use_threads); } #endif From 12c9902ec158088b0b520e476e25e611ec090b67 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 23 Jun 2021 16:04:59 +0200 Subject: [PATCH 07/12] Converter uses ChunkedArray rather than ArrayVector --- r/R/arrowExports.R | 4 +- r/R/chunked-array.R | 12 +- r/src/array_to_vector.cpp | 359 +++++++++++++++++++------------------- r/src/arrowExports.cpp | 11 +- r/src/arrow_types.h | 2 - 5 files changed, 196 insertions(+), 192 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 9257f5787b1..2e619a60c40 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -148,8 +148,8 @@ Array__as_vector <- function(array){ .Call(`_arrow_Array__as_vector`, array) } -ChunkedArray__as_vector <- function(chunked_array){ - .Call(`_arrow_ChunkedArray__as_vector`, chunked_array) +ChunkedArray__as_vector <- function(chunked_array, use_threads){ + .Call(`_arrow_ChunkedArray__as_vector`, chunked_array, use_threads) } RecordBatch__to_dataframe <- function(batch, use_threads){ diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index c58e5ac94f9..f33cb362aaa 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -61,18 +61,18 @@ #' # Pass items into chunked_array as separate objects to create chunks #' class_scores <- chunked_array(c(87, 88, 89), c(94, 93, 92), c(71, 72, 73)) #' class_scores$num_chunks -#' +#' #' # When taking a Slice from a chunked_array, chunks are preserved #' class_scores$Slice(2, length = 5) -#' -#' # You can combine Take and SortIndices to return a ChunkedArray with 1 chunk +#' +#' # You can combine Take and SortIndices to return a ChunkedArray with 1 chunk #' # containing all values, ordered. #' class_scores$Take(class_scores$SortIndices(descending = TRUE)) -#' +#' #' # If you pass a list into chunked_array, you get a list of length 1 #' list_scores <- chunked_array(list(c(9.9, 9.6, 9.5), c(8.2, 8.3, 8.4), c(10.0, 9.9, 9.8))) #' list_scores$num_chunks -#' +#' #' # When constructing a ChunkedArray, the first chunk is used to infer type. #' doubles <- chunked_array(c(1, 2, 3), c(5L, 6L, 7L)) #' doubles$type @@ -82,7 +82,7 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = ArrowDatum, length = function() ChunkedArray__length(self), type_id = function() ChunkedArray__type(self)$id, chunk = function(i) Array$create(ChunkedArray__chunk(self, i)), - as_vector = function() ChunkedArray__as_vector(self), + as_vector = function(use_threads = option_use_threads()) ChunkedArray__as_vector(self, use_threads), Slice = function(offset, length = NULL) { if (is.null(length)) { ChunkedArray__Slice1(self, offset) diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index 5f7f47138ef..f1b9651112d 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -40,7 +40,8 @@ namespace r { class Converter { public: - explicit Converter(ArrayVector arrays) : arrays_(std::move(arrays)) {} + explicit Converter(const std::shared_ptr& chunked_array) + : chunked_array_(std::move(chunked_array)) {} virtual ~Converter() {} @@ -62,37 +63,79 @@ class Converter { // can this run in parallel ? virtual bool Parallel() const { return true; } - SEXP LazyConvert(R_xlen_t n, RTasks& tasks) { - SEXP out = PROTECT(Allocate(n)); - IngestAll(out, tasks); - UNPROTECT(1); - return out; - } - - void IngestAll(SEXP data, RTasks& tasks) { + // converter is passed as self to outlive the scope of Converter::Convert() + virtual SEXP ScheduleConvertTasks(RTasks& tasks, std::shared_ptr self) { + SEXP out = PROTECT(Allocate(chunked_array_->length())); R_xlen_t k = 0, i = 0; - for (const auto& array : arrays_) { + + // for each array, fill the relevant slice of `out` + // potentially in parallel + for (const auto& array : chunked_array_->chunks()) { auto n_chunk = array->length(); tasks.Append(Parallel(), [=] { if (array->null_count() == n_chunk) { - return Ingest_all_nulls(data, k, n_chunk); + return self->Ingest_all_nulls(out, k, n_chunk); } else { - return Ingest_some_nulls(data, array, k, n_chunk, i); + return self->Ingest_some_nulls(out, array, k, n_chunk, i); } }); k += n_chunk; i++; } + + UNPROTECT(1); + return out; } // Converter factory - static std::shared_ptr Make(const std::shared_ptr& type, - ArrayVector arrays); + static std::shared_ptr Make( + const std::shared_ptr& chunked_array); + + static SEXP LazyConvert(const std::shared_ptr& chunked_array, + RTasks& tasks) { + auto converter = Make(chunked_array); + return converter->ScheduleConvertTasks(tasks, converter); + } + + static SEXP Convert(const std::shared_ptr& chunked_array, + bool use_threads) { + const auto& type = chunked_array->type(); + + // TODO: move this down, e.g. Make() could make an alrep aware converter +#if defined(HAS_ALTREP) + // special case when there is only one array + if (chunked_array->num_chunks() == 1) { + const auto& array = chunked_array->chunk(0); + if (arrow::r::GetBoolOption("arrow.use_altrep", true) && array->length() > 0 && + array->null_count() == 0) { + switch (type->id()) { + case arrow::Type::DOUBLE: + return arrow::r::MakeDoubleArrayNoNull(array); + case arrow::Type::INT32: + return arrow::r::MakeInt32ArrayNoNull(array); + default: + break; + } + } + } +#endif + + RTasks tasks(use_threads); + SEXP out = PROTECT(Converter::LazyConvert(chunked_array, tasks)); + StopIfNotOk(tasks.Finish()); + + UNPROTECT(1); + return out; + } + + static SEXP Convert(const std::shared_ptr& array) { + return Convert(std::make_shared(array), false); + } protected: - ArrayVector arrays_; + std::shared_ptr chunked_array_; }; template @@ -126,42 +169,13 @@ Status IngestSome(const std::shared_ptr& array, R_xlen_t n, return IngestSome(array, n, std::forward(set_non_null), nothing); } -// Allocate + Ingest -SEXP ArrayVector__as_vector(R_xlen_t n, const std::shared_ptr& type, - const ArrayVector& arrays) { -#if defined(HAS_ALTREP) - // special case when there is only one array - if (arrays.size() == 1) { - const auto& array = arrays[0]; - if (arrow::r::GetBoolOption("arrow.use_altrep", true) && array->length() > 0 && - array->null_count() == 0) { - switch (type->id()) { - case arrow::Type::DOUBLE: - return arrow::r::MakeDoubleArrayNoNull(array); - case arrow::Type::INT32: - return arrow::r::MakeInt32ArrayNoNull(array); - default: - break; - } - } - } -#endif - - RTasks tasks(false); - auto converter = Converter::Make(type, arrays); - SEXP data = PROTECT(converter->LazyConvert(n, tasks)); - StopIfNotOk(tasks.Finish()); - - UNPROTECT(1); - return data; -} - template class Converter_Int : public Converter { using value_type = typename TypeTraits::ArrayType::value_type; public: - explicit Converter_Int(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Int(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(INTSXP, n); } @@ -195,7 +209,8 @@ class Converter_Double : public Converter { using value_type = typename TypeTraits::ArrayType::value_type; public: - explicit Converter_Double(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Double(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } @@ -226,7 +241,8 @@ class Converter_Double : public Converter { class Converter_Date32 : public Converter { public: - explicit Converter_Date32(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Date32(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { SEXP data = PROTECT(Rf_allocVector(REALSXP, n)); @@ -263,7 +279,8 @@ class Converter_Date32 : public Converter { template struct Converter_String : public Converter { public: - explicit Converter_String(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_String(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(STRSXP, n); } @@ -401,7 +418,8 @@ struct Converter_String : public Converter { class Converter_Boolean : public Converter { public: - explicit Converter_Boolean(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Boolean(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(LGLSXP, n); } @@ -439,7 +457,8 @@ template class Converter_Binary : public Converter { public: using offset_type = typename ArrayType::offset_type; - explicit Converter_Binary(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Binary(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); @@ -483,8 +502,9 @@ class Converter_Binary : public Converter { class Converter_FixedSizeBinary : public Converter { public: - explicit Converter_FixedSizeBinary(const ArrayVector& arrays, int byte_width) - : Converter(arrays), byte_width_(byte_width) {} + explicit Converter_FixedSizeBinary(const std::shared_ptr& chunked_array, + int byte_width) + : Converter(chunked_array), byte_width_(byte_width) {} SEXP Allocate(R_xlen_t n) const { SEXP res = PROTECT(Rf_allocVector(VECSXP, n)); @@ -533,25 +553,27 @@ class Converter_Dictionary : public Converter { std::shared_ptr dictionary_; public: - explicit Converter_Dictionary(const ArrayVector& arrays) - : Converter(arrays), need_unification_(NeedUnification()) { + explicit Converter_Dictionary(const std::shared_ptr& chunked_array) + : Converter(chunked_array), need_unification_(NeedUnification()) { if (need_unification_) { - const auto& arr_first = checked_cast(*arrays[0]); + const auto& arr_first = + checked_cast(*chunked_array->chunk(0)); const auto& arr_type = checked_cast(*arr_first.type()); unifier_ = ValueOrStop(DictionaryUnifier::Make(arr_type.value_type())); - size_t n_arrays = arrays.size(); + size_t n_arrays = chunked_array->num_chunks(); arrays_transpose_.resize(n_arrays); for (size_t i = 0; i < n_arrays; i++) { const auto& dict_i = - *checked_cast(*arrays[i]).dictionary(); + *checked_cast(*chunked_array->chunk(i)).dictionary(); StopIfNotOk(unifier_->Unify(dict_i, &arrays_transpose_[i])); } StopIfNotOk(unifier_->GetResult(&out_type_, &dictionary_)); } else { - const auto& dict_array = checked_cast(*arrays_[0]); + const auto& dict_array = + checked_cast(*chunked_array->chunk(0)); auto indices = dict_array.indices(); switch (indices->type_id()) { @@ -653,13 +675,14 @@ class Converter_Dictionary : public Converter { } bool NeedUnification() { - int n = arrays_.size(); + int n = chunked_array_->num_chunks(); if (n < 2) { return false; } - const auto& arr_first = checked_cast(*arrays_[0]); + const auto& arr_first = + checked_cast(*chunked_array_->chunk(0)); for (int i = 1; i < n; i++) { - const auto& arr = checked_cast(*arrays_[i]); + const auto& arr = checked_cast(*chunked_array_->chunk(i)); if (!(arr_first.dictionary()->Equals(arr.dictionary()))) { return true; } @@ -668,7 +691,9 @@ class Converter_Dictionary : public Converter { } bool GetOrdered() const { - return checked_cast(*arrays_[0]).dict_type()->ordered(); + return checked_cast(*chunked_array_->chunk(0)) + .dict_type() + ->ordered(); } SEXP GetLevels() const { @@ -680,8 +705,7 @@ class Converter_Dictionary : public Converter { cpp11::warning("Coercing dictionary values to R character factor levels"); } - SEXP vec = PROTECT(ArrayVector__as_vector(dictionary_->length(), dictionary_->type(), - {dictionary_})); + SEXP vec = PROTECT(Converter::Convert(dictionary_)); SEXP strings_vec = PROTECT(Rf_coerceVector(vec, STRSXP)); UNPROTECT(2); return strings_vec; @@ -690,18 +714,21 @@ class Converter_Dictionary : public Converter { class Converter_Struct : public Converter { public: - explicit Converter_Struct(const ArrayVector& arrays) : Converter(arrays), converters() { - auto first_array = checked_cast(this->arrays_[0].get()); + explicit Converter_Struct(const std::shared_ptr& chunked_array) + : Converter(chunked_array), converters() { + auto first_array = + checked_cast(this->chunked_array_->chunk(0).get()); int nf = first_array->num_fields(); for (int i = 0; i < nf; i++) { converters.push_back( - Converter::Make(first_array->field(i)->type(), {first_array->field(i)})); + Converter::Make(std::make_shared(first_array->field(i)))); } } SEXP Allocate(R_xlen_t n) const { // allocate a data frame column to host each array - auto first_array = checked_cast(this->arrays_[0].get()); + auto first_array = + checked_cast(this->chunked_array_->chunk(0).get()); auto type = first_array->struct_type(); auto out = arrow::r::to_r_list(converters, [n](const std::shared_ptr& converter) { @@ -756,7 +783,8 @@ double ms_to_seconds(int64_t ms) { return static_cast(ms) / 1000; } class Converter_Date64 : public Converter { public: - explicit Converter_Date64(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Date64(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); @@ -788,7 +816,8 @@ class Converter_Date64 : public Converter { template class Converter_Time : public Converter { public: - explicit Converter_Time(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Time(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); @@ -842,13 +871,14 @@ class Converter_Time : public Converter { template class Converter_Timestamp : public Converter_Time { public: - explicit Converter_Timestamp(const ArrayVector& arrays) - : Converter_Time(arrays) {} + explicit Converter_Timestamp(const std::shared_ptr& chunked_array) + : Converter_Time(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); Rf_classgets(data, arrow::r::data::classes_POSIXct); - auto array = checked_cast(this->arrays_[0].get()); + auto array = + checked_cast(this->chunked_array_->chunk(0).get()); auto array_type = checked_cast(array->type().get()); std::string tzone = array_type->timezone(); if (tzone.size() > 0) { @@ -860,7 +890,8 @@ class Converter_Timestamp : public Converter_Time { class Converter_Decimal : public Converter { public: - explicit Converter_Decimal(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Decimal(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { return Rf_allocVector(REALSXP, n); } @@ -893,9 +924,9 @@ class Converter_List : public Converter { std::shared_ptr value_type_; public: - explicit Converter_List(const ArrayVector& arrays, + explicit Converter_List(const std::shared_ptr& chunked_array, const std::shared_ptr& value_type) - : Converter(arrays), value_type_(value_type) {} + : Converter(chunked_array), value_type_(value_type) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::list res(n); @@ -911,7 +942,7 @@ class Converter_List : public Converter { StopIfNotOk(builder->Finish(&array)); // convert to an R object to store as the list' ptype - res.attr(arrow::r::symbols::ptype) = Array__as_vector(array); + res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); return res; } @@ -928,7 +959,7 @@ class Converter_List : public Converter { auto ingest_one = [&](R_xlen_t i) { auto slice = list_array->value_slice(i); - SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); + SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); return Status::OK(); }; @@ -944,10 +975,10 @@ class Converter_FixedSizeList : public Converter { int list_size_; public: - explicit Converter_FixedSizeList(const ArrayVector& arrays, + explicit Converter_FixedSizeList(const std::shared_ptr& chunked_array, const std::shared_ptr& value_type, int list_size) - : Converter(arrays), value_type_(value_type), list_size_(list_size) {} + : Converter(chunked_array), value_type_(value_type), list_size_(list_size) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::list res(n); @@ -962,7 +993,7 @@ class Converter_FixedSizeList : public Converter { StopIfNotOk(builder->Finish(&array)); // convert to an R object to store as the list' ptype - res.attr(arrow::r::symbols::ptype) = Array__as_vector(array); + res.attr(arrow::r::symbols::ptype) = Converter::Convert(array); return res; } @@ -979,7 +1010,7 @@ class Converter_FixedSizeList : public Converter { auto ingest_one = [&](R_xlen_t i) { auto slice = fixed_size_list_array.value_slice(i); - SET_VECTOR_ELT(data, i + start, Array__as_vector(slice)); + SET_VECTOR_ELT(data, i + start, Converter::Convert(slice)); return Status::OK(); }; return IngestSome(array, n, ingest_one); @@ -990,7 +1021,8 @@ class Converter_FixedSizeList : public Converter { class Converter_Int64 : public Converter { public: - explicit Converter_Int64(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Int64(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { cpp11::writable::doubles data(n); @@ -1029,7 +1061,8 @@ class Converter_Int64 : public Converter { class Converter_Null : public Converter { public: - explicit Converter_Null(const ArrayVector& arrays) : Converter(arrays) {} + explicit Converter_Null(const std::shared_ptr& chunked_array) + : Converter(chunked_array) {} SEXP Allocate(R_xlen_t n) const { SEXP data = PROTECT(Rf_allocVector(LGLSXP, n)); @@ -1071,147 +1104,135 @@ bool GetBoolOption(const std::string& name, bool default_) { } } -std::shared_ptr Converter::Make(const std::shared_ptr& type, - ArrayVector arrays) { - if (arrays.empty()) { - // slight hack for the 0-row case since the converters expect at least one - // chunk to process. - arrays.push_back(ValueOrStop(arrow::MakeArrayOfNull(type, 0))); - } - +std::shared_ptr Converter::Make( + const std::shared_ptr& chunked_array) { + const auto& type = chunked_array->type(); switch (type->id()) { // direct support case Type::INT32: - return std::make_shared>( - std::move(arrays)); + return std::make_shared>(chunked_array); case Type::DOUBLE: return std::make_shared>( - std::move(arrays)); + chunked_array); // need to handle 1-bit case case Type::BOOL: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); case Type::BINARY: return std::make_shared>( - std::move(arrays)); + chunked_array); case Type::LARGE_BINARY: return std::make_shared>( - std::move(arrays)); + chunked_array); case Type::FIXED_SIZE_BINARY: return std::make_shared( - std::move(arrays), - checked_cast(*type).byte_width()); + chunked_array, checked_cast(*type).byte_width()); // handle memory dense strings case Type::STRING: return std::make_shared>( - std::move(arrays)); + chunked_array); case Type::LARGE_STRING: return std::make_shared>( - std::move(arrays)); + chunked_array); case Type::DICTIONARY: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); case Type::DATE32: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); case Type::DATE64: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); // promotions to integer vector case Type::INT8: - return std::make_shared>( - std::move(arrays)); + return std::make_shared>(chunked_array); case Type::UINT8: - return std::make_shared>( - std::move(arrays)); + return std::make_shared>(chunked_array); case Type::INT16: - return std::make_shared>( - std::move(arrays)); + return std::make_shared>(chunked_array); case Type::UINT16: - return std::make_shared>( - std::move(arrays)); + return std::make_shared>(chunked_array); // promotions to numeric vector, if they don't fit into int32 case Type::UINT32: - if (ArraysCanFitInteger(arrays)) { + if (ArraysCanFitInteger(chunked_array->chunks())) { return std::make_shared>( - std::move(arrays)); + chunked_array); } else { return std::make_shared>( - std::move(arrays)); + chunked_array); } case Type::UINT64: - if (ArraysCanFitInteger(arrays)) { + if (ArraysCanFitInteger(chunked_array->chunks())) { return std::make_shared>( - std::move(arrays)); + chunked_array); } else { return std::make_shared>( - std::move(arrays)); + chunked_array); } case Type::HALF_FLOAT: return std::make_shared>( - std::move(arrays)); + chunked_array); case Type::FLOAT: return std::make_shared>( - std::move(arrays)); + chunked_array); // time32 and time64 case Type::TIME32: - return std::make_shared>(std::move(arrays)); + return std::make_shared>(chunked_array); case Type::TIME64: - return std::make_shared>(std::move(arrays)); + return std::make_shared>(chunked_array); case Type::TIMESTAMP: - return std::make_shared>(std::move(arrays)); + return std::make_shared>(chunked_array); case Type::INT64: // Prefer integer if it fits, unless option arrow.int64_downcast is `false` - if (GetBoolOption("arrow.int64_downcast", true) && ArraysCanFitInteger(arrays)) { - return std::make_shared>( - std::move(arrays)); + if (GetBoolOption("arrow.int64_downcast", true) && + ArraysCanFitInteger(chunked_array->chunks())) { + return std::make_shared>(chunked_array); } else { - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); } case Type::DECIMAL: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); // nested case Type::STRUCT: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); case Type::LIST: return std::make_shared>( - std::move(arrays), - checked_cast(type.get())->value_type()); + chunked_array, checked_cast(type.get())->value_type()); case Type::LARGE_LIST: return std::make_shared>( - std::move(arrays), + chunked_array, checked_cast(type.get())->value_type()); case Type::FIXED_SIZE_LIST: return std::make_shared( - std::move(arrays), + chunked_array, checked_cast(*type).value_type(), checked_cast(*type).list_size()); case Type::NA: - return std::make_shared(std::move(arrays)); + return std::make_shared(chunked_array); default: break; @@ -1220,20 +1241,29 @@ std::shared_ptr Converter::Make(const std::shared_ptr& type cpp11::stop("cannot handle Array of type ", type->name().c_str()); } -cpp11::writable::list to_dataframe( - int64_t nr, int64_t nc, const cpp11::writable::strings& names, - const std::vector& columns, - const std::vector>& types, - bool use_threads) { - cpp11::writable::list tbl(nc); +std::shared_ptr to_chunks(const std::shared_ptr& array) { + return std::make_shared(array); +} + +std::shared_ptr to_chunks( + const std::shared_ptr& chunked_array) { + return chunked_array; +} + +template +cpp11::writable::list to_data_frame(const std::shared_ptr& data, + bool use_threads) { + int64_t nc = data->num_columns(); + int64_t nr = data->num_rows(); + cpp11::writable::strings names(nc); arrow::r::RTasks tasks(use_threads); - std::vector> converters(nc); - for (int i = 0; i < nc; i++) { - converters[i] = Converter::Make(types[i], columns[i]); + cpp11::writable::list tbl(nc); - tbl[i] = converters[i]->LazyConvert(nr, tasks); + for (int i = 0; i < nc; i++) { + names[i] = data->schema()->field(i)->name(); + tbl[i] = Converter::LazyConvert(to_chunks(data->column(i)), tasks); } StopIfNotOk(tasks.Finish()); @@ -1250,50 +1280,25 @@ cpp11::writable::list to_dataframe( // [[arrow::export]] SEXP Array__as_vector(const std::shared_ptr& array) { - return arrow::r::ArrayVector__as_vector(array->length(), array->type(), {array}); + return arrow::r::Converter::Convert(array); } // [[arrow::export]] -SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array) { - return arrow::r::ArrayVector__as_vector(chunked_array->length(), chunked_array->type(), - chunked_array->chunks()); +SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array, + bool use_threads = false) { + return arrow::r::Converter::Convert(chunked_array, use_threads); } // [[arrow::export]] cpp11::writable::list RecordBatch__to_dataframe( const std::shared_ptr& batch, bool use_threads) { - int64_t nc = batch->num_columns(); - int64_t nr = batch->num_rows(); - cpp11::writable::strings names(nc); - std::vector arrays(nc); - std::vector> types(nc); - - for (R_xlen_t i = 0; i < nc; i++) { - names[i] = batch->column_name(i); - arrays[i] = {batch->column(i)}; - types[i] = batch->column(i)->type(); - } - - return arrow::r::to_dataframe(nr, nc, names, arrays, types, use_threads); + return arrow::r::to_data_frame(batch, use_threads); } // [[arrow::export]] cpp11::writable::list Table__to_dataframe(const std::shared_ptr& table, bool use_threads) { - int64_t nc = table->num_columns(); - int64_t nr = table->num_rows(); - cpp11::writable::strings names(nc); - - std::vector arrays(nc); - std::vector> types(nc); - - for (R_xlen_t i = 0; i < nc; i++) { - arrays[i] = table->column(i)->chunks(); - names[i] = table->field(i)->name(); - types[i] = table->field(i)->type(); - } - - return arrow::r::to_dataframe(nr, nc, names, arrays, types, use_threads); + return arrow::r::to_data_frame(table, use_threads); } #endif diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 5f3febffcd3..9541572a1a9 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -583,15 +583,16 @@ extern "C" SEXP _arrow_Array__as_vector(SEXP array_sexp){ // array_to_vector.cpp #if defined(ARROW_R_WITH_ARROW) -SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); -extern "C" SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_array_sexp){ +SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array, bool use_threads); +extern "C" SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_array_sexp, SEXP use_threads_sexp){ BEGIN_CPP11 arrow::r::Input&>::type chunked_array(chunked_array_sexp); - return cpp11::as_sexp(ChunkedArray__as_vector(chunked_array)); + arrow::r::Input::type use_threads(use_threads_sexp); + return cpp11::as_sexp(ChunkedArray__as_vector(chunked_array, use_threads)); END_CPP11 } #else -extern "C" SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_array_sexp){ +extern "C" SEXP _arrow_ChunkedArray__as_vector(SEXP chunked_array_sexp, SEXP use_threads_sexp){ Rf_error("Cannot call ChunkedArray__as_vector(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -6960,7 +6961,7 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ListArray__raw_value_offsets", (DL_FUNC) &_arrow_ListArray__raw_value_offsets, 1}, { "_arrow_LargeListArray__raw_value_offsets", (DL_FUNC) &_arrow_LargeListArray__raw_value_offsets, 1}, { "_arrow_Array__as_vector", (DL_FUNC) &_arrow_Array__as_vector, 1}, - { "_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 1}, + { "_arrow_ChunkedArray__as_vector", (DL_FUNC) &_arrow_ChunkedArray__as_vector, 2}, { "_arrow_RecordBatch__to_dataframe", (DL_FUNC) &_arrow_RecordBatch__to_dataframe, 2}, { "_arrow_Table__to_dataframe", (DL_FUNC) &_arrow_Table__to_dataframe, 2}, { "_arrow_ArrayData__get_type", (DL_FUNC) &_arrow_ArrayData__get_type, 1}, diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 68e1c8659c4..b5a8914d432 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -58,8 +58,6 @@ namespace ds = ::arrow::dataset; namespace compute = ::arrow::compute; namespace fs = ::arrow::fs; -SEXP ChunkedArray__as_vector(const std::shared_ptr& chunked_array); -SEXP Array__as_vector(const std::shared_ptr& array); std::shared_ptr RecordBatch__from_arrays(SEXP, SEXP); arrow::MemoryPool* gc_memory_pool(); From 52cc3620b2e3e8ff869ef914db2d38f2f642e8f3 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 23 Jun 2021 16:24:56 +0200 Subject: [PATCH 08/12] ScheduleConvertTasks() does not need to be virtual at this point --- r/src/array_to_vector.cpp | 47 ++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 25 deletions(-) diff --git a/r/src/array_to_vector.cpp b/r/src/array_to_vector.cpp index f1b9651112d..d5a5425966f 100644 --- a/r/src/array_to_vector.cpp +++ b/r/src/array_to_vector.cpp @@ -64,12 +64,30 @@ class Converter { virtual bool Parallel() const { return true; } // converter is passed as self to outlive the scope of Converter::Convert() - virtual SEXP ScheduleConvertTasks(RTasks& tasks, std::shared_ptr self) { + SEXP ScheduleConvertTasks(RTasks& tasks, std::shared_ptr self) { +#if defined(HAS_ALTREP) + // special case when there is only one array + if (chunked_array_->num_chunks() == 1) { + const auto& array = chunked_array_->chunk(0); + if (arrow::r::GetBoolOption("arrow.use_altrep", true) && array->length() > 0 && + array->null_count() == 0) { + switch (array->type()->id()) { + case arrow::Type::DOUBLE: + return arrow::r::MakeDoubleArrayNoNull(array); + case arrow::Type::INT32: + return arrow::r::MakeInt32ArrayNoNull(array); + default: + break; + } + } + } +#endif + + // allocating the R vector upfront SEXP out = PROTECT(Allocate(chunked_array_->length())); - R_xlen_t k = 0, i = 0; - // for each array, fill the relevant slice of `out` - // potentially in parallel + // for each array, fill the relevant slice of `out`, potentially in parallel + R_xlen_t k = 0, i = 0; for (const auto& array : chunked_array_->chunks()) { auto n_chunk = array->length(); @@ -101,27 +119,6 @@ class Converter { static SEXP Convert(const std::shared_ptr& chunked_array, bool use_threads) { - const auto& type = chunked_array->type(); - - // TODO: move this down, e.g. Make() could make an alrep aware converter -#if defined(HAS_ALTREP) - // special case when there is only one array - if (chunked_array->num_chunks() == 1) { - const auto& array = chunked_array->chunk(0); - if (arrow::r::GetBoolOption("arrow.use_altrep", true) && array->length() > 0 && - array->null_count() == 0) { - switch (type->id()) { - case arrow::Type::DOUBLE: - return arrow::r::MakeDoubleArrayNoNull(array); - case arrow::Type::INT32: - return arrow::r::MakeInt32ArrayNoNull(array); - default: - break; - } - } - } -#endif - RTasks tasks(use_threads); SEXP out = PROTECT(Converter::LazyConvert(chunked_array, tasks)); StopIfNotOk(tasks.Finish()); From 10834d24e8f760a7cfe9db1cb5c4057ef5470022 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 24 Jun 2021 14:27:35 +0200 Subject: [PATCH 09/12] +tests --- r/tests/testthat/test-altrep.R | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/r/tests/testthat/test-altrep.R b/r/tests/testthat/test-altrep.R index ec1c671b12e..42784b61442 100644 --- a/r/tests/testthat/test-altrep.R +++ b/r/tests/testthat/test-altrep.R @@ -94,3 +94,17 @@ test_that("empty vectors are not altrep", { expect_false(is_altrep_int_nonull(as.vector(v_int))) expect_false(is_altrep_dbl_nonull(as.vector(v_dbl))) }) + +test_that("as.data.frame(, ) can create altrep vectors", { + withr::local_options(list(arrow.use_altrep = TRUE)) + + table <- Table$create(int = c(1L, 2L, 3L), dbl = c(1, 2, 3)) + df_table <- as.data.frame(table) + expect_true(is_altrep_int_nonull(df_table$int)) + expect_true(is_altrep_dbl_nonull(df_table$dbl)) + + batch <- RecordBatch$create(int = c(1L, 2L, 3L), dbl = c(1, 2, 3)) + df_batch <- as.data.frame(batch) + expect_true(is_altrep_int_nonull(df_batch$int)) + expect_true(is_altrep_dbl_nonull(df_batch$dbl)) +}) From af659f4a7cacc1c39b11dcb30248f0bcea0205fe Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 25 Jun 2021 09:28:24 +0200 Subject: [PATCH 10/12] + ARROW_R_WITH_ARROW protection --- r/src/altrep.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/r/src/altrep.cpp b/r/src/altrep.cpp index f5f499ab3f6..551a39dc09b 100644 --- a/r/src/altrep.cpp +++ b/r/src/altrep.cpp @@ -20,7 +20,6 @@ #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) - #if defined(HAS_ALTREP) #include From 71e57106c52b5d0b76f951b9fd4c07ac32437bf6 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 25 Jun 2021 09:29:58 +0200 Subject: [PATCH 11/12] - ChunkedArray$as_vector(use_threads=) --- r/R/chunked-array.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/R/chunked-array.R b/r/R/chunked-array.R index f33cb362aaa..9465147a8ce 100644 --- a/r/R/chunked-array.R +++ b/r/R/chunked-array.R @@ -82,7 +82,7 @@ ChunkedArray <- R6Class("ChunkedArray", inherit = ArrowDatum, length = function() ChunkedArray__length(self), type_id = function() ChunkedArray__type(self)$id, chunk = function(i) Array$create(ChunkedArray__chunk(self, i)), - as_vector = function(use_threads = option_use_threads()) ChunkedArray__as_vector(self, use_threads), + as_vector = function() ChunkedArray__as_vector(self, option_use_threads()), Slice = function(offset, length = NULL) { if (is.null(length)) { ChunkedArray__Slice1(self, offset) From 903677e3f0ec094797a1b67796bca127b3e37d00 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 25 Jun 2021 09:35:56 +0200 Subject: [PATCH 12/12] order of includes --- r/src/altrep.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/altrep.cpp b/r/src/altrep.cpp index 551a39dc09b..c5d309b66a6 100644 --- a/r/src/altrep.cpp +++ b/r/src/altrep.cpp @@ -15,11 +15,11 @@ // specific language governing permissions and limitations // under the License. -#include - #include "./arrow_types.h" #if defined(ARROW_R_WITH_ARROW) + +#include #if defined(HAS_ALTREP) #include