Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 17 additions & 5 deletions python/pyarrow/_exec_plan.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,15 @@ from cython.operator cimport dereference as deref, preincrement as inc
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table)
from pyarrow.lib import tobytes
from pyarrow.lib cimport (Table, check_status, pyarrow_unwrap_table, pyarrow_wrap_table, pyarrow_unwrap_metadata)
from pyarrow.lib import KeyValueMetadata, tobytes, _pc
from pyarrow._compute cimport Expression, _true
from pyarrow._dataset cimport Dataset
from pyarrow._dataset import InMemoryDataset

Initialize() # Initialise support for Datasets in ExecPlan


cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True):
cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads=True, metadata=None):
"""
Internal Function to create an ExecPlan and run it.

Expand All @@ -53,6 +52,12 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
to produce the output.
use_threads : bool, default True
Whether to use multithreading or not.
metadata : dict or Mapping, default None
Optional metadata for the ExecPlan.
This metadata will be added to the OpenTelemetry tracing
output generated by this plan. (see ARROW-15061)
Can be used for annotating the traces with information about the plan
and the environment (e.g. TPC-H query id, scale factor, hardware details).
"""
cdef:
CExecutor *c_executor
Expand All @@ -72,6 +77,7 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads
shared_ptr[CAsyncExecBatchGenerator] c_async_exec_batch_gen
shared_ptr[CRecordBatchReader] c_recordbatchreader
vector[CDeclaration].iterator plan_iter
shared_ptr[const CKeyValueMetadata] c_meta
vector[CDeclaration.Input] no_c_inputs
CStatus c_plan_status

Expand All @@ -82,7 +88,13 @@ cdef execplan(inputs, output_type, vector[CDeclaration] plan, c_bool use_threads

c_exec_context = make_shared[CExecContext](
c_default_memory_pool(), c_executor)
c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get()))

if metadata is not None:
c_meta = pyarrow_unwrap_metadata(metadata)
c_exec_plan = GetResultValue(
CExecPlan.MakeWithMetadata(c_exec_context.get(), c_meta))
else:
c_exec_plan = GetResultValue(CExecPlan.Make(c_exec_context.get()))

plan_iter = plan.begin()

Expand Down
6 changes: 6 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2505,12 +2505,18 @@ cdef extern from "arrow/compute/exec/exec_plan.h" namespace "arrow::compute" nog
@staticmethod
CResult[shared_ptr[CExecPlan]] Make(CExecContext* exec_context)

@staticmethod
CResult[shared_ptr[CExecPlan]] MakeWithMetadata" Make"(CExecContext* exec_context, shared_ptr[const CKeyValueMetadata] metadata)

CStatus StartProducing()
CStatus Validate()
CStatus StopProducing()

CFuture_Void finished()

c_bool HasMetadata()
shared_ptr[const CKeyValueMetadata] metadata()

vector[CExecNode*] sinks() const
vector[CExecNode*] sources() const

Expand Down
12 changes: 12 additions & 0 deletions r/R/arrowExports.R

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion r/R/dplyr-collect.R
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@

# The following S3 methods are registered on load if dplyr is present

collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, ...) {
collect.arrow_dplyr_query <- function(x, as_data_frame = TRUE, metadata = empty_named_list(), ...) {
# head and tail are not ExecNodes, at best we can handle them via sink node
# so if there are any steps done after head/tail, we need to
# evaluate the query up to then and then do a new query for the rest
if (is_collapsed(x) && has_head_tail(x$.data)) {
x$.data <- as_adq(dplyr::compute(x$.data))$.data
}

# attach the metadata to the query
x$metadata <- prepare_key_value_metadata(metadata)

# See query-engine.R for ExecPlan/Nodes
tryCatch(
tab <- do_exec_plan(x),
Expand Down
12 changes: 10 additions & 2 deletions r/R/dplyr.R
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#' @include record-batch.R
#' @include table.R

arrow_dplyr_query <- function(.data) {
arrow_dplyr_query <- function(.data, metadata = empty_named_list()) {
# An arrow_dplyr_query is a container for an Arrow data object (Table,
# RecordBatch, or Dataset) and the state of the user's dplyr query--things
# like selected columns, filters, and group vars.
Expand All @@ -31,6 +31,8 @@ arrow_dplyr_query <- function(.data) {
error = function(e) character()
)

metadata_kv <- prepare_key_value_metadata(metadata)

if (inherits(.data, "data.frame")) {
.data <- Table$create(.data)
}
Expand Down Expand Up @@ -67,7 +69,13 @@ arrow_dplyr_query <- function(.data) {
arrange_vars = list(),
# arrange_desc will be a logical vector indicating the sort order for each
# expression in arrange_vars (FALSE for ascending, TRUE for descending)
arrange_desc = logical()
arrange_desc = logical(),
# metadata is a named list of strings that'll be attached to the ExecPlan
# This metadata will be added to the OpenTelemetry tracing
# output generated by this plan. (see ARROW-15061)
# Can be used for annotating the traces with information about the plan
# and the environment (e.g. TPC-H query id, scale factor, hardware details).
metadata = metadata_kv
),
class = "arrow_dplyr_query"
)
Expand Down
19 changes: 15 additions & 4 deletions r/R/query-engine.R
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# under the License.

do_exec_plan <- function(.data) {
plan <- ExecPlan$create()
plan <- ExecPlan$create(metadata = .data$metadata)
final_node <- plan$Build(.data)
tab <- plan$Run(final_node)
# TODO (ARROW-14289): make the head/tail methods return RBR not Table
Expand Down Expand Up @@ -237,11 +237,22 @@ ExecPlan <- R6Class("ExecPlan",
# TODO(ARROW-16200): take FileSystemDatasetWriteOptions not ...
ExecPlan_Write(self, node, ...)
},
Stop = function() ExecPlan_StopProducing(self)
Stop = function() ExecPlan_StopProducing(self),
HasMetadata = function() {
ExecPlan_HasMetadata(self)
},
metadata = function() {
ExecPlan_metadata(self)
}
)
)
ExecPlan$create <- function(use_threads = option_use_threads()) {
ExecPlan_create(use_threads)

ExecPlan$create <- function(use_threads = option_use_threads(), metadata) {
  # Construct an ExecPlan, optionally attaching key/value metadata.
  # When supplied, the metadata is included in the OpenTelemetry tracing
  # output generated by the plan (see ARROW-15061).
  if (!missing(metadata)) {
    return(ExecPlan_create_with_metadata(use_threads, metadata))
  }
  ExecPlan_create(use_threads)
}

ExecNode <- R6Class("ExecNode",
Expand Down
31 changes: 31 additions & 0 deletions r/src/arrowExports.cpp

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 42 additions & 0 deletions r/src/compute-exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,48 @@ std::shared_ptr<compute::ExecPlan> ExecPlan_create(bool use_threads) {
return plan;
}

// [[arrow::export]]
std::shared_ptr<compute::ExecPlan> ExecPlan_create_with_metadata(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems rather odd to me that we would need two methods here but it may just be my ignorance of working with R. Wouldn't it be possible to send an empty value for metadata if the user doesn't want to specify it?

bool use_threads, cpp11::strings metadata) {
static compute::ExecContext threaded_context{gc_memory_pool(),
arrow::internal::GetCpuThreadPool()};
auto vec_metadata = cpp11::as_cpp<std::vector<std::string>>(metadata);
auto names_metadata = cpp11::as_cpp<std::vector<std::string>>(metadata.names());
auto kv = std::shared_ptr<arrow::KeyValueMetadata>(
new arrow::KeyValueMetadata(names_metadata, vec_metadata));
auto plan = ValueOrStop(
compute::ExecPlan::Make(use_threads ? &threaded_context : gc_context(), kv));
return plan;
}

// Thin wrapper exported to R: reports whether the plan was created with
// key/value metadata attached (backs the R ExecPlan$HasMetadata() method).
// [[arrow::export]]
bool ExecPlan_HasMetadata(const std::shared_ptr<compute::ExecPlan>& plan) {
  return plan->HasMetadata();
}

// Return the plan's key/value metadata as a named R list. Produces an empty
// list when the plan carries no metadata.
// [[arrow::export]]
cpp11::writable::list ExecPlan_metadata(const std::shared_ptr<compute::ExecPlan>& plan) {
  auto meta = plan->metadata();
  // Guard with HasMetadata() so we never dereference a null metadata pointer.
  int64_t n = 0;
  if (plan->HasMetadata()) {
    n = meta->size();
  }

  cpp11::writable::list out(n);
  std::vector<std::string> names_out(n);

  // Use int64_t for the index to match KeyValueMetadata::size() and avoid a
  // signed-width mismatch against `n` in the loop condition.
  for (int64_t i = 0; i < n; i++) {
    auto key = meta->key(i);
    out[i] = cpp11::as_sexp(meta->value(i));
    // NOTE(review): the special "r" key appears to hold R-specific metadata;
    // tagging it with classes_metadata_r lets R dispatch on it — confirm
    // against the reader side.
    if (key == "r") {
      Rf_classgets(out[i], arrow::r::data::classes_metadata_r);
    }
    names_out[i] = key;
  }
  out.names() = names_out;
  return out;
}

std::shared_ptr<compute::ExecNode> MakeExecNodeOrStop(
const std::string& factory_name, compute::ExecPlan* plan,
std::vector<compute::ExecNode*> inputs, const compute::ExecNodeOptions& options) {
Expand Down