From f7db25aff6e4e78e89cf5254abd54c2a4b133997 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Wed, 16 Mar 2022 18:14:31 +0100 Subject: [PATCH 1/9] Update R bindings for ExecPlan --- r/R/query-engine.R | 17 +++++++++++++-- r/src/arrowExports.cpp | 48 ++++++++++++++++++++++++++++++++++++++++++ r/src/compute-exec.cpp | 23 ++++++++++++++++++++ 3 files changed, 86 insertions(+), 2 deletions(-) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 15563d62f19..4545ff43919 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -240,8 +240,21 @@ ExecPlan <- R6Class("ExecPlan", Stop = function() ExecPlan_StopProducing(self) ) ) -ExecPlan$create <- function(use_threads = option_use_threads()) { - ExecPlan_create(use_threads) + +ExecPlan$create <- function(use_threads = option_use_threads(), metadata) { + if (missing(metadata)) { + ExecPlan_create(use_threads) + } else { + ExecPlan_create_with_metadata(use_threads, metadata) + } +} + +ExecPlan$HasMetadata <- function(plan) { + ExecPlan_HasMetadata(plan) +} + +ExecPlan$metadata <- function(plan) { + ExecPlan_metadata(plan) } ExecNode <- R6Class("ExecNode", diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 5810320a057..e5ccee326a6 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -868,6 +868,51 @@ BEGIN_CPP11 return cpp11::as_sexp(ExecPlan_create(use_threads)); END_CPP11 } +// compute-exec.cpp +std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata); +extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ +BEGIN_CPP11 + arrow::r::Input::type use_threads(use_threads_sexp); + arrow::r::Input::type metadata(metadata_sexp); + return cpp11::as_sexp(ExecPlan_create_with_metadata(use_threads, metadata)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ + Rf_error("Cannot call ExecPlan_create_with_metadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// compute-exec.cpp +#if defined(ARROW_R_WITH_ARROW) +bool ExecPlan_HasMetadata(const std::shared_ptr& plan); +extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + return cpp11::as_sexp(ExecPlan_HasMetadata(plan)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ + Rf_error("Cannot call ExecPlan_HasMetadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + +// compute-exec.cpp +#if defined(ARROW_R_WITH_ARROW) +std::shared_ptr ExecPlan_metadata(const std::shared_ptr& plan); +extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + return cpp11::as_sexp(ExecPlan_metadata(plan)); +END_CPP11 +} +#else +extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ + Rf_error("Cannot call ExecPlan_metadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); +} +#endif + // compute-exec.cpp std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, int64_t head); extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP head_sexp){ @@ -5237,6 +5282,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2}, { "_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1}, + { "_arrow_ExecPlan_create_with_metadata", (DL_FUNC) &_arrow_ExecPlan_create_with_metadata, 2}, + { "_arrow_ExecPlan_HasMetadata", (DL_FUNC) &_arrow_ExecPlan_HasMetadata, 1}, + { "_arrow_ExecPlan_metadata", (DL_FUNC) &_arrow_ExecPlan_metadata, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 4}, { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 0566383509a..3a6f19e7b69 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -43,6 +43,29 @@ std::shared_ptr ExecPlan_create(bool use_threads) { return plan; } +// [[arrow::export]] +std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata) { + static compute::ExecContext threaded_context{gc_memory_pool(), + arrow::internal::GetCpuThreadPool()}; + auto vec_metadata = cpp11::as_cpp>(metadata); + auto names_metadata = cpp11::as_cpp>(metadata.names()); + auto kv = std::shared_ptr( + new arrow::KeyValueMetadata(names_metadata, vec_metadata)); + auto plan = ValueOrStop( + compute::ExecPlan::Make(use_threads ? &threaded_context : gc_context(), kv)); + return plan; +} + +// [[arrow::export]] +bool ExecPlan_HasMetadata(const std::shared_ptr& plan) { + return plan->HasMetadata(); +} + +// [[arrow::export]] +std::shared_ptr ExecPlan_metadata(const std::shared_ptr& plan) { + return plan->metadata(); +} + std::shared_ptr MakeExecNodeOrStop( const std::string& factory_name, compute::ExecPlan* plan, std::vector inputs, const compute::ExecNodeOptions& options) { From 63505a54baf18b175a38838b082e757cdb3dcd38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Mon, 21 Mar 2022 23:06:08 +0100 Subject: [PATCH 2/9] R bindings WIP --- r/R/arrowExports.R | 12 ++++++++++++ r/R/dplyr-collect.R | 5 ++++- r/R/dplyr.R | 8 ++++++-- r/R/query-engine.R | 16 +++++++--------- r/src/arrowExports.cpp | 6 +++--- r/src/compute-exec.cpp | 22 ++++++++++++++++++++-- 6 files changed, 52 insertions(+), 17 deletions(-) diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 9c008e2da32..0edfca1cd80 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -404,6 +404,18 @@ ExecPlan_create <- function(use_threads) { .Call(`_arrow_ExecPlan_create`, use_threads) } +ExecPlan_create_with_metadata <- function(use_threads, metadata) { + .Call(`_arrow_ExecPlan_create_with_metadata`, use_threads, metadata) +} + +ExecPlan_HasMetadata <- function(plan) { + .Call(`_arrow_ExecPlan_HasMetadata`, plan) +} + +ExecPlan_metadata <- function(plan) { + .Call(`_arrow_ExecPlan_metadata`, plan) +} + ExecPlan_run <- function(plan, final_node, sort_options, head) { .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, head) } diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R index 3586dda33e8..c7f9c17844c 100644 --- a/r/R/dplyr-collect.R +++ b/r/R/dplyr-collect.R @@ -18,7 +18,7 @@ # The following S3 methods are registered on load if dplyr is present -collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { +collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, metadata = empty_named_list(), ...) { # head and tail are not ExecNodes, at best we can handle them via sink node # so if there are any steps done after head/tail, we need to # evaluate the query up to then and then do a new query for the rest @@ -26,6 +26,9 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { x$.data <- as_adq(dplyr::compute(x$.data))$.data } + # attach the metadata to the query + x$metadata=prepare_key_value_metadata(metadata) + # See query-engine.R for ExecPlan/Nodes tryCatch( tab <- do_exec_plan(x), diff --git a/r/R/dplyr.R b/r/R/dplyr.R index c9650fb0653..6f997f16f93 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -19,7 +19,7 @@ #' @include record-batch.R #' @include table.R -arrow_dplyr_query <- function(.data) { +arrow_dplyr_query <- function(.data, metadata = empty_named_list()) { # An arrow_dplyr_query is a container for an Arrow data object (Table, # RecordBatch, or Dataset) and the state of the user's dplyr query--things # like selected columns, filters, and group vars. @@ -31,6 +31,8 @@ arrow_dplyr_query <- function(.data) { error = function(e) character() ) + metadata_kv = prepare_key_value_metadata(metadata) + if (inherits(.data, "data.frame")) { .data <- Table$create(.data) } @@ -67,7 +69,9 @@ arrow_dplyr_query <- function(.data) { arrange_vars = list(), # arrange_desc will be a logical vector indicating the sort order for each # expression in arrange_vars (FALSE for ascending, TRUE for descending) - arrange_desc = logical() + arrange_desc = logical(), + # metadata is a named list of strings that'll be attached to the ExecPlan + metadata = metadata_kv ), class = "arrow_dplyr_query" ) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 4545ff43919..3918bb7ec78 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -16,7 +16,7 @@ # under the License. do_exec_plan <- function(.data) { - plan <- ExecPlan$create() + plan <- ExecPlan$create(metadata=.data$metadata) final_node <- plan$Build(.data) tab <- plan$Run(final_node) # TODO (ARROW-14289): make the head/tail methods return RBR not Table @@ -238,6 +238,12 @@ ExecPlan <- R6Class("ExecPlan", ExecPlan_Write(self, node, ...) }, Stop = function() ExecPlan_StopProducing(self) + HasMetadata = function() { + ExecPlan_HasMetadata(self) + }, + metadata = function() { + ExecPlan_metadata(self) + } ) ) @@ -249,14 +255,6 @@ ExecPlan$create <- function(use_threads = option_use_threads(), metadata) { } } -ExecPlan$HasMetadata <- function(plan) { - ExecPlan_HasMetadata(plan) -} - -ExecPlan$metadata <- function(plan) { - ExecPlan_metadata(plan) -} - ExecNode <- R6Class("ExecNode", inherit = ArrowObject, public = list( diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index e5ccee326a6..a188de8f6d1 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -900,7 +900,7 @@ extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ // compute-exec.cpp #if defined(ARROW_R_WITH_ARROW) -std::shared_ptr ExecPlan_metadata(const std::shared_ptr& plan); +cpp11::writable::list ExecPlan_metadata(const std::shared_ptr& plan); extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ BEGIN_CPP11 arrow::r::Input&>::type plan(plan_sexp); @@ -5283,8 +5283,8 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1}, { "_arrow_ExecPlan_create_with_metadata", (DL_FUNC) &_arrow_ExecPlan_create_with_metadata, 2}, - { "_arrow_ExecPlan_HasMetadata", (DL_FUNC) &_arrow_ExecPlan_HasMetadata, 1}, - { "_arrow_ExecPlan_metadata", (DL_FUNC) &_arrow_ExecPlan_metadata, 1}, + { "_arrow_ExecPlan_HasMetadata", (DL_FUNC) &_arrow_ExecPlan_HasMetadata, 1}, + { "_arrow_ExecPlan_metadata", (DL_FUNC) &_arrow_ExecPlan_metadata, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 4}, { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 3a6f19e7b69..98514e0ce70 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -62,8 +62,26 @@ bool ExecPlan_HasMetadata(const std::shared_ptr& plan) { } // [[arrow::export]] -std::shared_ptr ExecPlan_metadata(const std::shared_ptr& plan) { - return plan->metadata(); +cpp11::writable::list ExecPlan_metadata(const std::shared_ptr& plan) { + auto meta = plan->metadata(); + int64_t n = 0; + if (plan->HasMetadata()) { + n = meta->size(); + } + + cpp11::writable::list out(n); + std::vector names_out(n); + + for (int i = 0; i < n; i++) { + auto key = meta->key(i); + out[i] = cpp11::as_sexp(meta->value(i)); + if (key == "r") { + Rf_classgets(out[i], arrow::r::data::classes_metadata_r); + } + names_out[i] = key; + } + out.names() = names_out; + return out; } std::shared_ptr MakeExecNodeOrStop( From 580394406800831567d6072d8bb5de3f403819ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Tue, 22 Mar 2022 19:40:16 +0100 Subject: [PATCH 3/9] Lint --- r/R/dplyr-collect.R | 2 +- r/R/dplyr.R | 2 +- r/R/query-engine.R | 2 +- r/src/compute-exec.cpp | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R index c7f9c17844c..1ff5e4208c3 100644 --- a/r/R/dplyr-collect.R +++ b/r/R/dplyr-collect.R @@ -27,7 +27,7 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, metadata = empty_ } # attach the metadata to the query - x$metadata=prepare_key_value_metadata(metadata) + x$metadata <- prepare_key_value_metadata(metadata) # See query-engine.R for ExecPlan/Nodes tryCatch( diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 6f997f16f93..2919110826a 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -31,7 +31,7 @@ arrow_dplyr_query <- function(.data, metadata = empty_named_list()) { error = function(e) character() ) - metadata_kv = prepare_key_value_metadata(metadata) + metadata_kv <- prepare_key_value_metadata(metadata) if (inherits(.data, "data.frame")) { .data <- Table$create(.data) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 3918bb7ec78..8c74c23f36d 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -16,7 +16,7 @@ # under the License. do_exec_plan <- function(.data) { - plan <- ExecPlan$create(metadata=.data$metadata) + plan <- ExecPlan$create(metadata = .data$metadata) final_node <- plan$Build(.data) tab <- plan$Run(final_node) # TODO (ARROW-14289): make the head/tail methods return RBR not Table diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 98514e0ce70..bdf46422a4d 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -44,7 +44,8 @@ std::shared_ptr ExecPlan_create(bool use_threads) { } // [[arrow::export]] -std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata) { +std::shared_ptr ExecPlan_create_with_metadata( + bool use_threads, cpp11::strings metadata) { static compute::ExecContext threaded_context{gc_memory_pool(), arrow::internal::GetCpuThreadPool()}; auto vec_metadata = cpp11::as_cpp>(metadata); From 66b190647fe43639d4dc2c92cabc6baafe919ff5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Sat, 2 Apr 2022 23:59:19 +0200 Subject: [PATCH 4/9] Python bindings --- python/pyarrow/_exec_plan.pyx | 17 ++++++++++++----- python/pyarrow/includes/libarrow.pxd | 5 +++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 753abe27cfa..a58c1a3120a 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,16 +27,15 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) -from pyarrow.lib import tobytes +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, pyarrow_unwrap_metadata) +from pyarrow.lib import KeyValueMetadata, tobytes, _pc from pyarrow._compute cimport Expression, _true from pyarrow._dataset cimport Dataset from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan - -cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True): +cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True, metadata = None): """ Internal Function to create an ExecPlan and run it. @@ -53,6 +52,8 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads to produce the output. use_threads : bool, default True Whenever to use multithreading or not. + metadata : dict or Mapping, default None + Optional metadata for the ExecPlan. """ cdef: CExecutor *c_executor @@ -72,6 +73,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen shared_ptr[CRecordBatchReader] c_recordbatchreader vector[CDeclaration].iterator plan_iter + shared_ptr[const CKeyValueMetadata] c_meta vector[CDeclaration.Input] no_c_inputs CStatus c_plan_status @@ -82,7 +84,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads c_exec_context = make_shared[CExecContext]( c_default_memory_pool(), c_executor) - c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) + + if metadata is not None: + c_meta = pyarrow_unwrap_metadata(metadata) + c_exec_plan = GetResultValue(CExecPlan.MakeWithMetadata(c_exec_context.get(), c_meta)) + else: + c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) plan_iter = plan.begin() diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2e51864b860..86360ec055d 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2504,6 +2504,8 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog cdef cppclass CExecPlan "arrow::compute::ExecPlan": @staticmethod CResult[shared_ptr[CExecPlan]] Make(CExecContext* exec_context) + @staticmethod + CResult[shared_ptr[CExecPlan]] MakeWithMetadata" Make"(CExecContext* exec_context, shared_ptr[const CKeyValueMetadata] metadata) CStatus StartProducing() CStatus Validate() @@ -2511,6 +2513,9 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog CFuture_Void finished() + c_bool HasMetadata() + shared_ptr[const CKeyValueMetadata] metadata() + vector[CExecNode*] sinks() const vector[CExecNode*] sources() const From 2ee3e7d7ba357da2bbfe48f85d83d9d7cdc84905 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Sun, 3 Apr 2022 00:13:58 +0200 Subject: [PATCH 5/9] Lint --- python/pyarrow/_exec_plan.pyx | 5 +++-- python/pyarrow/includes/libarrow.pxd | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index a58c1a3120a..34a80f550d3 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -35,7 +35,7 @@ from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan -cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True, metadata = None): +cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True, metadata=None): """ Internal Function to create an ExecPlan and run it. @@ -87,7 +87,8 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads if metadata is not None: c_meta = pyarrow_unwrap_metadata(metadata) - c_exec_plan = GetResultValue(CExecPlan.MakeWithMetadata(c_exec_context.get(), c_meta)) + c_exec_plan = GetResultValue( + CExecPlan.MakeWithMetadata(c_exec_context.get(), c_meta)) else: c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 86360ec055d..1bb3541aeb8 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2504,6 +2504,7 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog cdef cppclass CExecPlan "arrow::compute::ExecPlan": @staticmethod CResult[shared_ptr[CExecPlan]] Make(CExecContext* exec_context) + @staticmethod CResult[shared_ptr[CExecPlan]] MakeWithMetadata" Make"(CExecContext* exec_context, shared_ptr[const CKeyValueMetadata] metadata) From d8a8e81f083c2080d2f77291aa29fc0688e796d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Tue, 19 Apr 2022 12:54:38 +0200 Subject: [PATCH 6/9] Amend docstring for ExecPlan metadata --- python/pyarrow/_exec_plan.pyx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 34a80f550d3..60b7351c7ec 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -53,7 +53,11 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads use_threads : bool, default True Whenever to use multithreading or not. metadata : dict or Mapping, default None - Optional metadata for the ExecPlan. + Optional metadata for the ExecPlan. + This metadata will be added to the OpenTelemetry tracing + output generated by this plan. (see ARROW-15061) + Can be used for annotating the traces with information about the plan + and the environment (e.g TPC-H query id, scale factor, hardware details). """ cdef: CExecutor *c_executor From b68e0f6b4ab7697c939fdc97111dff17e85d9b00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Tue, 19 Apr 2022 13:03:07 +0200 Subject: [PATCH 7/9] Fix missing if statement --- r/R/query-engine.R | 2 +- r/src/arrowExports.cpp | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 8c74c23f36d..6a0fafbe870 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -237,7 +237,7 @@ ExecPlan <- R6Class("ExecPlan", # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... ExecPlan_Write(self, node, ...) }, - Stop = function() ExecPlan_StopProducing(self) + Stop = function() ExecPlan_StopProducing(self), HasMetadata = function() { ExecPlan_HasMetadata(self) }, diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index a188de8f6d1..fec56ed3c60 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -869,6 +869,7 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp +#if defined(ARROW_R_WITH_ARROW) std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata); extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ BEGIN_CPP11 From c2d40dac9433fdbcd8c26d4fc69612b0cfea02c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Tue, 17 May 2022 21:31:33 +0200 Subject: [PATCH 8/9] Expand docstring for ExecPlan metadata --- r/R/dplyr.R | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/r/R/dplyr.R b/r/R/dplyr.R index 2919110826a..d9660e345eb 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -71,6 +71,10 @@ arrow_dplyr_query <- function(.data, metadata = empty_named_list()) { # expression in arrange_vars (FALSE for ascending, TRUE for descending) arrange_desc = logical(), # metadata is a named list of strings that'll be attached to the ExecPlan + # This metadata will be added to the OpenTelemetry tracing + # output generated by this plan. (see ARROW-15061) + # Can be used for annotating the traces with information about the plan + # and the environment (e.g TPC-H query id, scale factor, hardware details). metadata = metadata_kv ), class = "arrow_dplyr_query" From 7a8e137d2ebf52240296ac027132eb7d5c2edb3f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=81kos=20Hadnagy?= Date: Tue, 17 May 2022 23:01:30 +0200 Subject: [PATCH 9/9] R build fix --- r/src/arrowExports.cpp | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index fec56ed3c60..a25692411a4 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -869,7 +869,6 @@ BEGIN_CPP11 END_CPP11 } // compute-exec.cpp -#if defined(ARROW_R_WITH_ARROW) std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata); extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ BEGIN_CPP11 @@ -878,14 +877,8 @@ BEGIN_CPP11 return cpp11::as_sexp(ExecPlan_create_with_metadata(use_threads, metadata)); END_CPP11 } -#else -extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ - Rf_error("Cannot call ExecPlan_create_with_metadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif // compute-exec.cpp -#if defined(ARROW_R_WITH_ARROW) bool ExecPlan_HasMetadata(const std::shared_ptr& plan); extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ BEGIN_CPP11 @@ -893,14 +886,8 @@ BEGIN_CPP11 return cpp11::as_sexp(ExecPlan_HasMetadata(plan)); END_CPP11 } -#else -extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ - Rf_error("Cannot call ExecPlan_HasMetadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif // compute-exec.cpp -#if defined(ARROW_R_WITH_ARROW) cpp11::writable::list ExecPlan_metadata(const std::shared_ptr& plan); extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ BEGIN_CPP11 @@ -908,11 +895,6 @@ BEGIN_CPP11 return cpp11::as_sexp(ExecPlan_metadata(plan)); END_CPP11 } -#else -extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ - Rf_error("Cannot call ExecPlan_metadata(). See https://arrow.apache.org/docs/r/articles/install.html for help installing Arrow C++ libraries. "); -} -#endif // compute-exec.cpp std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, int64_t head);