From 3baa98639289397c4a4f945cea15a31b44dfc29e Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 2 Mar 2021 17:45:42 +0100 Subject: [PATCH 01/33] towards a parallel Table__from_dots() --- r/R/arrowExports.R | 9 ++-- r/src/arrowExports.cpp | 37 +++++++------ r/src/r_to_arrow.cpp | 114 ++++++++++++++++++++++++++++++++++++++++- r/src/table.cpp | 49 ------------------ 4 files changed, 138 insertions(+), 71 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 8477f949f1b..a10527f5675 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1360,8 +1360,13 @@ ExportRecordBatch <- function(batch, array_ptr, schema_ptr){ invisible(.Call(`_arrow_ExportRecordBatch`, batch, array_ptr, schema_ptr)) } +<<<<<<< HEAD ExportRecordBatchReader <- function(reader, stream_ptr){ invisible(.Call(`_arrow_ExportRecordBatchReader`, reader, stream_ptr)) +======= +Table__from_dots <- function(lst, schema_sxp){ + .Call(`_arrow_Table__from_dots`, lst, schema_sxp) +>>>>>>> e8b766472 (towards a parallel Table__from_dots()) } vec_to_arrow <- function(x, s_type){ @@ -1704,10 +1709,6 @@ Table__from_record_batches <- function(batches, schema_sxp){ .Call(`_arrow_Table__from_record_batches`, batches, schema_sxp) } -Table__from_dots <- function(lst, schema_sxp){ - .Call(`_arrow_Table__from_dots`, lst, schema_sxp) -} - GetCpuThreadPoolCapacity <- function(){ .Call(`_arrow_GetCpuThreadPoolCapacity`) } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 1ccfd593d2c..94b45434501 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5352,6 +5352,22 @@ extern "C" SEXP _arrow_ExportRecordBatchReader(SEXP reader_sexp, SEXP stream_ptr } #endif +// r_to_arrow.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ +BEGIN_CPP11 + arrow::r::Input::type lst(lst_sexp); + arrow::r::Input::type schema_sxp(schema_sxp_sexp); + return cpp11::as_sexp(Table__from_dots(lst, schema_sxp)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ + Rf_error("Cannot call Table__from_dots(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // r_to_arrow.cpp #if defined(ARROW_R_WITH_ARROW) SEXP vec_to_arrow(SEXP x, SEXP s_type); @@ -6696,22 +6712,6 @@ extern "C" SEXP _arrow_Table__from_record_batches(SEXP batches_sexp, SEXP schema } #endif -// table.cpp -#if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ -BEGIN_CPP11 - arrow::r::Input::type lst(lst_sexp); - arrow::r::Input::type schema_sxp(schema_sxp_sexp); - return cpp11::as_sexp(Table__from_dots(lst, schema_sxp)); -END_CPP11 -} -#else -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ - Rf_error("Cannot call Table__from_dots(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif - // threadpool.cpp #if defined(ARROW_R_WITH_ARROW) int GetCpuThreadPoolCapacity(); @@ -7164,7 +7164,11 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExportSchema", (DL_FUNC) &_arrow_ExportSchema, 2}, { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, { "_arrow_ExportRecordBatch", (DL_FUNC) &_arrow_ExportRecordBatch, 3}, +<<<<<<< HEAD { "_arrow_ExportRecordBatchReader", (DL_FUNC) &_arrow_ExportRecordBatchReader, 2}, +======= + { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, +>>>>>>> e8b766472 (towards a parallel Table__from_dots()) { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, @@ -7250,7 +7254,6 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_Table__SelectColumns", (DL_FUNC) &_arrow_Table__SelectColumns, 2}, { "_arrow_all_record_batches", (DL_FUNC) &_arrow_all_record_batches, 1}, { "_arrow_Table__from_record_batches", (DL_FUNC) &_arrow_Table__from_record_batches, 2}, - { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, { "_arrow_GetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_GetCpuThreadPoolCapacity, 0}, { "_arrow_SetCpuThreadPoolCapacity", (DL_FUNC) &_arrow_SetCpuThreadPoolCapacity, 1}, { "_arrow_Array__infer_type", (DL_FUNC) &_arrow_Array__infer_type, 1}, diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 0ab9718da26..eabf0f96b5b 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -25,10 +25,14 @@ #include #include #include +#include #include #include #include #include +#include +#include +#include namespace arrow { @@ -207,12 +211,14 @@ class RConverter : public Converter { virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } virtual Status Extend(SEXP values, int64_t size) { - return Status::NotImplemented("ExtendMasked"); + return Status::NotImplemented("Extend"); } virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { return Status::NotImplemented("ExtendMasked"); } + + virtual bool CanExtendParallel(SEXP values, int64_t size) { return true; } }; template @@ -992,6 +998,37 @@ std::shared_ptr vec_to_arrow__reuse_memory(SEXP x) { cpp11::stop("Unreachable: you might need to fix can_reuse_memory()"); } +Status vector_to_Array(SEXP x, const std::shared_ptr& type, + bool type_inferred, + std::shared_ptr& task_group, + std::shared_ptr& out) { + // short circuit if `x` is already an Array + if (Rf_inherits(x, "Array")) { + out = cpp11::as_cpp>(x); + return Status::OK(); + } + + RConversionOptions options; + options.strict = !type_inferred; + options.type = type; + options.size = vctrs::short_vec_size(x); + + // maybe short circuit when zero-copy is possible + if (can_reuse_memory(x, options.type)) { + out = vec_to_arrow__reuse_memory(x); + return Status::OK(); + } + + // otherwise go through the converter api + auto converter = ValueOrStop(MakeConverter( + options.type, options, gc_memory_pool())); + + RETURN_NOT_OK(converter->Extend(x, options.size)); + ARROW_ASSIGN_OR_RAISE(out, converter->ToArray()); + + return Status::OK(); +} + std::shared_ptr vec_to_arrow(SEXP x, const std::shared_ptr& type, bool type_inferred) { @@ -1015,12 +1052,87 @@ std::shared_ptr vec_to_arrow(SEXP x, options.type, options, gc_memory_pool())); StopIfNotOk(converter->Extend(x, options.size)); + return ValueOrStop(converter->ToArray()); } } // namespace r } // namespace arrow +// [[arrow::export]] +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { + bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); + + int num_fields; + StopIfNotOk(arrow::r::count_fields(lst, &num_fields)); + + // schema + metadata + std::shared_ptr schema; + StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); + StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); + + // table + std::vector> columns(num_fields); + + // for now the parallel task does not work, + // presumably because some ->Extend() can't actually run in parallel + // + // auto parallel_tasks = + // arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); + auto parallel_tasks = + arrow::internal::TaskGroup::MakeSerial(); + + std::vector> delayed_serial_tasks; + + auto extract_one_column = [&](int j, SEXP x, cpp11::r_string) { + if (Rf_inherits(x, "ChunkedArray")) { + columns[j] = cpp11::as_cpp>(x); + } else if (Rf_inherits(x, "Array")) { + columns[j] = std::make_shared( + cpp11::as_cpp>(x)); + } else { + arrow::r::RConversionOptions options; + options.strict = !infer_schema; + options.type = schema->field(j)->type(); + options.size = vctrs::short_vec_size(x); + + // maybe short circuit when zero-copy is possible + if (arrow::r::can_reuse_memory(x, options.type)) { + columns[j] = std::make_shared( + arrow::r::vec_to_arrow__reuse_memory(x)); + } else { + auto converter = ValueOrStop( + arrow::MakeConverter( + options.type, options, gc_memory_pool())); + + auto task = [&]() { + RETURN_NOT_OK(converter->Extend(x, options.size)); + columns[j] = + std::make_shared(converter->ToArray().ValueUnsafe()); + return arrow::Status::OK(); + }; + + if (converter->CanExtendParallel(x, options.size)) { + parallel_tasks->Append(task); + } else { + delayed_serial_tasks.push_back(task); + } + } + } + }; + arrow::r::TraverseDots(lst, num_fields, extract_one_column); + + arrow::Status status = arrow::Status::OK(); + for (auto task : delayed_serial_tasks) { + status &= task(); + } + + status &= parallel_tasks->Finish(); + StopIfNotOk(status); + + return arrow::Table::Make(schema, columns); +} + // [[arrow::export]] SEXP vec_to_arrow(SEXP x, SEXP s_type) { if (Rf_inherits(x, "Array")) return x; diff --git a/r/src/table.cpp b/r/src/table.cpp index 997d8f137cb..b6e9ed81479 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -269,33 +269,6 @@ arrow::Status AddMetadataFromDots(SEXP lst, int num_fields, return arrow::Status::OK(); } -arrow::Status CollectTableColumns( - SEXP lst, const std::shared_ptr& schema, int num_fields, bool inferred, - std::vector>& columns) { - if (!inferred && schema->num_fields() != num_fields) { - cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied", - schema->num_fields(), num_fields); - } - auto extract_one_column = [&columns, &schema, inferred](int j, SEXP x, - std::string name) { - if (!inferred && schema->field(j)->name() != name) { - cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, - schema->field(j)->name().c_str(), name.c_str()); - } - if (Rf_inherits(x, "ChunkedArray")) { - columns[j] = cpp11::as_cpp>(x); - } else if (Rf_inherits(x, "Array")) { - columns[j] = std::make_shared( - cpp11::as_cpp>(x)); - } else { - auto array = arrow::r::vec_to_arrow(x, schema->field(j)->type(), inferred); - columns[j] = std::make_shared(array); - } - }; - arrow::r::TraverseDots(lst, num_fields, extract_one_column); - return arrow::Status::OK(); -} - } // namespace r } // namespace arrow @@ -325,26 +298,4 @@ std::shared_ptr Table__from_record_batches( return tab; } -// [[arrow::export]] -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { - bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); - - int num_fields; - StopIfNotOk(arrow::r::count_fields(lst, &num_fields)); - - // schema + metadata - std::shared_ptr schema; - StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); - StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); - - // table - std::vector> columns(num_fields); - StopIfNotOk( - arrow::r::CollectTableColumns(lst, schema, num_fields, infer_schema, columns)); - - StopIfNotOk(arrow::r::check_consistent_column_length(columns)); - - return arrow::Table::Make(schema, columns); -} - #endif From 0f4828b67cca1bd0b21870a589759ab89aab5e25 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Romain=20Fran=C3=A7ois?= Date: Wed, 3 Mar 2021 09:14:37 +0100 Subject: [PATCH 02/33] Update r/src/r_to_arrow.cpp Co-authored-by: Benjamin Kietzman --- r/src/r_to_arrow.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index eabf0f96b5b..c13f596fc55 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1105,7 +1105,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::MakeConverter( options.type, options, gc_memory_pool())); - auto task = [&]() { + auto task = [=]() { RETURN_NOT_OK(converter->Extend(x, options.size)); columns[j] = std::make_shared(converter->ToArray().ValueUnsafe()); From 6327df8358862ddf5a70cf571aded8cc5d8391c3 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 3 Mar 2021 11:17:53 +0100 Subject: [PATCH 03/33] + FlattenDots() --- r/src/arrow_types.h | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 5aa26eebd71..68e7147e7ed 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -149,6 +149,13 @@ void TraverseDots(cpp11::list dots, int num_fields, Lambda lambda) { } } +inline std::vector FlattenDots(cpp11::list dots, int num_fields) { + std::vector out(num_fields); + auto set = [&](int j, SEXP x, cpp11::r_string) { out[j] = x; }; + TraverseDots(dots, num_fields, set); + return out; +} + arrow::Status InferSchemaFromDots(SEXP lst, SEXP schema_sxp, int num_fields, std::shared_ptr& schema); From a68bf316de437125bb28ca9716a26a052841f1f7 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 3 Mar 2021 11:36:27 +0100 Subject: [PATCH 04/33] moving "can extend be done in parallel" out of RConverter --- r/src/r_to_arrow.cpp | 44 ++++++++++++++++++++++++++------------------ 1 file changed, 26 insertions(+), 18 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index c13f596fc55..614184a00b8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -217,8 +217,6 @@ class RConverter : public Converter { virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { return Status::NotImplemented("ExtendMasked"); } - - virtual bool CanExtendParallel(SEXP values, int64_t size) { return true; } }; template @@ -1074,17 +1072,18 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { // table std::vector> columns(num_fields); - // for now the parallel task does not work, - // presumably because some ->Extend() can't actually run in parallel - // - // auto parallel_tasks = - // arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); auto parallel_tasks = - arrow::internal::TaskGroup::MakeSerial(); - + arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); std::vector> delayed_serial_tasks; + arrow::Status status = arrow::Status::OK(); + auto extract_one_column = [&](int j, SEXP x, cpp11::r_string) { + // no need to do anything further if a previous column has failed + if (!status.ok()) { + return; + } + if (Rf_inherits(x, "ChunkedArray")) { columns[j] = cpp11::as_cpp>(x); } else if (Rf_inherits(x, "Array")) { @@ -1101,18 +1100,25 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { columns[j] = std::make_shared( arrow::r::vec_to_arrow__reuse_memory(x)); } else { - auto converter = ValueOrStop( - arrow::MakeConverter( - options.type, options, gc_memory_pool())); + // this needs to be more informed + bool can_extend_parallel = false; + + auto task = [=, &columns]() { + auto converter_result = + arrow::MakeConverter( + options.type, options, gc_memory_pool()); + RETURN_NOT_OK(converter_result.status()); + auto& converter = converter_result.ValueUnsafe(); - auto task = [=]() { RETURN_NOT_OK(converter->Extend(x, options.size)); - columns[j] = - std::make_shared(converter->ToArray().ValueUnsafe()); - return arrow::Status::OK(); + auto result = converter->ToArray(); + if (result.status().ok()) { + columns[j] = std::make_shared(result.ValueUnsafe()); + } + return result.status(); }; - if (converter->CanExtendParallel(x, options.size)) { + if (can_extend_parallel) { parallel_tasks->Append(task); } else { delayed_serial_tasks.push_back(task); @@ -1122,11 +1128,13 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { }; arrow::r::TraverseDots(lst, num_fields, extract_one_column); - arrow::Status status = arrow::Status::OK(); + // now that the parallel tasks have been started + // do the delayed serial tasks for (auto task : delayed_serial_tasks) { status &= task(); } + // and wait for the parallel tasks to finish status &= parallel_tasks->Finish(); StopIfNotOk(status); From eba3ad2522feb732f7b64f047b6cc2d3bd00a40a Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 3 Mar 2021 11:45:30 +0100 Subject: [PATCH 05/33] use FlattenDots() instead of TraverseDots() to avoid too much lambdaception --- r/src/r_to_arrow.cpp | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 614184a00b8..1c0840584bc 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1078,17 +1078,15 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::Status status = arrow::Status::OK(); - auto extract_one_column = [&](int j, SEXP x, cpp11::r_string) { - // no need to do anything further if a previous column has failed - if (!status.ok()) { - return; - } + auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); + for (int j = 0; j < num_fields && status.ok(); j++) { + SEXP x = flatten_lst[j]; if (Rf_inherits(x, "ChunkedArray")) { columns[j] = cpp11::as_cpp>(x); } else if (Rf_inherits(x, "Array")) { columns[j] = std::make_shared( - cpp11::as_cpp>(x)); + cpp11::as_cpp>(x)); } else { arrow::r::RConversionOptions options; options.strict = !infer_schema; @@ -1098,15 +1096,15 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { // maybe short circuit when zero-copy is possible if (arrow::r::can_reuse_memory(x, options.type)) { columns[j] = std::make_shared( - arrow::r::vec_to_arrow__reuse_memory(x)); + arrow::r::vec_to_arrow__reuse_memory(x)); } else { // this needs to be more informed bool can_extend_parallel = false; auto task = [=, &columns]() { auto converter_result = - arrow::MakeConverter( - options.type, options, gc_memory_pool()); + arrow::MakeConverter( + options.type, options, gc_memory_pool()); RETURN_NOT_OK(converter_result.status()); auto& converter = converter_result.ValueUnsafe(); @@ -1125,8 +1123,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { } } } - }; - arrow::r::TraverseDots(lst, num_fields, extract_one_column); + } // now that the parallel tasks have been started // do the delayed serial tasks From 5174c87fa82478e8cd9caf5563b0c5206881a292 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 3 Mar 2021 14:41:54 +0100 Subject: [PATCH 06/33] simplify --- r/src/r_to_arrow.cpp | 31 +++++++++++++++---------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 1c0840584bc..3c0ea1c5a76 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1057,6 +1057,11 @@ std::shared_ptr vec_to_arrow(SEXP x, } // namespace r } // namespace arrow +bool CanExtendParallel(SEXP x, const std::shared_ptr& type) { + // TODO: identify when it's ok to do things in parallel + return false; +} + // [[arrow::export]] std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); @@ -1086,7 +1091,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { columns[j] = cpp11::as_cpp>(x); } else if (Rf_inherits(x, "Array")) { columns[j] = std::make_shared( - cpp11::as_cpp>(x)); + cpp11::as_cpp>(x)); } else { arrow::r::RConversionOptions options; options.strict = !infer_schema; @@ -1096,27 +1101,21 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { // maybe short circuit when zero-copy is possible if (arrow::r::can_reuse_memory(x, options.type)) { columns[j] = std::make_shared( - arrow::r::vec_to_arrow__reuse_memory(x)); + arrow::r::vec_to_arrow__reuse_memory(x)); } else { - // this needs to be more informed - bool can_extend_parallel = false; - auto task = [=, &columns]() { - auto converter_result = - arrow::MakeConverter( - options.type, options, gc_memory_pool()); - RETURN_NOT_OK(converter_result.status()); - auto& converter = converter_result.ValueUnsafe(); + ARROW_ASSIGN_OR_RAISE( + auto converter, + (arrow::MakeConverter( + options.type, options, gc_memory_pool()))); RETURN_NOT_OK(converter->Extend(x, options.size)); - auto result = converter->ToArray(); - if (result.status().ok()) { - columns[j] = std::make_shared(result.ValueUnsafe()); - } - return result.status(); + ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); + columns[j] = std::make_shared(array); + return arrow::Status::OK(); }; - if (can_extend_parallel) { + if (CanExtendParallel(x, options.type)) { parallel_tasks->Append(task); } else { delayed_serial_tasks.push_back(task); From 3b8d6dc51962227a46a8a810bc1f0dce4d15abeb Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 23 Mar 2021 15:18:49 +0100 Subject: [PATCH 07/33] =?UTF-8?q?using=20parallel=20tasks=20in=20Table=5Ff?= =?UTF-8?q?rom=5Fdots()=20but=20for=20now=20the=20tasks=20effectively=20ru?= =?UTF-8?q?n=20serially,=20via=20the=20R=20=C3=B9utex?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- r/src/r_to_arrow.cpp | 82 +++++++++++++++++++++++++------------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 3c0ea1c5a76..1794bd6a862 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -689,7 +689,6 @@ class RPrimitiveConverter> if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - cpp11::strings s(arrow::r::utf8_strings(x)); RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); auto it = s.begin() + start; @@ -702,7 +701,6 @@ class RPrimitiveConverter> total_length += cpp11::is_na(si) ? 0 : si.size(); } RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); - // append it = s.begin() + start; for (R_xlen_t i = 0; i < size; i++, ++it) { @@ -864,15 +862,6 @@ class RStructConverter : public StructConverter { return Status::OK(); })); - for (R_xlen_t i = 0; i < n_columns; i++) { - std::string name(x_names[i]); - if (name != fields[i]->name()) { - return Status::RError( - "Field name in position ", i, " (", fields[i]->name(), - ") does not match the name of the column of the data frame (", name, ")"); - } - } - for (R_xlen_t i = 0; i < n_columns; i++) { SEXP x_i = VECTOR_ELT(x, i); if (vctrs::vec_size(x_i) < size) { @@ -1054,14 +1043,14 @@ std::shared_ptr vec_to_arrow(SEXP x, return ValueOrStop(converter->ToArray()); } +std::mutex& get_r_mutex() { + static std::mutex m; + return m; +} + } // namespace r } // namespace arrow -bool CanExtendParallel(SEXP x, const std::shared_ptr& type) { - // TODO: identify when it's ok to do things in parallel - return false; -} - // [[arrow::export]] std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); @@ -1084,6 +1073,10 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::Status status = arrow::Status::OK(); auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); + + std::vector> converters(num_fields); + + // init converters for (int j = 0; j < num_fields && status.ok(); j++) { SEXP x = flatten_lst[j]; @@ -1103,34 +1096,51 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { columns[j] = std::make_shared( arrow::r::vec_to_arrow__reuse_memory(x)); } else { - auto task = [=, &columns]() { - ARROW_ASSIGN_OR_RAISE( - auto converter, - (arrow::MakeConverter( - options.type, options, gc_memory_pool()))); - - RETURN_NOT_OK(converter->Extend(x, options.size)); - ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); - columns[j] = std::make_shared(array); - return arrow::Status::OK(); - }; - - if (CanExtendParallel(x, options.type)) { - parallel_tasks->Append(task); - } else { - delayed_serial_tasks.push_back(task); + auto converter_result = + arrow::MakeConverter( + options.type, options, gc_memory_pool()); + if (!converter_result.ok()) { + status = converter_result.status(); + break; } + converters[j] = std::move(converter_result.ValueUnsafe()); + } + } + } + StopIfNotOk(status); + + for (int j = 0; j < num_fields; j++) { + if (converters[j] != nullptr) { + auto task = [j, &converters, &flatten_lst, &columns] { + // for now synchronize all tasks with the R mutex + // until we can lock more precisely + // + // so for now, they don't run in parallel + std::lock_guard lock(arrow::r::get_r_mutex()); + + auto& converter = converters[j]; + + SEXP x = flatten_lst[j]; + + RETURN_NOT_OK(converter->Extend(x, converter->options().size)); + ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); + columns[j] = std::make_shared(array); + return arrow::Status::OK(); + }; + + SEXP x = flatten_lst[j]; + if (Rf_inherits(x, "data.frame")) { + delayed_serial_tasks.push_back(std::move(task)); + } else { + parallel_tasks->Append(task); } } } - // now that the parallel tasks have been started - // do the delayed serial tasks - for (auto task : delayed_serial_tasks) { + for (auto& task : delayed_serial_tasks) { status &= task(); } - // and wait for the parallel tasks to finish status &= parallel_tasks->Finish(); StopIfNotOk(status); From 7b11e2f1ae643c1fd54a1cc8fba54bf80fc5d4aa Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 23 Mar 2021 15:36:24 +0100 Subject: [PATCH 08/33] RConverter gains Parallel() to tell if it can Extend() in parallel --- r/src/r_to_arrow.cpp | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 1794bd6a862..98bdd6b5327 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -217,6 +217,8 @@ class RConverter : public Converter { virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { return Status::NotImplemented("ExtendMasked"); } + + virtual bool Parallel() { return true; } }; template @@ -886,6 +888,8 @@ class RStructConverter : public StructConverter { return Status::OK(); } + bool Parallel() override { return false; } + protected: Status Init(MemoryPool* pool) override { return StructConverter::Init(pool); @@ -1110,7 +1114,8 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { StopIfNotOk(status); for (int j = 0; j < num_fields; j++) { - if (converters[j] != nullptr) { + auto& converter = converters[j]; + if (converter != nullptr) { auto task = [j, &converters, &flatten_lst, &columns] { // for now synchronize all tasks with the R mutex // until we can lock more precisely @@ -1128,11 +1133,10 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { return arrow::Status::OK(); }; - SEXP x = flatten_lst[j]; - if (Rf_inherits(x, "data.frame")) { - delayed_serial_tasks.push_back(std::move(task)); - } else { + if (converter->Parallel()) { parallel_tasks->Append(task); + } else { + delayed_serial_tasks.push_back(std::move(task)); } } } From 3170b40354dfbb9f962aae6a76f9f46521738261 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 24 Mar 2021 11:49:42 +0100 Subject: [PATCH 09/33] RPrimitiveConverter does parallel, unless the SEXP is altrep, in which case this needs to lock the r mutex --- r/src/r_to_arrow.cpp | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 98bdd6b5327..03922a47f2d 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -50,6 +50,11 @@ using internal::MakeConverter; namespace r { +std::mutex& get_r_mutex() { + static std::mutex m; + return m; +} + struct RConversionOptions { RConversionOptions() = default; @@ -218,7 +223,7 @@ class RConverter : public Converter { return Status::NotImplemented("ExtendMasked"); } - virtual bool Parallel() { return true; } + virtual bool Parallel() { return false; } }; template @@ -316,6 +321,8 @@ class RPrimitiveConverter> Status Extend(SEXP, int64_t size) override { return this->primitive_builder_->AppendNulls(size); } + + bool Parallel() override { return true; } }; template @@ -342,6 +349,8 @@ class RPrimitiveConverter< return Status::Invalid("cannot convert"); } + bool Parallel() override { return true; } + private: template Status AppendRangeLoopDifferentType(SEXP x, int64_t size) { @@ -391,6 +400,8 @@ class RPrimitiveConverter< template Status AppendRangeSameTypeALTREP(SEXP x, int64_t size) { + std::lock_guard lock(arrow::r::get_r_mutex()); + // if it is altrep, then we use cpp11 looping // without needing to convert RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); @@ -1047,11 +1058,6 @@ std::shared_ptr vec_to_arrow(SEXP x, return ValueOrStop(converter->ToArray()); } -std::mutex& get_r_mutex() { - static std::mutex m; - return m; -} - } // namespace r } // namespace arrow @@ -1117,16 +1123,9 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { auto& converter = converters[j]; if (converter != nullptr) { auto task = [j, &converters, &flatten_lst, &columns] { - // for now synchronize all tasks with the R mutex - // until we can lock more precisely - // - // so for now, they don't run in parallel - std::lock_guard lock(arrow::r::get_r_mutex()); - auto& converter = converters[j]; SEXP x = flatten_lst[j]; - RETURN_NOT_OK(converter->Extend(x, converter->options().size)); ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); columns[j] = std::make_shared(array); From f65d58579c13e120353e29bef727ae16a7001b58 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 7 Apr 2021 14:40:45 +0200 Subject: [PATCH 10/33] rebase --- r/src/r_to_arrow.cpp | 38 +++++++++++++++++++++++++++++++++++--- r/src/table.cpp | 15 --------------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 03922a47f2d..f90d0652ab2 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1013,7 +1013,7 @@ Status vector_to_Array(SEXP x, const std::shared_ptr& type, RConversionOptions options; options.strict = !type_inferred; options.type = type; - options.size = vctrs::short_vec_size(x); + options.size = vctrs::vec_size(x); // maybe short circuit when zero-copy is possible if (can_reuse_memory(x, options.type)) { @@ -1061,6 +1061,21 @@ std::shared_ptr vec_to_arrow(SEXP x, } // namespace r } // namespace arrow +arrow::Status check_consistent_column_length( + const std::vector>& columns) { + if (columns.size()) { + int64_t num_rows = columns[0]->length(); + + for (const auto& column : columns) { + if (column->length() != num_rows) { + return arrow::Status::Invalid("All columns must have the same length"); + } + } + } + + return arrow::Status::OK(); +} + // [[arrow::export]] std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); @@ -1073,9 +1088,25 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { StopIfNotOk(arrow::r::InferSchemaFromDots(lst, schema_sxp, num_fields, schema)); StopIfNotOk(arrow::r::AddMetadataFromDots(lst, num_fields, schema)); + if (!infer_schema && schema->num_fields() != num_fields) { + cpp11::stop("incompatible. schema has %d fields, and %d columns are supplied", + schema->num_fields(), num_fields); + } + // table std::vector> columns(num_fields); + if (!infer_schema) { + auto check_name = [&](int j, SEXP, cpp11::r_string name) { + std::string cpp_name(name); + if (schema->field(j)->name() != cpp_name) { + cpp11::stop("field at index %d has name '%s' != '%s'", j + 1, + schema->field(j)->name().c_str(), cpp_name.c_str()); + } + }; + arrow::r::TraverseDots(lst, num_fields, check_name); + } + auto parallel_tasks = arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); std::vector> delayed_serial_tasks; @@ -1083,7 +1114,6 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::Status status = arrow::Status::OK(); auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); - std::vector> converters(num_fields); // init converters @@ -1099,7 +1129,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::r::RConversionOptions options; options.strict = !infer_schema; options.type = schema->field(j)->type(); - options.size = vctrs::short_vec_size(x); + options.size = vctrs::vec_size(x); // maybe short circuit when zero-copy is possible if (arrow::r::can_reuse_memory(x, options.type)) { @@ -1145,6 +1175,8 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { } status &= parallel_tasks->Finish(); + status &= check_consistent_column_length(columns); + StopIfNotOk(status); return arrow::Table::Make(schema, columns); diff --git a/r/src/table.cpp b/r/src/table.cpp index b6e9ed81479..68adefcfd4a 100644 --- a/r/src/table.cpp +++ b/r/src/table.cpp @@ -150,21 +150,6 @@ std::shared_ptr Table__SelectColumns( namespace arrow { namespace r { -arrow::Status check_consistent_column_length( - const std::vector>& columns) { - if (columns.size()) { - int64_t num_rows = columns[0]->length(); - - for (const auto& column : columns) { - if (column->length() != num_rows) { - return arrow::Status::Invalid("All columns must have the same length"); - } - } - } - - return arrow::Status::OK(); -} - arrow::Status InferSchemaFromDots(SEXP lst, SEXP schema_sxp, int num_fields, std::shared_ptr& schema) { // maybe a schema was given From 11bfbf28cf6311e73a97dd58573914ab13efdf5c Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 26 Apr 2021 09:53:15 +0200 Subject: [PATCH 11/33] simplify --- r/src/r_to_arrow.cpp | 97 ++++++++++++++++---------------------------- 1 file changed, 36 insertions(+), 61 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index f90d0652ab2..b079fab72ac 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -353,84 +353,59 @@ class RPrimitiveConverter< private: template - Status AppendRangeLoopDifferentType(SEXP x, int64_t size) { - RETURN_NOT_OK(this->Reserve(size)); - - auto append_value = [this](r_value_type value) { - ARROW_ASSIGN_OR_RAISE(auto converted, - RConvert::Convert(this->primitive_type_, value)); - this->primitive_builder_->UnsafeAppend(converted); - return Status::OK(); - }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); - } - - template - Status AppendRangeSameTypeNotALTREP(SEXP x, int64_t size) { - auto p = reinterpret_cast(DATAPTR_RO(x)); - auto p_end = p + size; + Status AppendRangeDispatch(SEXP x, int64_t size) { + bool altrep = ALTREP(x); - auto first_na = std::find_if(p, p_end, is_NA); + if (altrep) { + // `x` is an ALTREP R vector storing `r_value_type` + // and that type matches exactly the type of the array this is building - if (first_na == p_end) { - // no nulls, so we can use AppendValues() directly - return this->primitive_builder_->AppendValues(p, p_end); - } + // constructing `vec` needs to protect `x` so we need locking + std::lock_guard lock(arrow::r::get_r_mutex()); - // Append all values up until the first NULL - RETURN_NOT_OK(this->primitive_builder_->AppendValues(p, first_na)); + typename RVectorVisitor::r_vector_type vec(x); + auto it = vec.begin(); + auto get_r_value = [](decltype(*it) value) { + return RVectorVisitor::GetValue(value); + }; - // loop for the remaining - RETURN_NOT_OK(this->primitive_builder_->Reserve(p_end - first_na)); - p = first_na; - for (; p < p_end; ++p) { - r_value_type value = *p; - if (is_NA(value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - this->primitive_builder_->UnsafeAppend(value); - } + return AppendRange_Iterate( + it, size, get_r_value); + } else { + // `x` is not an ALTREP vector so we have direct access to a range of values + auto it = reinterpret_cast(DATAPTR_RO(x)); + auto get_r_value = [](r_value_type value) { return value; }; + return AppendRange_Iterate( + it, size, get_r_value); } - return Status::OK(); } - template - Status AppendRangeSameTypeALTREP(SEXP x, int64_t size) { - std::lock_guard lock(arrow::r::get_r_mutex()); + template + Status AppendRange_Iterate(Iterator it, int64_t size, GetValue get_r_value) { + constexpr bool needs_conversion = + !std::is_same::value; - // if it is altrep, then we use cpp11 looping - // without needing to convert RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); - typename RVectorVisitor::r_vector_type vec(x); - auto it = vec.begin(); + for (R_xlen_t i = 0; i < size; i++, ++it) { - r_value_type value = RVectorVisitor::GetValue(*it); + // we need the get_r_value abstraction because of int64 vectors + // which don't have their cpp11:: vectors + r_value_type value = get_r_value(*it); + if (is_NA(value)) { this->primitive_builder_->UnsafeAppendNull(); } else { - this->primitive_builder_->UnsafeAppend(value); + if (needs_conversion) { + ARROW_ASSIGN_OR_RAISE(auto converted, + RConvert::Convert(this->primitive_type_, value)); + this->primitive_builder_->UnsafeAppend(converted); + } else { + this->primitive_builder_->UnsafeAppend(value); + } } } return Status::OK(); } - - template - Status AppendRangeDispatch(SEXP x, int64_t size) { - if (std::is_same::value) { - if (!ALTREP(x)) { - return AppendRangeSameTypeNotALTREP(x, size); - } else { - return AppendRangeSameTypeALTREP(x, size); - } - } - - // here if underlying types differ so going - return AppendRangeLoopDifferentType(x, size); - } }; template From bfe4bc6fe254bf3e905b049e32e5721b49164d7a Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 26 Apr 2021 14:52:05 +0200 Subject: [PATCH 12/33] RPrimitiveConverter marked as parallel, using RVectorIterator*<> --- r/src/r_to_arrow.cpp | 141 ++++++++++++++++++++++++++++++++----------- 1 file changed, 106 insertions(+), 35 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index b079fab72ac..aef9b926e87 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -176,6 +176,74 @@ bool is_NA(int64_t value) { return value == NA_INT64; } +template +class RVectorIterator { + public: + using value_type = T; + RVectorIterator(SEXP x, int64_t start) + : ptr_x_(reinterpret_cast(DATAPTR_RO(x)) + start) {} + + RVectorIterator& operator++() { + ++ptr_x_; + return *this; + } + + const T operator*() const { return *ptr_x_; } + + private: + const T* ptr_x_; +}; + +template +class RVectorIterator_ALTREP { + public: + using value_type = T; + using data_type = + typename std::conditional::value, double, T>::type; + using r_vector_type = cpp11::r_vector; + using r_vector_iterator = typename r_vector_type::const_iterator; + + RVectorIterator_ALTREP(SEXP x, int64_t start) + : vector_(x), it_(vector_.begin() + start) {} + + RVectorIterator_ALTREP& operator++() { + ++it_; + return *this; + } + + const T operator*() const { return GetValue(*it_); } + + static T GetValue(data_type x) { return x; } + + private: + r_vector_type vector_; + r_vector_iterator it_; +}; + +template <> +int64_t RVectorIterator_ALTREP::GetValue(double x) { + int64_t value; + memcpy(&value, &x, sizeof(int64_t)); + return value; +} + +template +Status VisitVector(SEXP x, int64_t start, int64_t n, AppendNull&& append_null, + AppendValue&& append_value) { + Iterator it(x, start); + for (R_xlen_t i = 0; i < n; i++, ++it) { + auto value = *it; + + if (is_NA(value)) { + RETURN_NOT_OK(append_null()); + } else { + RETURN_NOT_OK(append_value(value)); + } + } + + return Status::OK(); +} + template struct RVectorVisitor { using data_type = @@ -325,6 +393,7 @@ class RPrimitiveConverter> bool Parallel() override { return true; } }; +// TODO: extend this to BooleanType, but this needs some work in RConvert template class RPrimitiveConverter< T, enable_if_t::value || is_floating_type::value>> @@ -334,13 +403,13 @@ class RPrimitiveConverter< auto rtype = GetVectorType(x); switch (rtype) { case UINT8: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case INT32: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case FLOAT64: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); case INT64: - return AppendRangeDispatch(x, size); + return ExtendDispatch(x, size); default: break; @@ -353,44 +422,30 @@ class RPrimitiveConverter< private: template - Status AppendRangeDispatch(SEXP x, int64_t size) { - bool altrep = ALTREP(x); - - if (altrep) { + Status ExtendDispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { // `x` is an ALTREP R vector storing `r_value_type` // and that type matches exactly the type of the array this is building // constructing `vec` needs to protect `x` so we need locking std::lock_guard lock(arrow::r::get_r_mutex()); - - typename RVectorVisitor::r_vector_type vec(x); - auto it = vec.begin(); - auto get_r_value = [](decltype(*it) value) { - return RVectorVisitor::GetValue(value); - }; - - return AppendRange_Iterate( - it, size, get_r_value); + return Extend_impl(RVectorIterator_ALTREP(x, 0), size); } else { // `x` is not an ALTREP vector so we have direct access to a range of values - auto it = reinterpret_cast(DATAPTR_RO(x)); - auto get_r_value = [](r_value_type value) { return value; }; - return AppendRange_Iterate( - it, size, get_r_value); + return Extend_impl(RVectorIterator(x, 0), size); } } - template - Status AppendRange_Iterate(Iterator it, int64_t size, GetValue get_r_value) { + template + Status Extend_impl(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; constexpr bool needs_conversion = !std::is_same::value; RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); for (R_xlen_t i = 0; i < size; i++, ++it) { - // we need the get_r_value abstraction because of int64 vectors - // which don't have their cpp11:: vectors - r_value_type value = get_r_value(*it); + r_value_type value = *it; if (is_NA(value)) { this->primitive_builder_->UnsafeAppendNull(); @@ -417,17 +472,33 @@ class RPrimitiveConverter::value>> if (rtype != BOOLEAN) { return Status::Invalid("Expecting a logical vector"); } + + if (ALTREP(x)) { + std::lock_guard lock(arrow::r::get_r_mutex()); + return Extend_impl(RVectorIterator_ALTREP(x, 0), size); + } else { + return Extend_impl(RVectorIterator(x, 0), size); + } + } + + bool Parallel() override { return true; } + + private: + template + Status Extend_impl(Iterator it, int64_t size) { RETURN_NOT_OK(this->Reserve(size)); - auto append_value = [this](cpp11::r_bool value) { - this->primitive_builder_->UnsafeAppend(value == 1); - return Status::OK(); - }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + for (R_xlen_t i = 0; i < size; i++, ++it) { + cpp11::r_bool value = *it; + + if (is_NA(value)) { + this->primitive_builder_->UnsafeAppendNull(); + } else { + this->primitive_builder_->UnsafeAppend(value == 1); + } + } + + return Status::OK(); } }; From fdd3611c785f878379cbf22577cd4649f5bf038d Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 26 Apr 2021 17:16:23 +0200 Subject: [PATCH 13/33] use VisitVector() function --- r/src/r_to_arrow.cpp | 226 ++++++++++++++++++++++++------------------- 1 file changed, 126 insertions(+), 100 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index aef9b926e87..fc03799225d 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -227,14 +227,13 @@ int64_t RVectorIterator_ALTREP::GetValue(double x) { return value; } -template -Status VisitVector(SEXP x, int64_t start, int64_t n, AppendNull&& append_null, +template +Status VisitVector(Iterator it, int64_t n, AppendNull&& append_null, AppendValue&& append_value) { - Iterator it(x, start); for (R_xlen_t i = 0; i < n; i++, ++it) { auto value = *it; - if (is_NA(value)) { + if (is_NA(value)) { RETURN_NOT_OK(append_null()); } else { RETURN_NOT_OK(append_value(value)); @@ -244,41 +243,6 @@ Status VisitVector(SEXP x, int64_t start, int64_t n, AppendNull&& append_null, return Status::OK(); } -template -struct RVectorVisitor { - using data_type = - typename std::conditional::value, double, T>::type; - using r_vector_type = cpp11::r_vector; - - template - static Status Visit(SEXP x, int64_t size, AppendNull&& append_null, - AppendValue&& append_value) { - r_vector_type values(x); - auto it = values.begin(); - - for (R_xlen_t i = 0; i < size; i++, ++it) { - auto value = GetValue(*it); - - if (is_NA(value)) { - RETURN_NOT_OK(append_null()); - } else { - RETURN_NOT_OK(append_value(value)); - } - } - - return Status::OK(); - } - - static T GetValue(data_type x) { return x; } -}; - -template <> -int64_t RVectorVisitor::GetValue(double x) { - int64_t value; - memcpy(&value, &x, sizeof(int64_t)); - return value; -} - class RConverter : public Converter { public: virtual Status Append(SEXP) { return Status::NotImplemented("Append"); } @@ -439,27 +403,28 @@ class RPrimitiveConverter< template Status Extend_impl(Iterator it, int64_t size) { using r_value_type = typename Iterator::value_type; - constexpr bool needs_conversion = - !std::is_same::value; - RETURN_NOT_OK(this->primitive_builder_->Reserve(size)); - for (R_xlen_t i = 0; i < size; i++, ++it) { - r_value_type value = *it; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; - if (is_NA(value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - if (needs_conversion) { - ARROW_ASSIGN_OR_RAISE(auto converted, - RConvert::Convert(this->primitive_type_, value)); - this->primitive_builder_->UnsafeAppend(converted); - } else { - this->primitive_builder_->UnsafeAppend(value); - } - } + if (std::is_same::value) { + auto append_value = [this](r_value_type value) { + this->primitive_builder_->UnsafeAppend(value); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); + } else { + auto append_value = [this](r_value_type value) { + ARROW_ASSIGN_OR_RAISE(auto converted, + RConvert::Convert(this->primitive_type_, value)); + this->primitive_builder_->UnsafeAppend(converted); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); } - return Status::OK(); } }; @@ -488,17 +453,15 @@ class RPrimitiveConverter::value>> Status Extend_impl(Iterator it, int64_t size) { RETURN_NOT_OK(this->Reserve(size)); - for (R_xlen_t i = 0; i < size; i++, ++it) { - cpp11::r_bool value = *it; - - if (is_NA(value)) { - this->primitive_builder_->UnsafeAppendNull(); - } else { - this->primitive_builder_->UnsafeAppend(value == 1); - } - } - - return Status::OK(); + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](cpp11::r_bool value) { + this->primitive_builder_->UnsafeAppend(value == 1); + return Status::OK(); + }; + return VisitVector(it, size, append_null, append_value); } }; @@ -507,17 +470,15 @@ class RPrimitiveConverter::value>> : public PrimitiveConverter { public: Status Extend(SEXP x, int64_t size) override { - RETURN_NOT_OK(this->Reserve(size)); - switch (GetVectorType(x)) { case DATE_INT: - return AppendRange_Date(x, size); + return AppendRange_Date_dispatch(x, size); case DATE_DBL: - return AppendRange_Date(x, size); + return AppendRange_Date_dispatch(x, size); case POSIXCT: - return AppendRange_Posixct(x, size); + return AppendRange_Posixct_dispatch(x, size); default: break; @@ -526,9 +487,24 @@ class RPrimitiveConverter::value>> return Status::Invalid("cannot convert to date type "); } + bool Parallel() override { return true; } + private: template - Status AppendRange_Date(SEXP x, int64_t size) { + Status AppendRange_Date_dispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { + std::lock_guard lock(arrow::r::get_r_mutex()); + return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); + } else { + return AppendRange_Date(RVectorIterator(x, 0), size); + } + } + + template + Status AppendRange_Date(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; + RETURN_NOT_OK(this->Reserve(size)); + auto append_null = [this]() { this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); @@ -537,21 +513,32 @@ class RPrimitiveConverter::value>> this->primitive_builder_->UnsafeAppend(FromRDate(this->primitive_type_, value)); return Status::OK(); }; + return VisitVector(it, size, append_null, append_value); + } - return RVectorVisitor::Visit(x, size, append_null, append_value); + Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { + if (ALTREP(x)) { + std::lock_guard lock(arrow::r::get_r_mutex()); + return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); + } else { + return AppendRange_Posixct(RVectorIterator(x, 0), size); + } } - Status AppendRange_Posixct(SEXP x, int64_t size) { + template + Status AppendRange_Posixct(Iterator it, int64_t size) { + using r_value_type = typename Iterator::value_type; + RETURN_NOT_OK(this->Reserve(size)); + auto append_null = [this]() { this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); }; - auto append_value = [this](double value) { + auto append_value = [this](r_value_type value) { this->primitive_builder_->UnsafeAppend(FromPosixct(this->primitive_type_, value)); return Status::OK(); }; - - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(it, size, append_null, append_value); } static int FromRDate(const Date32Type*, int from) { return from; } @@ -616,17 +603,26 @@ class RPrimitiveConverter::value>> auto multiplier = get_TimeUnit_multiplier(this->primitive_type_->unit()) * difftime_multiplier; + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; auto append_value = [this, multiplier](double value) { auto converted = static_cast(value * multiplier); this->primitive_builder_->UnsafeAppend(converted); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + if (ALTREP(x)) { + std::lock_guard lock(arrow::r::get_r_mutex()); + return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + append_value); + } else { + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } } + + bool Parallel() override { return true; } }; template @@ -652,8 +648,17 @@ class RPrimitiveConverter::value>> this->primitive_builder_->UnsafeAppendNull(); return Status::OK(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + if (ALTREP(x)) { + std::lock_guard lock(arrow::r::get_r_mutex()); + return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, + append_value); + } else { + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + } } + + bool Parallel() override { return true; } }; template @@ -696,18 +701,21 @@ class RPrimitiveConverter> RETURN_NOT_OK(this->Reserve(size)); RETURN_NOT_OK(check_binary(x, size)); + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](SEXP raw) { R_xlen_t n = XLENGTH(raw); ARROW_RETURN_NOT_OK(this->primitive_builder_->ReserveData(n)); this->primitive_builder_->UnsafeAppend(RAW_RO(raw), static_cast(n)); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } + + bool Parallel() override { return true; } }; template @@ -718,6 +726,11 @@ class RPrimitiveConverter::v RETURN_NOT_OK(this->Reserve(size)); RETURN_NOT_OK(check_binary(x, size)); + auto append_null = [this]() { + this->primitive_builder_->UnsafeAppendNull(); + return Status::OK(); + }; + auto append_value = [this](SEXP raw) { R_xlen_t n = XLENGTH(raw); @@ -728,12 +741,10 @@ class RPrimitiveConverter::v this->primitive_builder_->UnsafeAppend(RAW_RO(raw)); return Status::OK(); }; - auto append_null = [this]() { - this->primitive_builder_->UnsafeAppendNull(); - return Status::OK(); - }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } + + bool Parallel() override { return true; } }; template @@ -742,6 +753,9 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; + // TODO: reconsider, but e.g. needed for arrow::r::utf8_strings() + bool Parallel() override { return false; } + Status Extend(SEXP x, int64_t size) override { int64_t start = 0; RVectorType rtype = GetVectorType(x); @@ -760,6 +774,7 @@ class RPrimitiveConverter> total_length += cpp11::is_na(si) ? 0 : si.size(); } RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); + // append it = s.begin() + start; for (R_xlen_t i = 0; i < size; i++, ++it) { @@ -806,6 +821,9 @@ class RDictionaryConverter> public: using BuilderType = DictionaryBuilder; + // TODO: reconsider + bool Parallel() override { return false; } + Status Extend(SEXP x, int64_t size) override { // first we need to handle the levels cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol)); @@ -820,12 +838,14 @@ class RDictionaryConverter> return Status::Invalid("invalid R type to convert to dictionary"); } + auto append_null = [this]() { return this->value_builder_->AppendNull(); }; + auto append_value = [this, levels](int value) { SEXP s = STRING_ELT(levels, value - 1); return this->value_builder_->Append(CHAR(s)); }; - auto append_null = [this]() { return this->value_builder_->AppendNull(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } Result> ToArray() override { @@ -861,6 +881,9 @@ struct RConverterTrait> { template class RListConverter : public ListConverter { public: + // TODO: reconsider + bool Parallel() override { return false; } + Status Extend(SEXP x, int64_t size) override { RETURN_NOT_OK(this->Reserve(size)); @@ -869,6 +892,8 @@ class RListConverter : public ListConverter { return Status::Invalid("Cannot convert to list type"); } + auto append_null = [this]() { return this->list_builder_->AppendNull(); }; + auto append_value = [this](SEXP value) { int n = vctrs::vec_size(value); @@ -876,8 +901,8 @@ class RListConverter : public ListConverter { RETURN_NOT_OK(this->list_builder_->Append()); return this->value_converter_.get()->Extend(value, n); }; - auto append_null = [this]() { return this->list_builder_->AppendNull(); }; - return RVectorVisitor::Visit(x, size, append_null, append_value); + + return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } }; @@ -890,6 +915,9 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: + // TODO: reconsider + bool Parallel() override { return false; } + Status Extend(SEXP x, int64_t size) override { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -945,8 +973,6 @@ class RStructConverter : public StructConverter { return Status::OK(); } - bool Parallel() override { return false; } - protected: Status Init(MemoryPool* pool) override { return StructConverter::Init(pool); From b993e550df3fab9aa7993082fa0a5e58f462fe7a Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 27 Apr 2021 10:12:59 +0200 Subject: [PATCH 14/33] factor out R task maganement in dedicated RTasks class --- r/src/r_to_arrow.cpp | 51 +++++++++++++++++++++++++++++++++----------- 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index fc03799225d..8d4e493876f 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1130,6 +1130,41 @@ std::shared_ptr vec_to_arrow(SEXP x, return ValueOrStop(converter->ToArray()); } +class RTasks { + public: + using Task = std::function; + + RTasks() + : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool())){}; + + Status Finish() { + Status status = Status::OK(); + + // run the delayed tasks now + for (auto& task : delayed_serial_tasks) { + status &= task(); + if (!status.ok()) break; + } + + // then wait for the parallel tasks to finish + status &= parallel_tasks->Finish(); + + return status; + } + + void Append(bool parallel, Task&& task) { + if (parallel) { + parallel_tasks->Append(task); + } else { + delayed_serial_tasks.push_back(std::move(task)); + } + } + + std::shared_ptr parallel_tasks; + std::vector> delayed_serial_tasks; +}; + } // namespace r } // namespace arrow @@ -1179,9 +1214,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::r::TraverseDots(lst, num_fields, check_name); } - auto parallel_tasks = - arrow::internal::TaskGroup::MakeThreaded(arrow::internal::GetCpuThreadPool()); - std::vector> delayed_serial_tasks; + arrow::r::RTasks tasks; arrow::Status status = arrow::Status::OK(); @@ -1234,19 +1267,11 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { return arrow::Status::OK(); }; - if (converter->Parallel()) { - parallel_tasks->Append(task); - } else { - delayed_serial_tasks.push_back(std::move(task)); - } + tasks.Append(converter->Parallel(), std::move(task)); } } - for (auto& task : delayed_serial_tasks) { - status &= task(); - } - - status &= parallel_tasks->Finish(); + status &= tasks.Finish(); status &= check_consistent_column_length(columns); StopIfNotOk(status); From 2cf52086e6b721b8746a3cb2c8815a64ea79af7a Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 27 Apr 2021 10:45:21 +0200 Subject: [PATCH 15/33] +RConverter::DelayedExtend() --- r/src/r_to_arrow.cpp | 99 +++++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 8d4e493876f..7df5b3fde41 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -50,6 +50,41 @@ using internal::MakeConverter; namespace r { +class RTasks { + public: + using Task = std::function; + + RTasks() + : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool())) {} + + Status Finish() { + Status status = Status::OK(); + + // run the delayed tasks now + for (auto& task : delayed_serial_tasks) { + status &= task(); + if (!status.ok()) break; + } + + // then wait for the parallel tasks to finish + status &= parallel_tasks->Finish(); + + return status; + } + + void Append(bool parallel, Task&& task) { + if (parallel) { + parallel_tasks->Append(task); + } else { + delayed_serial_tasks.push_back(std::move(task)); + } + } + + std::shared_ptr parallel_tasks; + std::vector> delayed_serial_tasks; +}; + std::mutex& get_r_mutex() { static std::mutex m; return m; @@ -251,6 +286,12 @@ class RConverter : public Converter { return Status::NotImplemented("Extend"); } + virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) { + // by default, just delay the ->Extend(), i.e. not run in parallel + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(false, task); + } + virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { return Status::NotImplemented("ExtendMasked"); } @@ -1130,41 +1171,6 @@ std::shared_ptr vec_to_arrow(SEXP x, return ValueOrStop(converter->ToArray()); } -class RTasks { - public: - using Task = std::function; - - RTasks() - : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool())){}; - - Status Finish() { - Status status = Status::OK(); - - // run the delayed tasks now - for (auto& task : delayed_serial_tasks) { - status &= task(); - if (!status.ok()) break; - } - - // then wait for the parallel tasks to finish - status &= parallel_tasks->Finish(); - - return status; - } - - void Append(bool parallel, Task&& task) { - if (parallel) { - parallel_tasks->Append(task); - } else { - delayed_serial_tasks.push_back(std::move(task)); - } - } - - std::shared_ptr parallel_tasks; - std::vector> delayed_serial_tasks; -}; - } // namespace r } // namespace arrow @@ -1257,23 +1263,22 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { for (int j = 0; j < num_fields; j++) { auto& converter = converters[j]; if (converter != nullptr) { - auto task = [j, &converters, &flatten_lst, &columns] { - auto& converter = converters[j]; - - SEXP x = flatten_lst[j]; - RETURN_NOT_OK(converter->Extend(x, converter->options().size)); - ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); - columns[j] = std::make_shared(array); - return arrow::Status::OK(); - }; - - tasks.Append(converter->Parallel(), std::move(task)); + converter->DelayedExtend(flatten_lst[j], converter->options().size, tasks); } } status &= tasks.Finish(); - status &= check_consistent_column_length(columns); + for (int j = 0; j < num_fields; j++) { + auto& converter = converters[j]; + if (converter != nullptr) { + auto maybe_array = converter->ToArray(); + StopIfNotOk(maybe_array.status()); + columns[j] = std::make_shared(maybe_array.ValueUnsafe()); + } + } + + status &= check_consistent_column_length(columns); StopIfNotOk(status); return arrow::Table::Make(schema, columns); From ad7b384e2a2726addcfb3e75cf18dbbab9ef6186 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 27 Apr 2021 11:16:15 +0200 Subject: [PATCH 16/33] initial implementations of DelayedExtend() and removing get_r_mutex(), locking was too fragile and not flexible enough --- r/src/r_to_arrow.cpp | 91 ++++++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 37 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 7df5b3fde41..27e42eb57d2 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -85,11 +85,6 @@ class RTasks { std::vector> delayed_serial_tasks; }; -std::mutex& get_r_mutex() { - static std::mutex m; - return m; -} - struct RConversionOptions { RConversionOptions() = default; @@ -286,8 +281,9 @@ class RConverter : public Converter { return Status::NotImplemented("Extend"); } + // by default, just delay the ->Extend(), i.e. not run in parallel + // implementations might redefine so that ->Extend() is run in parallel virtual void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) { - // by default, just delay the ->Extend(), i.e. not run in parallel auto task = [this, values, size]() { return this->Extend(values, size); }; tasks.Append(false, task); } @@ -295,8 +291,6 @@ class RConverter : public Converter { virtual Status ExtendMasked(SEXP values, SEXP mask, int64_t size) { return Status::NotImplemented("ExtendMasked"); } - - virtual bool Parallel() { return false; } }; template @@ -394,8 +388,6 @@ class RPrimitiveConverter> Status Extend(SEXP, int64_t size) override { return this->primitive_builder_->AppendNulls(size); } - - bool Parallel() override { return true; } }; // TODO: extend this to BooleanType, but this needs some work in RConvert @@ -423,7 +415,10 @@ class RPrimitiveConverter< return Status::Invalid("cannot convert"); } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } private: template @@ -431,9 +426,6 @@ class RPrimitiveConverter< if (ALTREP(x)) { // `x` is an ALTREP R vector storing `r_value_type` // and that type matches exactly the type of the array this is building - - // constructing `vec` needs to protect `x` so we need locking - std::lock_guard lock(arrow::r::get_r_mutex()); return Extend_impl(RVectorIterator_ALTREP(x, 0), size); } else { // `x` is not an ALTREP vector so we have direct access to a range of values @@ -480,14 +472,16 @@ class RPrimitiveConverter::value>> } if (ALTREP(x)) { - std::lock_guard lock(arrow::r::get_r_mutex()); return Extend_impl(RVectorIterator_ALTREP(x, 0), size); } else { return Extend_impl(RVectorIterator(x, 0), size); } } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } private: template @@ -528,13 +522,15 @@ class RPrimitiveConverter::value>> return Status::Invalid("cannot convert to date type "); } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } private: template Status AppendRange_Date_dispatch(SEXP x, int64_t size) { if (ALTREP(x)) { - std::lock_guard lock(arrow::r::get_r_mutex()); return AppendRange_Date(RVectorIterator_ALTREP(x, 0), size); } else { return AppendRange_Date(RVectorIterator(x, 0), size); @@ -559,7 +555,6 @@ class RPrimitiveConverter::value>> Status AppendRange_Posixct_dispatch(SEXP x, int64_t size) { if (ALTREP(x)) { - std::lock_guard lock(arrow::r::get_r_mutex()); return AppendRange_Posixct(RVectorIterator_ALTREP(x, 0), size); } else { return AppendRange_Posixct(RVectorIterator(x, 0), size); @@ -655,7 +650,6 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - std::lock_guard lock(arrow::r::get_r_mutex()); return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, append_value); } else { @@ -663,7 +657,10 @@ class RPrimitiveConverter::value>> } } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } }; template @@ -691,7 +688,6 @@ class RPrimitiveConverter::value>> }; if (ALTREP(x)) { - std::lock_guard lock(arrow::r::get_r_mutex()); return VisitVector(RVectorIterator_ALTREP(x, 0), size, append_null, append_value); } else { @@ -699,7 +695,10 @@ class RPrimitiveConverter::value>> } } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(!ALTREP(values), std::move(task)); + } }; template @@ -756,7 +755,10 @@ class RPrimitiveConverter> return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(true, std::move(task)); + } }; template @@ -785,7 +787,10 @@ class RPrimitiveConverter::v return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } - bool Parallel() override { return true; } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + tasks.Append(true, std::move(task)); + } }; template @@ -794,9 +799,6 @@ class RPrimitiveConverter> public: using OffsetType = typename T::offset_type; - // TODO: reconsider, but e.g. needed for arrow::r::utf8_strings() - bool Parallel() override { return false; } - Status Extend(SEXP x, int64_t size) override { int64_t start = 0; RVectorType rtype = GetVectorType(x); @@ -829,6 +831,12 @@ class RPrimitiveConverter> return Status::OK(); } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine this., e.g. extract setup from Extend() + tasks.Append(false, std::move(task)); + } }; template @@ -862,9 +870,6 @@ class RDictionaryConverter> public: using BuilderType = DictionaryBuilder; - // TODO: reconsider - bool Parallel() override { return false; } - Status Extend(SEXP x, int64_t size) override { // first we need to handle the levels cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol)); @@ -889,6 +894,12 @@ class RDictionaryConverter> return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine + tasks.Append(false, std::move(task)); + } + Result> ToArray() override { ARROW_ASSIGN_OR_RAISE(auto result, this->builder_->Finish()); @@ -922,9 +933,6 @@ struct RConverterTrait> { template class RListConverter : public ListConverter { public: - // TODO: reconsider - bool Parallel() override { return false; } - Status Extend(SEXP x, int64_t size) override { RETURN_NOT_OK(this->Reserve(size)); @@ -945,6 +953,12 @@ class RListConverter : public ListConverter { return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine + tasks.Append(false, std::move(task)); + } }; class RStructConverter; @@ -956,9 +970,6 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: - // TODO: reconsider - bool Parallel() override { return false; } - Status Extend(SEXP x, int64_t size) override { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -1014,6 +1025,12 @@ class RStructConverter : public StructConverter { return Status::OK(); } + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine. e.g. do setup and then spawn child tasks + tasks.Append(false, std::move(task)); + } + protected: Status Init(MemoryPool* pool) override { return StructConverter::Init(pool); From 779f89a42f296a117ac9a59a737fba20307cee5d Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 27 Apr 2021 15:46:37 +0200 Subject: [PATCH 17/33] struct and dictionary types running (partly) in parallel --- r/src/r_to_arrow.cpp | 135 +++++++++++++++++++++++++++++-------------- 1 file changed, 92 insertions(+), 43 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 27e42eb57d2..5ce8e44c069 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -871,33 +871,24 @@ class RDictionaryConverter> using BuilderType = DictionaryBuilder; Status Extend(SEXP x, int64_t size) override { - // first we need to handle the levels - cpp11::strings levels(Rf_getAttrib(x, R_LevelsSymbol)); - auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false); - RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); - - // then we can proceed - RETURN_NOT_OK(this->Reserve(size)); - - RVectorType rtype = GetVectorType(x); - if (rtype != FACTOR) { - return Status::Invalid("invalid R type to convert to dictionary"); - } - - auto append_null = [this]() { return this->value_builder_->AppendNull(); }; - - auto append_value = [this, levels](int value) { - SEXP s = STRING_ELT(levels, value - 1); - return this->value_builder_->Append(CHAR(s)); - }; - - return VisitVector(RVectorIterator(x, 0), size, append_null, append_value); + RETURN_NOT_OK(ExtendSetup(x, size)); + return ExtendImpl(x, size, GetCharLevels(x)); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - auto task = [this, values, size]() { return this->Extend(values, size); }; - // TODO: refine - tasks.Append(false, std::move(task)); + // the setup runs synchronously first + Status setup = ExtendSetup(values, size); + + if (!setup.ok()) { + // if that fails, propagate the error + tasks.Append(false, [setup]() { return setup; }); + } else { + auto char_levels = GetCharLevels(values); + + tasks.Append(true, [this, values, size, char_levels]() { + return this->ExtendImpl(values, size, char_levels); + }); + } } Result> ToArray() override { @@ -913,6 +904,44 @@ class RDictionaryConverter> return std::make_shared(result->data()); } + + private: + std::vector GetCharLevels(SEXP x) { + SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); + R_xlen_t n_levels = XLENGTH(levels); + std::vector char_levels(XLENGTH(levels)); + const SEXP* p_levels = reinterpret_cast(DATAPTR_RO(levels)); + for (R_xlen_t i = 0; i < n_levels; i++, ++p_levels) { + char_levels[i] = CHAR(*p_levels); + } + + return char_levels; + } + + Status ExtendSetup(SEXP x, int64_t size) { + RVectorType rtype = GetVectorType(x); + if (rtype != FACTOR) { + return Status::Invalid("invalid R type to convert to dictionary"); + } + + // first we need to handle the levels + SEXP levels = Rf_getAttrib(x, R_LevelsSymbol); + auto memo_array = arrow::r::vec_to_arrow(levels, utf8(), false); + RETURN_NOT_OK(this->value_builder_->InsertMemoValues(*memo_array)); + + // then we can proceed + return this->Reserve(size); + } + + Status ExtendImpl(SEXP values, int64_t size, + const std::vector& char_levels) { + auto append_null = [this]() { return this->value_builder_->AppendNull(); }; + auto append_value = [this, &char_levels](int value) { + return this->value_builder_->Append(char_levels[value - 1]); + }; + + return VisitVector(RVectorIterator(values, 0), size, append_null, append_value); + } }; template @@ -971,6 +1000,45 @@ struct RConverterTrait { class RStructConverter : public StructConverter { public: Status Extend(SEXP x, int64_t size) override { + RETURN_NOT_OK(ExtendSetup(x, size)); + + auto fields = this->struct_type_->fields(); + R_xlen_t n_columns = XLENGTH(x); + for (R_xlen_t i = 0; i < n_columns; i++) { + auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); + if (!status.ok()) { + return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), + "): ", status.ToString()); + } + } + + return Status::OK(); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + // the setup runs synchronously first + Status setup = ExtendSetup(values, size); + + if (!setup.ok()) { + // if that fails, propagate the error + tasks.Append(false, [setup]() { return setup; }); + } else { + // otherwise deal with each column, maybe concurrently + auto fields = this->struct_type_->fields(); + R_xlen_t n_columns = XLENGTH(values); + + for (R_xlen_t i = 0; i < n_columns; i++) { + children_[i]->DelayedExtend(VECTOR_ELT(values, i), size, tasks); + } + } + } + + protected: + Status Init(MemoryPool* pool) override { + return StructConverter::Init(pool); + } + + Status ExtendSetup(SEXP x, int64_t size) { // check that x is compatible R_xlen_t n_columns = XLENGTH(x); @@ -1014,27 +1082,8 @@ class RStructConverter : public StructConverter { RETURN_NOT_OK(struct_builder_->Append()); } - for (R_xlen_t i = 0; i < n_columns; i++) { - auto status = children_[i]->Extend(VECTOR_ELT(x, i), size); - if (!status.ok()) { - return Status::Invalid("Problem with column ", (i + 1), " (", fields[i]->name(), - "): ", status.ToString()); - } - } - return Status::OK(); } - - void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - auto task = [this, values, size]() { return this->Extend(values, size); }; - // TODO: refine. e.g. do setup and then spawn child tasks - tasks.Append(false, std::move(task)); - } - - protected: - Status Init(MemoryPool* pool) override { - return StructConverter::Init(pool); - } }; template <> From 12180767bb7d76dacf942e35ac19a6848d064527 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 28 Apr 2021 14:38:27 +0200 Subject: [PATCH 18/33] some care --- r/src/r_to_arrow.cpp | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 5ce8e44c069..44be4427de8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -757,7 +757,7 @@ class RPrimitiveConverter> void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { auto task = [this, values, size]() { return this->Extend(values, size); }; - tasks.Append(true, std::move(task)); + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -789,7 +789,7 @@ class RPrimitiveConverter::v void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { auto task = [this, values, size]() { return this->Extend(values, size); }; - tasks.Append(true, std::move(task)); + tasks.Append(!ALTREP(values), std::move(task)); } }; @@ -973,6 +973,8 @@ class RListConverter : public ListConverter { auto append_null = [this]() { return this->list_builder_->AppendNull(); }; auto append_value = [this](SEXP value) { + // TODO: if we decide that this can be run concurrently + // we'll have to do vec_size() upfront int n = vctrs::vec_size(value); RETURN_NOT_OK(this->list_builder_->ValidateOverflow(n)); @@ -984,9 +986,12 @@ class RListConverter : public ListConverter { } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - auto task = [this, values, size]() { return this->Extend(values, size); }; - // TODO: refine - tasks.Append(false, std::move(task)); + // NOTE: because Extend::[]append_value() calls Extend() on the + // value converter, which might require a setup step, it feels + // complicated to run this task concurrently. + // + // TODO: perhaps allow running concurrently in some cases, e.g. list(int32(!altrep)) + tasks.Append(false, [this, values, size]() { return this->Extend(values, size); }); } }; From c9c3afbb87f5e0d176427e86298781a81e6e0fc8 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 7 May 2021 11:39:20 +0200 Subject: [PATCH 19/33] using internal::FnOnce<> instead of std::function<> --- r/src/r_to_arrow.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 44be4427de8..ce20f0c5cfa 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -52,7 +52,7 @@ namespace r { class RTasks { public: - using Task = std::function; + using Task = internal::FnOnce; RTasks() : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( @@ -63,7 +63,7 @@ class RTasks { // run the delayed tasks now for (auto& task : delayed_serial_tasks) { - status &= task(); + status &= std::move(task)(); if (!status.ok()) break; } @@ -75,14 +75,14 @@ class RTasks { void Append(bool parallel, Task&& task) { if (parallel) { - parallel_tasks->Append(task); + parallel_tasks->Append(std::move(task)); } else { delayed_serial_tasks.push_back(std::move(task)); } } std::shared_ptr parallel_tasks; - std::vector> delayed_serial_tasks; + std::vector delayed_serial_tasks; }; struct RConversionOptions { From 026241e210160eff6b9f7a96a4d33c29384bb270 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 7 May 2021 14:46:29 +0200 Subject: [PATCH 20/33] comment on RTasks.Finish() --- r/src/r_to_arrow.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index ce20f0c5cfa..ceb948e3fa8 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -58,6 +58,13 @@ class RTasks { : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( arrow::internal::GetCpuThreadPool())) {} + // This Finish() method must never be called from a thread pool thread + // as this would deadlock. + // + // Usage is to : + // - create an RTasks instance on the main thread + // - add some tasks with .Append() + // - and then call .Finish() so that the parallel tasks are finished Status Finish() { Status status = Status::OK(); From 9fb41bdeec4005d57fd6a41c816f11d011aa8f94 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Mon, 10 May 2021 11:20:26 +0200 Subject: [PATCH 21/33] trickle down options.use_threads() --- r/R/arrowExports.R | 5 +++++ r/R/table.R | 2 +- r/src/arrowExports.cpp | 13 +++++++++---- r/src/csv.cpp | 1 - r/src/r_to_arrow.cpp | 30 ++++++++++++++++++------------ 5 files changed, 33 insertions(+), 18 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index a10527f5675..e492c92e019 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1360,6 +1360,7 @@ ExportRecordBatch <- function(batch, array_ptr, schema_ptr){ invisible(.Call(`_arrow_ExportRecordBatch`, batch, array_ptr, schema_ptr)) } +<<<<<<< HEAD <<<<<<< HEAD ExportRecordBatchReader <- function(reader, stream_ptr){ invisible(.Call(`_arrow_ExportRecordBatchReader`, reader, stream_ptr)) @@ -1367,6 +1368,10 @@ ExportRecordBatchReader <- function(reader, stream_ptr){ Table__from_dots <- function(lst, schema_sxp){ .Call(`_arrow_Table__from_dots`, lst, schema_sxp) >>>>>>> e8b766472 (towards a parallel Table__from_dots()) +======= +Table__from_dots <- function(lst, schema_sxp, use_threads){ + .Call(`_arrow_Table__from_dots`, lst, schema_sxp, use_threads) +>>>>>>> f2ef4f637 (trickle down options.use_threads()) } vec_to_arrow <- function(x, s_type){ diff --git a/r/R/table.R b/r/R/table.R index 2c432ac8983..2db836d78c9 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -178,7 +178,7 @@ Table$create <- function(..., schema = NULL) { if (all_record_batches(dots)) { Table__from_record_batches(dots, schema) } else { - Table__from_dots(dots, schema) + Table__from_dots(dots, schema, option_use_threads()) } } diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 94b45434501..f7ff8313322 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -5354,16 +5354,17 @@ extern "C" SEXP _arrow_ExportRecordBatchReader(SEXP reader_sexp, SEXP stream_ptr // r_to_arrow.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp); -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, bool use_threads); +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp, SEXP use_threads_sexp){ BEGIN_CPP11 arrow::r::Input::type lst(lst_sexp); arrow::r::Input::type schema_sxp(schema_sxp_sexp); - return cpp11::as_sexp(Table__from_dots(lst, schema_sxp)); + arrow::r::Input::type use_threads(use_threads_sexp); + return cpp11::as_sexp(Table__from_dots(lst, schema_sxp, use_threads)); END_CPP11 } #else -extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp){ +extern "C" SEXP _arrow_Table__from_dots(SEXP lst_sexp, SEXP schema_sxp_sexp, SEXP use_threads_sexp){ Rf_error("Cannot call Table__from_dots(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); } #endif @@ -7164,11 +7165,15 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExportSchema", (DL_FUNC) &_arrow_ExportSchema, 2}, { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, { "_arrow_ExportRecordBatch", (DL_FUNC) &_arrow_ExportRecordBatch, 3}, +<<<<<<< HEAD <<<<<<< HEAD { "_arrow_ExportRecordBatchReader", (DL_FUNC) &_arrow_ExportRecordBatchReader, 2}, ======= { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, >>>>>>> e8b766472 (towards a parallel Table__from_dots()) +======= + { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 3}, +>>>>>>> f2ef4f637 (trickle down options.use_threads()) { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, diff --git a/r/src/csv.cpp b/r/src/csv.cpp index 3df5db87efa..a8d2256cfe3 100644 --- a/r/src/csv.cpp +++ b/r/src/csv.cpp @@ -22,7 +22,6 @@ #include #include #include - #include // [[arrow::export]] diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index ceb948e3fa8..2cb74150226 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -54,9 +54,11 @@ class RTasks { public: using Task = internal::FnOnce; - RTasks() - : parallel_tasks(arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool())) {} + explicit RTasks(bool use_threads) + : use_threads_(use_threads), + parallel_tasks_(use_threads ? arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool()) + : nullptr) {} // This Finish() method must never be called from a thread pool thread // as this would deadlock. @@ -69,27 +71,30 @@ class RTasks { Status status = Status::OK(); // run the delayed tasks now - for (auto& task : delayed_serial_tasks) { + for (auto& task : delayed_serial_tasks_) { status &= std::move(task)(); if (!status.ok()) break; } // then wait for the parallel tasks to finish - status &= parallel_tasks->Finish(); + if (use_threads_) { + status &= parallel_tasks_->Finish(); + } return status; } void Append(bool parallel, Task&& task) { - if (parallel) { - parallel_tasks->Append(std::move(task)); + if (parallel && use_threads_) { + parallel_tasks_->Append(std::move(task)); } else { - delayed_serial_tasks.push_back(std::move(task)); + delayed_serial_tasks_.push_back(std::move(task)); } } - std::shared_ptr parallel_tasks; - std::vector delayed_serial_tasks; + bool use_threads_; + std::shared_ptr parallel_tasks_; + std::vector delayed_serial_tasks_; }; struct RConversionOptions { @@ -1268,7 +1273,8 @@ arrow::Status check_consistent_column_length( } // [[arrow::export]] -std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { +std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, + bool use_threads) { bool infer_schema = !Rf_inherits(schema_sxp, "Schema"); int num_fields; @@ -1298,7 +1304,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp) { arrow::r::TraverseDots(lst, num_fields, check_name); } - arrow::r::RTasks tasks; + arrow::r::RTasks tasks(use_threads); arrow::Status status = arrow::Status::OK(); From ff94919f346c01c107e8e0815798c15e82027f93 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 11 May 2021 15:30:26 +0200 Subject: [PATCH 22/33] for now comment part of the code using zero copy (but potentially costly because null handling). --- r/src/r_to_arrow.cpp | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 2cb74150226..9ff24d16328 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1326,20 +1326,23 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, options.type = schema->field(j)->type(); options.size = vctrs::vec_size(x); - // maybe short circuit when zero-copy is possible - if (arrow::r::can_reuse_memory(x, options.type)) { - columns[j] = std::make_shared( - arrow::r::vec_to_arrow__reuse_memory(x)); - } else { - auto converter_result = - arrow::MakeConverter( - options.type, options, gc_memory_pool()); - if (!converter_result.ok()) { - status = converter_result.status(); - break; - } - converters[j] = std::move(converter_result.ValueUnsafe()); + // TODO: do this in parallel, we can't use vec_to_arrow__reuse_memory() + // as is, because it wraps a SEXP with an cpp11:: vector but it should + // be possible to modify it so that we do the work in a task. + // + // if (arrow::r::can_reuse_memory(x, options.type)) { + // columns[j] = std::make_shared( + // arrow::r::vec_to_arrow__reuse_memory(x)); + // } else { + auto converter_result = + arrow::MakeConverter( + options.type, options, gc_memory_pool()); + if (!converter_result.ok()) { + status = converter_result.status(); + break; } + converters[j] = std::move(converter_result.ValueUnsafe()); + // } } } StopIfNotOk(status); From 9d0f7be570dd79bd9cce4b3708827561d166a8ce Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Wed, 12 May 2021 16:46:17 +0200 Subject: [PATCH 23/33] handle zero copy in parallel in Table__from_dots() --- r/src/r_to_arrow.cpp | 105 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 96 insertions(+), 9 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 9ff24d16328..c46943f093d 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1254,6 +1254,96 @@ std::shared_ptr vec_to_arrow(SEXP x, return ValueOrStop(converter->ToArray()); } +// TODO: most of this is very similar to MakeSimpleArray, just adapted to +// leverage concurrency. Maybe some refactoring needed. +template +bool vector_from_r_memory_impl(SEXP x, const std::shared_ptr& type, + std::vector>& columns, + int j, RTasks& tasks) { + RVector vec(x); + using value_type = typename arrow::TypeTraits::ArrayType::value_type; + auto buffer = std::make_shared>(vec); + + tasks.Append(true, [buffer, x, &columns, j]() { + std::vector> buffers{nullptr, buffer}; + + auto n = XLENGTH(x); + auto p_x_start = reinterpret_cast(DATAPTR_RO(x)); + auto p_x_end = p_x_start + n; + + int null_count = 0; + auto first_na = std::find_if(p_x_start, p_x_end, is_NA); + + if (first_na < p_x_end) { + auto null_bitmap = + ValueOrStop(AllocateBuffer(BitUtil::BytesForBits(n), gc_memory_pool())); + internal::FirstTimeBitmapWriter bitmap_writer(null_bitmap->mutable_data(), 0, n); + + // first loop to clear all the bits before the first NA + auto k = std::distance(p_x_start, first_na); + int i = 0; + for (; i < k; i++, bitmap_writer.Next()) { + bitmap_writer.Set(); + } + + auto p_vec = first_na; + // then finish + for (; i < n; i++, bitmap_writer.Next(), ++p_vec) { + if (is_NA(*p_vec)) { + bitmap_writer.Clear(); + null_count++; + } else { + bitmap_writer.Set(); + } + } + + bitmap_writer.Finish(); + buffers[0] = std::move(null_bitmap); + } + + auto data = ArrayData::Make(std::make_shared(), n, std::move(buffers), + null_count, 0 /*offset*/); + auto array = std::make_shared::ArrayType>(data); + columns[j] = std::make_shared(array); + + return Status::OK(); + }); + + return true; +} + +bool vector_from_r_memory(SEXP x, const std::shared_ptr& type, + std::vector>& columns, + int j, RTasks& tasks) { + if (ALTREP(x)) return false; + + switch (type->id()) { + case Type::INT32: + return TYPEOF(x) == INTSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::DOUBLE: + return TYPEOF(x) == REALSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::UINT8: + return TYPEOF(x) == RAWSXP && !OBJECT(x) && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + + case Type::INT64: + return TYPEOF(x) == REALSXP && Rf_inherits(x, "integer64") && + vector_from_r_memory_impl(x, type, columns, j, + tasks); + default: + break; + } + + return false; +} + } // namespace r } // namespace arrow @@ -1326,14 +1416,12 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, options.type = schema->field(j)->type(); options.size = vctrs::vec_size(x); - // TODO: do this in parallel, we can't use vec_to_arrow__reuse_memory() - // as is, because it wraps a SEXP with an cpp11:: vector but it should - // be possible to modify it so that we do the work in a task. - // - // if (arrow::r::can_reuse_memory(x, options.type)) { - // columns[j] = std::make_shared( - // arrow::r::vec_to_arrow__reuse_memory(x)); - // } else { + // first try to add a task to do a zero copy in parallel + if (arrow::r::vector_from_r_memory(x, options.type, columns, j, tasks)) { + continue; + } + + // if unsuccessful: use RConverter api auto converter_result = arrow::MakeConverter( options.type, options, gc_memory_pool()); @@ -1342,7 +1430,6 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, break; } converters[j] = std::move(converter_result.ValueUnsafe()); - // } } } StopIfNotOk(status); From 939e026c48373ae0022681df1ec4458678aff360 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 13 May 2021 10:06:18 +0200 Subject: [PATCH 24/33] factorr out UnsafeAppendUtf8Strings in the string converter implementation so that it may be done in parallel. --- r/src/r_to_arrow.cpp | 35 +++++++++++++++++++---------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index c46943f093d..9a5ba699488 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -812,43 +812,46 @@ class RPrimitiveConverter> using OffsetType = typename T::offset_type; Status Extend(SEXP x, int64_t size) override { - int64_t start = 0; RVectorType rtype = GetVectorType(x); if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - cpp11::strings s(arrow::r::utf8_strings(x)); + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + } + + void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine this., e.g. extract setup from Extend() + tasks.Append(false, std::move(task)); + } + + private: + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); - auto it = s.begin() + start; + const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get // a definite size and then use UnsafeAppend*() int64_t total_length = 0; - for (R_xlen_t i = 0; i < size; i++, ++it) { - cpp11::r_string si = *it; - total_length += cpp11::is_na(si) ? 0 : si.size(); + for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + SEXP si = *p_strings; + total_length += si == NA_STRING ? 0 : LENGTH(si); } RETURN_NOT_OK(this->primitive_builder_->ReserveData(total_length)); // append - it = s.begin() + start; - for (R_xlen_t i = 0; i < size; i++, ++it) { - cpp11::r_string si = *it; + p_strings = reinterpret_cast(DATAPTR_RO(s)); + for (R_xlen_t i = 0; i < size; i++, ++p_strings) { + SEXP si = *p_strings; if (si == NA_STRING) { this->primitive_builder_->UnsafeAppendNull(); } else { - this->primitive_builder_->UnsafeAppend(CHAR(si), si.size()); + this->primitive_builder_->UnsafeAppend(CHAR(si), LENGTH(si)); } } return Status::OK(); } - - void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - auto task = [this, values, size]() { return this->Extend(values, size); }; - // TODO: refine this., e.g. extract setup from Extend() - tasks.Append(false, std::move(task)); - } }; template From 592a103d08d773e53ab53441a152126116450982 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 13 May 2021 12:22:36 +0200 Subject: [PATCH 25/33] handle strings in parallel --- r/src/arrow_types.h | 5 +++-- r/src/r_to_arrow.cpp | 29 +++++++++++++++++++++++------ 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/r/src/arrow_types.h b/r/src/arrow_types.h index 68e7147e7ed..ca4ca9519c3 100644 --- a/r/src/arrow_types.h +++ b/r/src/arrow_types.h @@ -149,11 +149,12 @@ void TraverseDots(cpp11::list dots, int num_fields, Lambda lambda) { } } -inline std::vector FlattenDots(cpp11::list dots, int num_fields) { +inline cpp11::writable::list FlattenDots(cpp11::list dots, int num_fields) { std::vector out(num_fields); auto set = [&](int j, SEXP x, cpp11::r_string) { out[j] = x; }; TraverseDots(dots, num_fields, set); - return out; + + return cpp11::writable::list(out.begin(), out.end()); } arrow::Status InferSchemaFromDots(SEXP lst, SEXP schema_sxp, int num_fields, diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 9a5ba699488..22dace115f7 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -816,18 +816,24 @@ class RPrimitiveConverter> if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); + // this is fine for now because a converter only ever calls Extend() + // but we'll have to find some other way + utf8_strings_ = arrow::r::utf8_strings(x); + return UnsafeAppendUtf8Strings(utf8_strings_, size); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - auto task = [this, values, size]() { return this->Extend(values, size); }; - // TODO: refine this., e.g. extract setup from Extend() - tasks.Append(false, std::move(task)); + // this is fine for now because a converter only ever calls DelayedExtend() + // but we'll have to find some other way + utf8_strings_ = arrow::r::utf8_strings(values); + + tasks.Append(false, + [this, size]() { return this->Extend(this->utf8_strings_, size); }); } private: - Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { - RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); + Status UnsafeAppendUtf8Strings(SEXP s, int64_t size) { + RETURN_NOT_OK(this->primitive_builder_->Reserve(XLENGTH(s))); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get @@ -852,6 +858,9 @@ class RPrimitiveConverter> return Status::OK(); } + + private: + cpp11::strings utf8_strings_; }; template @@ -1404,6 +1413,14 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); std::vector> converters(num_fields); + // convert strings to utf8 + for (size_t j = 0; j < num_fields; j++) { + SEXP x = flatten_lst[j]; + if (TYPEOF(x) == STRSXP) { + flatten_lst[j] = arrow::r::utf8_strings(flatten_lst[j]); + } + } + // init converters for (int j = 0; j < num_fields && status.ok(); j++) { SEXP x = flatten_lst[j]; From 64fc8aa5cf58d712fa51331ce4e183202ca21839 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 13 May 2021 18:18:06 +0200 Subject: [PATCH 26/33] strings in parallel (for real this time, hopefully) --- r/src/r_to_arrow.cpp | 32 ++++++++++++++------------------ 1 file changed, 14 insertions(+), 18 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 22dace115f7..5f38720d81e 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -816,22 +816,26 @@ class RPrimitiveConverter> if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - // this is fine for now because a converter only ever calls Extend() - // but we'll have to find some other way - utf8_strings_ = arrow::r::utf8_strings(x); - return UnsafeAppendUtf8Strings(utf8_strings_, size); + return UnsafeAppendUtf8Strings(to_utf8(x), size); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - // this is fine for now because a converter only ever calls DelayedExtend() - // but we'll have to find some other way - utf8_strings_ = arrow::r::utf8_strings(values); + // convert to utf8 on the main thread + SEXP x = to_utf8(values); - tasks.Append(false, - [this, size]() { return this->Extend(this->utf8_strings_, size); }); + // but then do the bulk of the work in parallel + tasks.Append(true, [this, x, size]() { return UnsafeAppendUtf8Strings(x, size); }); } private: + // convert x to utf8 strings and cache so that + // the result can be used in parallel (in DelayedExtend) + SEXP to_utf8(SEXP x) { + SEXP utf8 = arrow::r::utf8_strings(x); + utf8_vectors_.push_back(utf8); + return utf8; + } + Status UnsafeAppendUtf8Strings(SEXP s, int64_t size) { RETURN_NOT_OK(this->primitive_builder_->Reserve(XLENGTH(s))); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); @@ -860,7 +864,7 @@ class RPrimitiveConverter> } private: - cpp11::strings utf8_strings_; + cpp11::writable::list utf8_vectors_; }; template @@ -1413,14 +1417,6 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, auto flatten_lst = arrow::r::FlattenDots(lst, num_fields); std::vector> converters(num_fields); - // convert strings to utf8 - for (size_t j = 0; j < num_fields; j++) { - SEXP x = flatten_lst[j]; - if (TYPEOF(x) == STRSXP) { - flatten_lst[j] = arrow::r::utf8_strings(flatten_lst[j]); - } - } - // init converters for (int j = 0; j < num_fields && status.ok(); j++) { SEXP x = flatten_lst[j]; From af3d42b1434ecf1095c6342ff553586e8332ee62 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Thu, 20 May 2021 16:45:36 +0200 Subject: [PATCH 27/33] finish rebase --- r/R/arrowExports.R | 10 ++-------- r/src/arrowExports.cpp | 7 ------- 2 files changed, 2 insertions(+), 15 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index e492c92e019..b59aa12dac7 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -1360,18 +1360,12 @@ ExportRecordBatch <- function(batch, array_ptr, schema_ptr){ invisible(.Call(`_arrow_ExportRecordBatch`, batch, array_ptr, schema_ptr)) } -<<<<<<< HEAD -<<<<<<< HEAD ExportRecordBatchReader <- function(reader, stream_ptr){ invisible(.Call(`_arrow_ExportRecordBatchReader`, reader, stream_ptr)) -======= -Table__from_dots <- function(lst, schema_sxp){ - .Call(`_arrow_Table__from_dots`, lst, schema_sxp) ->>>>>>> e8b766472 (towards a parallel Table__from_dots()) -======= +} + Table__from_dots <- function(lst, schema_sxp, use_threads){ .Call(`_arrow_Table__from_dots`, lst, schema_sxp, use_threads) ->>>>>>> f2ef4f637 (trickle down options.use_threads()) } vec_to_arrow <- function(x, s_type){ diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index f7ff8313322..b25dd5a2a0d 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -7165,15 +7165,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_ExportSchema", (DL_FUNC) &_arrow_ExportSchema, 2}, { "_arrow_ExportArray", (DL_FUNC) &_arrow_ExportArray, 3}, { "_arrow_ExportRecordBatch", (DL_FUNC) &_arrow_ExportRecordBatch, 3}, -<<<<<<< HEAD -<<<<<<< HEAD { "_arrow_ExportRecordBatchReader", (DL_FUNC) &_arrow_ExportRecordBatchReader, 2}, -======= - { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 2}, ->>>>>>> e8b766472 (towards a parallel Table__from_dots()) -======= { "_arrow_Table__from_dots", (DL_FUNC) &_arrow_Table__from_dots, 3}, ->>>>>>> f2ef4f637 (trickle down options.use_threads()) { "_arrow_vec_to_arrow", (DL_FUNC) &_arrow_vec_to_arrow, 2}, { "_arrow_DictionaryArray__FromArrays", (DL_FUNC) &_arrow_DictionaryArray__FromArrays, 3}, { "_arrow_RecordBatch__num_columns", (DL_FUNC) &_arrow_RecordBatch__num_columns, 1}, From 84e74d8e221adf456ba9064662fc2532c077aea0 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 21 May 2021 10:07:20 +0200 Subject: [PATCH 28/33] do ->ToArray() calls in parallel --- r/src/r_to_arrow.cpp | 62 +++++++++++++++++++++++--------------------- 1 file changed, 32 insertions(+), 30 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 5f38720d81e..7446d5741ec 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -816,28 +816,18 @@ class RPrimitiveConverter> if (rtype != STRING) { return Status::Invalid("Expecting a character vector"); } - return UnsafeAppendUtf8Strings(to_utf8(x), size); + return UnsafeAppendUtf8Strings(arrow::r::utf8_strings(x), size); } void DelayedExtend(SEXP values, int64_t size, RTasks& tasks) override { - // convert to utf8 on the main thread - SEXP x = to_utf8(values); - - // but then do the bulk of the work in parallel - tasks.Append(true, [this, x, size]() { return UnsafeAppendUtf8Strings(x, size); }); + auto task = [this, values, size]() { return this->Extend(values, size); }; + // TODO: refine this., e.g. extract setup from Extend() + tasks.Append(false, std::move(task)); } private: - // convert x to utf8 strings and cache so that - // the result can be used in parallel (in DelayedExtend) - SEXP to_utf8(SEXP x) { - SEXP utf8 = arrow::r::utf8_strings(x); - utf8_vectors_.push_back(utf8); - return utf8; - } - - Status UnsafeAppendUtf8Strings(SEXP s, int64_t size) { - RETURN_NOT_OK(this->primitive_builder_->Reserve(XLENGTH(s))); + Status UnsafeAppendUtf8Strings(const cpp11::strings& s, int64_t size) { + RETURN_NOT_OK(this->primitive_builder_->Reserve(s.size())); const SEXP* p_strings = reinterpret_cast(DATAPTR_RO(s)); // we know all the R strings are utf8 already, so we can get @@ -862,9 +852,6 @@ class RPrimitiveConverter> return Status::OK(); } - - private: - cpp11::writable::list utf8_vectors_; }; template @@ -1448,25 +1435,40 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, converters[j] = std::move(converter_result.ValueUnsafe()); } } - StopIfNotOk(status); - for (int j = 0; j < num_fields; j++) { - auto& converter = converters[j]; - if (converter != nullptr) { - converter->DelayedExtend(flatten_lst[j], converter->options().size, tasks); + // if the previous loop didn't break early, spawn + // tasks to Extend, maybe in parallel + if (status.ok()) { + for (int j = 0; j < num_fields; j++) { + auto& converter = converters[j]; + if (converter != nullptr) { + converter->DelayedExtend(flatten_lst[j], converter->options().size, tasks); + } } } + // in any case, this needs to wait until all tasks are finished status &= tasks.Finish(); + // nothing is running in parallel here, so we have an opportunity to stop + StopIfNotOk(status); + + // then finally convert to chunked arrays in parallel + arrow::r::RTasks finish_tasks(use_threads); + for (int j = 0; j < num_fields; j++) { - auto& converter = converters[j]; - if (converter != nullptr) { - auto maybe_array = converter->ToArray(); - StopIfNotOk(maybe_array.status()); - columns[j] = std::make_shared(maybe_array.ValueUnsafe()); - } + finish_tasks.Append(true, [&columns, j, &converters]() { + auto& converter = converters[j]; + if (converter != nullptr) { + auto maybe_array = converter->ToArray(); + RETURN_NOT_OK(maybe_array.status()); + columns[j] = std::make_shared(maybe_array.ValueUnsafe()); + } + return arrow::Status::OK(); + }); } + status &= finish_tasks.Finish(); + StopIfNotOk(status); status &= check_consistent_column_length(columns); StopIfNotOk(status); From 36bb9d70d3ef8ede03162f013dc6af78db72c41b Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 21 May 2021 10:27:12 +0200 Subject: [PATCH 29/33] missing parameter --- r/R/table.R | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/r/R/table.R b/r/R/table.R index 2db836d78c9..8f9fee065f2 100644 --- a/r/R/table.R +++ b/r/R/table.R @@ -168,13 +168,13 @@ Table$create <- function(..., schema = NULL) { names(dots) <- rep_len("", length(dots)) } stopifnot(length(dots) > 0) - + # Preserve any grouping if (length(dots) == 1 && inherits(dots[[1]], "grouped_df")) { - out <- Table__from_dots(dots, schema) + out <- Table__from_dots(dots, schema, option_use_threads()) return(dplyr::group_by(out, !!!dplyr::groups(dots[[1]]))) } - + if (all_record_batches(dots)) { Table__from_record_batches(dots, schema) } else { From eea4a5fe5212bb118181c82a1c2f6c0bd2430bc5 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 21 May 2021 10:36:01 +0200 Subject: [PATCH 30/33] prefer ARROW_ASSIGN_OR_RAISE --- r/src/r_to_arrow.cpp | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index 7446d5741ec..c4aaeb3f0c1 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -1460,9 +1460,8 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, finish_tasks.Append(true, [&columns, j, &converters]() { auto& converter = converters[j]; if (converter != nullptr) { - auto maybe_array = converter->ToArray(); - RETURN_NOT_OK(maybe_array.status()); - columns[j] = std::make_shared(maybe_array.ValueUnsafe()); + ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); + columns[j] = std::make_shared(array); } return arrow::Status::OK(); }); From 388f2c2e563703e1c1c2b86d5063af6769ce5b91 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Fri, 21 May 2021 12:06:50 +0200 Subject: [PATCH 31/33] stopping early --- r/src/r_to_arrow.cpp | 39 +++++++++++++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 6 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index c4aaeb3f0c1..e5395af7d62 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -56,9 +56,11 @@ class RTasks { explicit RTasks(bool use_threads) : use_threads_(use_threads), - parallel_tasks_(use_threads ? arrow::internal::TaskGroup::MakeThreaded( - arrow::internal::GetCpuThreadPool()) - : nullptr) {} + stop_source_(), + parallel_tasks_( + use_threads ? arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()) + : nullptr) {} // This Finish() method must never be called from a thread pool thread // as this would deadlock. @@ -73,7 +75,6 @@ class RTasks { // run the delayed tasks now for (auto& task : delayed_serial_tasks_) { status &= std::move(task)(); - if (!status.ok()) break; } // then wait for the parallel tasks to finish @@ -85,16 +86,42 @@ class RTasks { } void Append(bool parallel, Task&& task) { + StoppingTask stopping_task(stop_source_, std::move(task)); if (parallel && use_threads_) { - parallel_tasks_->Append(std::move(task)); + parallel_tasks_->Append(std::move(stopping_task)); } else { - delayed_serial_tasks_.push_back(std::move(task)); + delayed_serial_tasks_.push_back(std::move(stopping_task)); } } bool use_threads_; + StopSource stop_source_; std::shared_ptr parallel_tasks_; std::vector delayed_serial_tasks_; + + private: + class StoppingTask { + public: + StoppingTask(StopSource stop_source, Task&& task) : task_(std::move(task)) {} + + Status operator()() { + Status status; + StopToken token = stop_source_.token(); + if (token.IsStopRequested()) { + status &= token.Poll(); + } else { + Status status = std::move(task_)(); + if (!status.ok()) { + stop_source_.RequestStop(); + } + } + return status; + } + + private: + StopSource stop_source_; + Task task_; + }; }; struct RConversionOptions { From 96e975907a1951071771fb744d18bd81d9f867ec Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 25 May 2021 10:22:05 +0200 Subject: [PATCH 32/33] RTasks.Reset() --- r/src/r_to_arrow.cpp | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/r/src/r_to_arrow.cpp b/r/src/r_to_arrow.cpp index e5395af7d62..d0f4f3a6def 100644 --- a/r/src/r_to_arrow.cpp +++ b/r/src/r_to_arrow.cpp @@ -94,6 +94,16 @@ class RTasks { } } + void Reset() { + delayed_serial_tasks_.clear(); + + stop_source_.Reset(); + if (use_threads_) { + parallel_tasks_ = arrow::internal::TaskGroup::MakeThreaded( + arrow::internal::GetCpuThreadPool(), stop_source_.token()); + } + } + bool use_threads_; StopSource stop_source_; std::shared_ptr parallel_tasks_; @@ -1424,6 +1434,8 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, arrow::r::TraverseDots(lst, num_fields, check_name); } + // must be careful to avoid R stop() until the tasks + // are finished, i.e. after tasks.Finish() arrow::r::RTasks tasks(use_threads); arrow::Status status = arrow::Status::OK(); @@ -1481,10 +1493,10 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, StopIfNotOk(status); // then finally convert to chunked arrays in parallel - arrow::r::RTasks finish_tasks(use_threads); + tasks.Reset(); for (int j = 0; j < num_fields; j++) { - finish_tasks.Append(true, [&columns, j, &converters]() { + tasks.Append(true, [&columns, j, &converters]() { auto& converter = converters[j]; if (converter != nullptr) { ARROW_ASSIGN_OR_RAISE(auto array, converter->ToArray()); @@ -1493,7 +1505,7 @@ std::shared_ptr Table__from_dots(SEXP lst, SEXP schema_sxp, return arrow::Status::OK(); }); } - status &= finish_tasks.Finish(); + status &= tasks.Finish(); StopIfNotOk(status); status &= check_consistent_column_length(columns); From e914b74ea1c9ff391a8a9dd335fa9c6f962b0411 Mon Sep 17 00:00:00 2001 From: Romain Francois Date: Tue, 25 May 2021 12:36:06 +0200 Subject: [PATCH 33/33] raw vectors are inferred to uint8, not int8 --- r/src/type_infer.cpp | 2 +- r/tests/testthat/test-type.R | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/r/src/type_infer.cpp b/r/src/type_infer.cpp index 93e51be6462..022a29ea5b2 100644 --- a/r/src/type_infer.cpp +++ b/r/src/type_infer.cpp @@ -179,7 +179,7 @@ std::shared_ptr InferArrowType(SEXP x) { case REALSXP: return InferArrowTypeFromVector(x); case RAWSXP: - return int8(); + return uint8(); case STRSXP: return InferArrowTypeFromVector(x); case VECSXP: diff --git a/r/tests/testthat/test-type.R b/r/tests/testthat/test-type.R index 56cef722556..343170edb82 100644 --- a/r/tests/testthat/test-type.R +++ b/r/tests/testthat/test-type.R @@ -31,7 +31,7 @@ test_that("type() infers from R type", { expect_type_equal(type(1:10), int32()) expect_type_equal(type(1), float64()) expect_type_equal(type(TRUE), boolean()) - expect_type_equal(type(raw()), int8()) + expect_type_equal(type(raw()), uint8()) expect_type_equal(type(""), utf8()) expect_type_equal( type(example_data$fct),