diff --git a/python/pyarrow/_exec_plan.pyx b/python/pyarrow/_exec_plan.pyx index 753abe27cfa..60b7351c7ec 100644 --- a/python/pyarrow/_exec_plan.pyx +++ b/python/pyarrow/_exec_plan.pyx @@ -27,16 +27,15 @@ from cython.operator cimport dereference as deref, preincrement as inc from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * from pyarrow.includes.libarrow_dataset cimport * -from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table) -from pyarrow.lib import tobytes +from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, pyarrow_unwrap_metadata) +from pyarrow.lib import KeyValueMetadata, tobytes, _pc from pyarrow._compute cimport Expression, _true from pyarrow._dataset cimport Dataset from pyarrow._dataset import InMemoryDataset Initialize() # Initialise support for Datasets in ExecPlan - -cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True): +cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True, metadata=None): """ Internal Function to create an ExecPlan and run it. @@ -53,6 +52,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads to produce the output. use_threads : bool, default True Whenever to use multithreading or not. + metadata : dict or Mapping, default None + Optional metadata for the ExecPlan. + This metadata will be added to the OpenTelemetry tracing + output generated by this plan. (see ARROW-15061) + Can be used for annotating the traces with information about the plan + and the environment (e.g TPC-H query id, scale factor, hardware details). """ cdef: CExecutor *c_executor @@ -72,6 +77,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen shared_ptr[CRecordBatchReader] c_recordbatchreader vector[CDeclaration].iterator plan_iter + shared_ptr[const CKeyValueMetadata] c_meta vector[CDeclaration.Input] no_c_inputs CStatus c_plan_status @@ -82,7 +88,13 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads c_exec_context = make_shared[CExecContext]( c_default_memory_pool(), c_executor) - c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) + + if metadata is not None: + c_meta = pyarrow_unwrap_metadata(metadata) + c_exec_plan = GetResultValue( + CExecPlan.MakeWithMetadata(c_exec_context.get(), c_meta)) + else: + c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get())) plan_iter = plan.begin() diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2e51864b860..1bb3541aeb8 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -2505,12 +2505,18 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog @staticmethod CResult[shared_ptr[CExecPlan]] Make(CExecContext* exec_context) + @staticmethod + CResult[shared_ptr[CExecPlan]] MakeWithMetadata" Make"(CExecContext* exec_context, shared_ptr[const CKeyValueMetadata] metadata) + CStatus StartProducing() CStatus Validate() CStatus StopProducing() CFuture_Void finished() + c_bool HasMetadata() + shared_ptr[const CKeyValueMetadata] metadata() + vector[CExecNode*] sinks() const vector[CExecNode*] sources() const diff --git a/r/R/arrowExports.R b/r/R/arrowExports.R index 9c008e2da32..0edfca1cd80 100644 --- a/r/R/arrowExports.R +++ b/r/R/arrowExports.R @@ -404,6 +404,18 @@ ExecPlan_create <- function(use_threads) { .Call(`_arrow_ExecPlan_create`, use_threads) } +ExecPlan_create_with_metadata <- function(use_threads, metadata) { + .Call(`_arrow_ExecPlan_create_with_metadata`, use_threads, metadata) +} + +ExecPlan_HasMetadata <- function(plan) { + .Call(`_arrow_ExecPlan_HasMetadata`, plan) +} + +ExecPlan_metadata <- function(plan) { + .Call(`_arrow_ExecPlan_metadata`, plan) +} + ExecPlan_run <- function(plan, final_node, sort_options, head) { .Call(`_arrow_ExecPlan_run`, plan, final_node, sort_options, head) } diff --git a/r/R/dplyr-collect.R b/r/R/dplyr-collect.R index 3586dda33e8..1ff5e4208c3 100644 --- a/r/R/dplyr-collect.R +++ b/r/R/dplyr-collect.R @@ -18,7 +18,7 @@ # The following S3 methods are registered on load if dplyr is present -collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { +collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, metadata = empty_named_list(), ...) { # head and tail are not ExecNodes, at best we can handle them via sink node # so if there are any steps done after head/tail, we need to # evaluate the query up to then and then do a new query for the rest @@ -26,6 +26,9 @@ collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) { x$.data <- as_adq(dplyr::compute(x$.data))$.data } + # attach the metadata to the query + x$metadata <- prepare_key_value_metadata(metadata) + # See query-engine.R for ExecPlan/Nodes tryCatch( tab <- do_exec_plan(x), diff --git a/r/R/dplyr.R b/r/R/dplyr.R index c9650fb0653..d9660e345eb 100644 --- a/r/R/dplyr.R +++ b/r/R/dplyr.R @@ -19,7 +19,7 @@ #' @include record-batch.R #' @include table.R -arrow_dplyr_query <- function(.data) { +arrow_dplyr_query <- function(.data, metadata = empty_named_list()) { # An arrow_dplyr_query is a container for an Arrow data object (Table, # RecordBatch, or Dataset) and the state of the user's dplyr query--things # like selected columns, filters, and group vars. @@ -31,6 +31,8 @@ arrow_dplyr_query <- function(.data) { error = function(e) character() ) + metadata_kv <- prepare_key_value_metadata(metadata) + if (inherits(.data, "data.frame")) { .data <- Table$create(.data) } @@ -67,7 +69,13 @@ arrow_dplyr_query <- function(.data) { arrange_vars = list(), # arrange_desc will be a logical vector indicating the sort order for each # expression in arrange_vars (FALSE for ascending, TRUE for descending) - arrange_desc = logical() + arrange_desc = logical(), + # metadata is a named list of strings that'll be attached to the ExecPlan + # This metadata will be added to the OpenTelemetry tracing + # output generated by this plan. (see ARROW-15061) + # Can be used for annotating the traces with information about the plan + # and the environment (e.g TPC-H query id, scale factor, hardware details). + metadata = metadata_kv ), class = "arrow_dplyr_query" ) diff --git a/r/R/query-engine.R b/r/R/query-engine.R index 15563d62f19..6a0fafbe870 100644 --- a/r/R/query-engine.R +++ b/r/R/query-engine.R @@ -16,7 +16,7 @@ # under the License. do_exec_plan <- function(.data) { - plan <- ExecPlan$create() + plan <- ExecPlan$create(metadata = .data$metadata) final_node <- plan$Build(.data) tab <- plan$Run(final_node) # TODO (ARROW-14289): make the head/tail methods return RBR not Table @@ -237,11 +237,22 @@ ExecPlan <- R6Class("ExecPlan", # TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ... ExecPlan_Write(self, node, ...) }, - Stop = function() ExecPlan_StopProducing(self) + Stop = function() ExecPlan_StopProducing(self), + HasMetadata = function() { + ExecPlan_HasMetadata(self) + }, + metadata = function() { + ExecPlan_metadata(self) + } ) ) -ExecPlan$create <- function(use_threads = option_use_threads()) { - ExecPlan_create(use_threads) + +ExecPlan$create <- function(use_threads = option_use_threads(), metadata) { + if (missing(metadata)) { + ExecPlan_create(use_threads) + } else { + ExecPlan_create_with_metadata(use_threads, metadata) + } } ExecNode <- R6Class("ExecNode", diff --git a/r/src/arrowExports.cpp b/r/src/arrowExports.cpp index 5810320a057..a25692411a4 100644 --- a/r/src/arrowExports.cpp +++ b/r/src/arrowExports.cpp @@ -868,6 +868,34 @@ BEGIN_CPP11 return cpp11::as_sexp(ExecPlan_create(use_threads)); END_CPP11 } +// compute-exec.cpp +std::shared_ptr ExecPlan_create_with_metadata(bool use_threads, cpp11::strings metadata); +extern "C" SEXP _arrow_ExecPlan_create_with_metadata(SEXP use_threads_sexp, SEXP metadata_sexp){ +BEGIN_CPP11 + arrow::r::Input::type use_threads(use_threads_sexp); + arrow::r::Input::type metadata(metadata_sexp); + return cpp11::as_sexp(ExecPlan_create_with_metadata(use_threads, metadata)); +END_CPP11 +} + +// compute-exec.cpp +bool ExecPlan_HasMetadata(const std::shared_ptr& plan); +extern "C" SEXP _arrow_ExecPlan_HasMetadata(SEXP plan_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + return cpp11::as_sexp(ExecPlan_HasMetadata(plan)); +END_CPP11 +} + +// compute-exec.cpp +cpp11::writable::list ExecPlan_metadata(const std::shared_ptr& plan); +extern "C" SEXP _arrow_ExecPlan_metadata(SEXP plan_sexp){ +BEGIN_CPP11 + arrow::r::Input&>::type plan(plan_sexp); + return cpp11::as_sexp(ExecPlan_metadata(plan)); +END_CPP11 +} + // compute-exec.cpp std::shared_ptr ExecPlan_run(const std::shared_ptr& plan, const std::shared_ptr& final_node, cpp11::list sort_options, int64_t head); extern "C" SEXP _arrow_ExecPlan_run(SEXP plan_sexp, SEXP final_node_sexp, SEXP sort_options_sexp, SEXP head_sexp){ @@ -5237,6 +5265,9 @@ static const R_CallMethodDef CallEntries[] = { { "_arrow_io___CompressedOutputStream__Make", (DL_FUNC) &_arrow_io___CompressedOutputStream__Make, 2}, { "_arrow_io___CompressedInputStream__Make", (DL_FUNC) &_arrow_io___CompressedInputStream__Make, 2}, { "_arrow_ExecPlan_create", (DL_FUNC) &_arrow_ExecPlan_create, 1}, + { "_arrow_ExecPlan_create_with_metadata", (DL_FUNC) &_arrow_ExecPlan_create_with_metadata, 2}, + { "_arrow_ExecPlan_HasMetadata", (DL_FUNC) &_arrow_ExecPlan_HasMetadata, 1}, + { "_arrow_ExecPlan_metadata", (DL_FUNC) &_arrow_ExecPlan_metadata, 1}, { "_arrow_ExecPlan_run", (DL_FUNC) &_arrow_ExecPlan_run, 4}, { "_arrow_ExecPlan_StopProducing", (DL_FUNC) &_arrow_ExecPlan_StopProducing, 1}, { "_arrow_ExecNode_output_schema", (DL_FUNC) &_arrow_ExecNode_output_schema, 1}, diff --git a/r/src/compute-exec.cpp b/r/src/compute-exec.cpp index 0566383509a..bdf46422a4d 100644 --- a/r/src/compute-exec.cpp +++ b/r/src/compute-exec.cpp @@ -43,6 +43,48 @@ std::shared_ptr ExecPlan_create(bool use_threads) { return plan; } +// [[arrow::export]] +std::shared_ptr ExecPlan_create_with_metadata( + bool use_threads, cpp11::strings metadata) { + static compute::ExecContext threaded_context{gc_memory_pool(), + arrow::internal::GetCpuThreadPool()}; + auto vec_metadata = cpp11::as_cpp>(metadata); + auto names_metadata = cpp11::as_cpp>(metadata.names()); + auto kv = std::shared_ptr( + new arrow::KeyValueMetadata(names_metadata, vec_metadata)); + auto plan = ValueOrStop( + compute::ExecPlan::Make(use_threads ? &threaded_context : gc_context(), kv)); + return plan; +} + +// [[arrow::export]] +bool ExecPlan_HasMetadata(const std::shared_ptr& plan) { + return plan->HasMetadata(); +} + +// [[arrow::export]] +cpp11::writable::list ExecPlan_metadata(const std::shared_ptr& plan) { + auto meta = plan->metadata(); + int64_t n = 0; + if (plan->HasMetadata()) { + n = meta->size(); + } + + cpp11::writable::list out(n); + std::vector names_out(n); + + for (int i = 0; i < n; i++) { + auto key = meta->key(i); + out[i] = cpp11::as_sexp(meta->value(i)); + if (key == "r") { + Rf_classgets(out[i], arrow::r::data::classes_metadata_r); + } + names_out[i] = key; + } + out.names() = names_out; + return out; +} + std::shared_ptr MakeExecNodeOrStop( const std::string& factory_name, compute::ExecPlan* plan, std::vector inputs, const compute::ExecNodeOptions& options) {