From 7bfecc234d1cbf5948bbcebd4317a9fd2caafa67 Mon Sep 17 00:00:00 2001 From: Alvin Chunga Date: Thu, 24 Mar 2022 02:54:47 -0500 Subject: [PATCH 1/3] ARROW-15062: [C++] Add memory information to current spans, bytes in memory_pool and RSS --- cpp/src/arrow/compute/exec/aggregate_node.cc | 157 +++++++++++++------ cpp/src/arrow/compute/exec/exec_plan.cc | 14 +- cpp/src/arrow/compute/exec/filter_node.cc | 25 ++- cpp/src/arrow/compute/exec/hash_join.cc | 12 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 10 +- cpp/src/arrow/compute/exec/project_node.cc | 24 ++- cpp/src/arrow/compute/exec/sink_node.cc | 58 +++++-- cpp/src/arrow/compute/exec/source_node.cc | 14 +- cpp/src/arrow/compute/exec/union_node.cc | 12 +- cpp/src/arrow/compute/function.cc | 12 +- cpp/src/arrow/util/io_util.cc | 63 ++++++++ cpp/src/arrow/util/io_util.h | 12 ++ cpp/src/arrow/util/tracing_internal.cc | 4 + 13 files changed, 316 insertions(+), 101 deletions(-) diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index b4982ef111f..32ad772dd4a 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -168,17 +168,26 @@ class ScalarAggregateNode : public ExecNode { Status DoConsume(const ExecBatch& batch, size_t thread_index) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN( + span, "Consume", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", + plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}}); + START_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); @@ -191,10 +200,14 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); DCHECK_EQ(input, inputs_[0]); auto thread_index = get_thread_index_(); @@ -221,10 +234,14 @@ class ScalarAggregateNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); // Scalar aggregates will only output a single batch @@ -262,17 +279,27 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { util::tracing::Span span; - START_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}}); + START_SPAN( + span, "Finish", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}}); + START_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); KernelContext ctx{plan()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); @@ -392,10 +419,14 @@ class GroupByNode : public ExecNode { Status Consume(ExecBatch batch) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN( + span, "Consume", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); size_t thread_index = get_thread_index_(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -418,11 +449,15 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}}); + START_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); @@ -439,7 +474,13 @@ class GroupByNode : public ExecNode { Status Merge() { util::tracing::Span span; - START_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}}); + START_SPAN( + span, "Merge", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ThreadLocalState* state0 = &local_states_[0]; for (size_t i = 1; i < local_states_.size(); ++i) { ThreadLocalState* state = &local_states_[i]; @@ -457,7 +498,11 @@ class GroupByNode : public ExecNode { {{"function.name", aggs_[i].function}, {"function.options", aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Merge"}}); + {"function.kind", std::string(kind_name()) + "::Merge"}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", + arrow::internal::tracing::GetMemoryUsedByProcess()}}); KernelContext batch_ctx{ctx_}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); @@ -473,8 +518,13 @@ class GroupByNode : public ExecNode { Result Finalize() { util::tracing::Span span; - START_SPAN(span, "Finalize", - {{"group_by", ToStringExtra()}, {"node.label", label()}}); + START_SPAN( + span, "Finalize", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ThreadLocalState* state = &local_states_[0]; // If we never got any batches, then state won't have been initialized @@ -486,11 +536,15 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}}); + START_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); KernelContext batch_ctx{ctx_}; batch_ctx.SetState(state->agg_states[i].get()); RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); @@ -548,10 +602,14 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); // bail if StopProducing was called if (finished_.is_finished()) return; @@ -588,9 +646,12 @@ class GroupByNode : public ExecNode { Status StartProducing() override { START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 7a1c387c1aa..f6ce2c3c2e2 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -81,7 +81,12 @@ struct ExecPlanImpl : public ExecPlan { } Status StartProducing() { - START_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); + START_SPAN( + span_, "ExecPlan", + {{"plan", ToString()}, + {"memory_pool_bytes", exec_context_->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); @@ -389,7 +394,12 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) { Status MapNode::StartProducing() { START_SPAN( span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 6f15e44653f..64b77c33772 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -70,10 +70,15 @@ class FilterNode : public MapNode { SimplifyWithGuarantee(filter_, target.guarantee)); util::tracing::Span span; - START_SPAN(span, "Filter", - {{"filter.expression", ToStringExtra()}, - {"filter.expression.simplified", simplified_filter.ToString()}, - {"filter.length", target.length}}); + + START_SPAN( + span, "Filter", + {{"filter.expression", ToStringExtra()}, + {"filter.expression.simplified", simplified_filter.ToString()}, + {"filter.length", target.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, plan()->exec_context())); @@ -103,10 +108,14 @@ class FilterNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"filter", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"filter", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); auto result = DoFilter(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 56d02dd3b11..4016380e1e5 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -90,10 +90,14 @@ class HashJoinBasicImpl : public HashJoinImpl { TaskScheduler::ScheduleImpl schedule_task_callback) override { num_threads = std::max(num_threads, static_cast(1)); - START_SPAN(span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, - {"join.kind", ToString(join_type)}, - {"join.threads", static_cast(num_threads)}}); + START_SPAN( + span_, "HashJoinBasicImpl", + {{"detail", filter.ToString()}, + {"join.kind", ToString(join_type)}, + {"join.threads", static_cast(num_threads)}, + {"memory_pool_bytes", ctx->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ctx_ = ctx; join_type_ = join_type; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 060d40a078c..56df1b2980d 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -525,8 +525,12 @@ class HashJoinNode : public ExecNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); { Status status = impl_->InputReceived(thread_index, side, std::move(batch)); @@ -572,7 +576,7 @@ class HashJoinNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 79eddeb15e2..603c9441a6e 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,10 +80,14 @@ class ProjectNode : public MapNode { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, "Project", - {{"project.descr", exprs_[i].descr().ToString()}, - {"project.length", target.length}, - {"project.expression", exprs_[i].ToString()}}); + START_SPAN( + span, "Project", + {{"project.descr", exprs_[i].descr().ToString()}, + {"project.length", target.length}, + {"project.expression", exprs_[i].ToString()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], target.guarantee)); @@ -98,10 +102,14 @@ class ProjectNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"project", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"project", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); auto result = DoProject(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index e981de38996..752affd826f 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -76,10 +76,14 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); @@ -106,8 +110,13 @@ class SinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); DCHECK_EQ(input, inputs_[0]); @@ -174,10 +183,14 @@ class ConsumingSinkNode : public ExecNode { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); DCHECK_GT(inputs_.size(), 0); RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema())); finished_ = Future<>::Make(); @@ -204,8 +217,13 @@ class ConsumingSinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); DCHECK_EQ(input, inputs_[0]); @@ -354,8 +372,13 @@ struct OrderBySinkNode final : public SinkNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, + {"batch.length", batch.length}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); DCHECK_EQ(input, inputs_[0]); @@ -392,7 +415,12 @@ struct OrderBySinkNode final : public SinkNode { void Finish() override { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); + START_SPAN_WITH_PARENT( + span, span_, "Finish", + {{"node.label", label()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 777faf22192..14a5062c753 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -70,11 +70,15 @@ struct SourceNode : ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}}); + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 9232516cc6d..6548de78e90 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -112,10 +112,14 @@ class UnionNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 1a7f36862dd..9e91a8ccab6 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -215,10 +215,14 @@ Result Function::Execute(const std::vector& args, } util::tracing::Span span; - START_SPAN(span, name(), - {{"function.name", name()}, - {"function.options", options ? options->ToString() : ""}, - {"function.kind", kind()}}); + START_SPAN( + span, name(), + {{"function.name", name()}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", kind()}, + {"memory_pool_bytes", ctx->memory_pool()->bytes_allocated()}, + {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, + {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); // type-check Datum arguments here. Really we'd like to avoid this as much as // possible diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 398e65a3c93..a14810b2f91 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -51,6 +51,7 @@ #include #include +#include #include #include #include // IWYU pragma: keep @@ -68,11 +69,18 @@ #ifdef _WIN32 #include "arrow/io/mman.h" +#include "psapi.h" +#include "windows.h" #undef Realloc #undef Free #else // POSIX-like platforms +#include +#include +#include #include +#include #include +#include #endif // define max read/write count @@ -1867,5 +1875,60 @@ uint64_t GetOptionalThreadId() { return (tid == 0) ? tid - 1 : tid; } +int parseLine(std::string line){ + // This assumes that a digit will be found and the line ends in " Kb". + uint64_t i = line.length(); + uint64_t index = 0; + while (line[index] <'0' || line[index] > '9') index++; + line[i-3] = '\0'; + i = atoi(&line[index]); + return i; +} + +int64_t GetMemoryUsedByProcess() { +#ifdef _WIN32 + PROCESS_MEMORY_COUNTERS_EX pmc; + PROCESS_MEMORY_COUNTERS_EX pmc; + GetProcessMemoryInfo(GetCurrentProcess(), (PROCESS_MEMORY_COUNTERS*)&pmc, sizeof(pmc)); + return pmc.WorkingSetSize; +#else + std::string line; + std::string name; + uint64_t result; + std::ifstream status_info("/proc/self/status", std::ios::in); + while (status_info) { + std::getline(status_info, line); + int64_t colon = line.find(':'); + if (colon != std::string::npos) { + name = TrimString(line.substr(0, colon - 1)); + if (name.compare("VmRSS") == 0){ + result = parseLine(line); + break; + } + } + } + if (status_info.is_open()) status_info.close(); + return result * 1024; +#endif +} + +//TODO: Reutilize PR-11426 functions +int64_t GetMemoryUsed() { +#ifdef _WIN32 + MEMORYSTATUSEX memInfo; + memInfo.dwLength = sizeof(MEMORYSTATUSEX); + GlobalMemoryStatusEx(&memInfo); + return memInfo.ullTotalPhys - memInfo.ullAvailPhys; +#else +#endif + int64_t total_memory_size; + int64_t used_memory_size; + struct sysinfo si; + sysinfo(&si); + total_memory_size = static_cast(si.totalram); + used_memory_size = total_memory_size - static_cast(si.freeram); + return used_memory_size; +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index c094727a647..85650780158 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -346,5 +346,17 @@ int64_t GetRandomSeed(); ARROW_EXPORT uint64_t GetThreadId(); +/// \brief Get the current Memory used in bytes +/// +/// This function support Windows and Linux +ARROW_EXPORT +int64_t GetMemoryUsed(); + +/// \brief Get the current memory used by current process in bytes +/// +/// This function support Windows and Linux +ARROW_EXPORT +int64_t GetMemoryUsedByProcess(); + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 904a1fd76a8..95b7c7fb3c1 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -23,6 +23,10 @@ #include #include #include +#include +#include +#include +#include #ifdef _MSC_VER #pragma warning(push) From fe9a3fa947ae609bdb100258fe539b390615218e Mon Sep 17 00:00:00 2001 From: Alvin Chunga Date: Tue, 5 Apr 2022 20:15:13 -0500 Subject: [PATCH 2/3] ARROW-15251: [C++] Remove memory system info (RSS) until query tester will be implemented --- cpp/src/arrow/compute/exec/aggregate_node.cc | 159 +++++++------------ cpp/src/arrow/compute/exec/exec_plan.cc | 20 +-- cpp/src/arrow/compute/exec/filter_node.cc | 26 ++- cpp/src/arrow/compute/exec/hash_join.cc | 13 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 8 +- cpp/src/arrow/compute/exec/project_node.cc | 26 ++- cpp/src/arrow/compute/exec/sink_node.cc | 52 ++---- cpp/src/arrow/compute/exec/source_node.cc | 15 +- cpp/src/arrow/compute/exec/union_node.cc | 13 +- cpp/src/arrow/compute/function.cc | 13 +- cpp/src/arrow/util/io_util.cc | 114 +++++++------ cpp/src/arrow/util/io_util.h | 8 +- cpp/src/arrow/util/tracing_internal.cc | 8 +- cpp/src/arrow/util/tracing_internal.h | 4 + 14 files changed, 185 insertions(+), 294 deletions(-) diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 32ad772dd4a..97a08fb1841 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -168,26 +168,19 @@ class ScalarAggregateNode : public ExecNode { Status DoConsume(const ExecBatch& batch, size_t thread_index) { util::tracing::Span span; - START_SPAN( - span, "Consume", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", - plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, "Consume", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN( - span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}, + GET_MEMORY_POOL_INFO}); KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); @@ -200,14 +193,11 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); DCHECK_EQ(input, inputs_[0]); auto thread_index = get_thread_index_(); @@ -234,14 +224,11 @@ class ScalarAggregateNode : public ExecNode { } Status StartProducing() override { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); // Scalar aggregates will only output a single batch @@ -281,25 +268,18 @@ class ScalarAggregateNode : public ExecNode { util::tracing::Span span; START_SPAN( span, "Finish", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"aggregate", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN( - span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}, + GET_MEMORY_POOL_INFO}); KernelContext ctx{plan()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); @@ -419,14 +399,11 @@ class GroupByNode : public ExecNode { Status Consume(ExecBatch batch) { util::tracing::Span span; - START_SPAN( - span, "Consume", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, "Consume", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); size_t thread_index = get_thread_index_(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -449,15 +426,12 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN( - span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}, + GET_MEMORY_POOL_INFO}); KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); @@ -476,11 +450,7 @@ class GroupByNode : public ExecNode { util::tracing::Span span; START_SPAN( span, "Merge", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"group_by", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); ThreadLocalState* state0 = &local_states_[0]; for (size_t i = 1; i < local_states_.size(); ++i) { ThreadLocalState* state = &local_states_[i]; @@ -499,10 +469,7 @@ class GroupByNode : public ExecNode { {"function.options", aggs_[i].options ? aggs_[i].options->ToString() : ""}, {"function.kind", std::string(kind_name()) + "::Merge"}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", - arrow::internal::tracing::GetMemoryUsedByProcess()}}); + GET_MEMORY_POOL_INFO}); KernelContext batch_ctx{ctx_}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); @@ -520,11 +487,7 @@ class GroupByNode : public ExecNode { util::tracing::Span span; START_SPAN( span, "Finalize", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"group_by", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); ThreadLocalState* state = &local_states_[0]; // If we never got any batches, then state won't have been initialized @@ -536,15 +499,12 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN( - span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}, + GET_MEMORY_POOL_INFO}); KernelContext batch_ctx{ctx_}; batch_ctx.SetState(state->agg_states[i].get()); RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); @@ -602,14 +562,11 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); // bail if StopProducing was called if (finished_.is_finished()) return; @@ -646,12 +603,10 @@ class GroupByNode : public ExecNode { Status StartProducing() override { START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", ctx_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index f6ce2c3c2e2..235531e11e2 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -81,12 +81,7 @@ struct ExecPlanImpl : public ExecPlan { } Status StartProducing() { - START_SPAN( - span_, "ExecPlan", - {{"plan", ToString()}, - {"memory_pool_bytes", exec_context_->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, "ExecPlan", {{"plan", ToString()}, GET_MEMORY_POOL_INFO}); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); @@ -392,14 +387,11 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) { } Status MapNode::StartProducing() { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 64b77c33772..5d5b7cb3ddc 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -71,14 +71,11 @@ class FilterNode : public MapNode { util::tracing::Span span; - START_SPAN( - span, "Filter", - {{"filter.expression", ToStringExtra()}, - {"filter.expression.simplified", simplified_filter.ToString()}, - {"filter.length", target.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, "Filter", + {{"filter.expression", ToStringExtra()}, + {"filter.expression.simplified", simplified_filter.ToString()}, + {"filter.length", target.length}, + GET_MEMORY_POOL_INFO}); ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, plan()->exec_context())); @@ -108,14 +105,11 @@ class FilterNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"filter", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"filter", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); auto result = DoFilter(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 4016380e1e5..30126a4be5e 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -90,14 +90,11 @@ class HashJoinBasicImpl : public HashJoinImpl { TaskScheduler::ScheduleImpl schedule_task_callback) override { num_threads = std::max(num_threads, static_cast(1)); - START_SPAN( - span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, - {"join.kind", ToString(join_type)}, - {"join.threads", static_cast(num_threads)}, - {"memory_pool_bytes", ctx->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, "HashJoinBasicImpl", + {{"detail", filter.ToString()}, + {"join.kind", ToString(join_type)}, + {"join.threads", static_cast(num_threads)}, + GET_MEMORY_POOL_INFO}); ctx_ = ctx; join_type_ = join_type; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 56df1b2980d..3e1c14148b2 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -525,12 +525,8 @@ class HashJoinNode : public ExecNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); { Status status = impl_->InputReceived(thread_index, side, std::move(batch)); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 603c9441a6e..217208577e2 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,14 +80,11 @@ class ProjectNode : public MapNode { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { util::tracing::Span span; - START_SPAN( - span, "Project", - {{"project.descr", exprs_[i].descr().ToString()}, - {"project.length", target.length}, - {"project.expression", exprs_[i].ToString()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, "Project", + {{"project.descr", exprs_[i].descr().ToString()}, + {"project.length", target.length}, + {"project.expression", exprs_[i].ToString()}, + GET_MEMORY_POOL_INFO}); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], target.guarantee)); @@ -102,14 +99,11 @@ class ProjectNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "InputReceived", - {{"project", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"project", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}, + GET_MEMORY_POOL_INFO}); auto result = DoProject(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index 752affd826f..bf5c6f5c9c9 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -76,14 +76,11 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); @@ -112,11 +109,7 @@ class SinkNode : public ExecNode { util::tracing::Span span; START_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); DCHECK_EQ(input, inputs_[0]); @@ -183,14 +176,11 @@ class ConsumingSinkNode : public ExecNode { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); DCHECK_GT(inputs_.size(), 0); RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema())); finished_ = Future<>::Make(); @@ -219,11 +209,7 @@ class ConsumingSinkNode : public ExecNode { util::tracing::Span span; START_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); DCHECK_EQ(input, inputs_[0]); @@ -374,11 +360,7 @@ struct OrderBySinkNode final : public SinkNode { util::tracing::Span span; START_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, - {"batch.length", batch.length}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); DCHECK_EQ(input, inputs_[0]); @@ -415,12 +397,8 @@ struct OrderBySinkNode final : public SinkNode { void Finish() override { util::tracing::Span span; - START_SPAN_WITH_PARENT( - span, span_, "Finish", - {{"node.label", label()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN_WITH_PARENT(span, span_, "Finish", + {{"node.label", label()}, GET_MEMORY_POOL_INFO}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 14a5062c753..cec483947ab 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -70,15 +70,12 @@ struct SourceNode : ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}, + GET_MEMORY_POOL_INFO}); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 6548de78e90..810209eb9b8 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -112,14 +112,11 @@ class UnionNode : public ExecNode { } Status StartProducing() override { - START_SPAN( - span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - {"memory_pool_bytes", plan_->exec_context()->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}, + GET_MEMORY_POOL_INFO}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 9e91a8ccab6..32a0e397946 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -215,14 +215,11 @@ Result Function::Execute(const std::vector& args, } util::tracing::Span span; - START_SPAN( - span, name(), - {{"function.name", name()}, - {"function.options", options ? options->ToString() : ""}, - {"function.kind", kind()}, - {"memory_pool_bytes", ctx->memory_pool()->bytes_allocated()}, - {"memory_used", arrow::internal::tracing::GetMemoryUsed()}, - {"memory_used_process", arrow::internal::tracing::GetMemoryUsedByProcess()}}); + START_SPAN(span, name(), + {{"function.name", name()}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", kind()}, + GET_MEMORY_POOL_INFO}); // type-check Datum arguments here. Really we'd like to avoid this as much as // possible diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index a14810b2f91..a1e3fc996ca 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -51,7 +51,6 @@ #include #include -#include #include #include #include // IWYU pragma: keep @@ -69,18 +68,11 @@ #ifdef _WIN32 #include "arrow/io/mman.h" -#include "psapi.h" -#include "windows.h" #undef Realloc #undef Free #else // POSIX-like platforms -#include -#include -#include #include -#include #include -#include #endif // define max read/write count @@ -105,10 +97,32 @@ #include "arrow/util/logging.h" // For filename conversion -#if defined(_WIN32) +#ifdef _WIN32 #include "arrow/util/utf8.h" #endif +#ifdef _WIN32 +#include +#include + +#elif __unix__ || __unix || unix || (__APPLE__ && __MACH__) +#include + +#if __APPLE__ && __MACH__ +#include + +#elif (_AIX || __TOS__AIX__) || (__sun__ || __sun || sun && (_SVR4 || __svr4__)) +#include + +#elif __linux__ || __linux || linux || __gnu_linux__ +#include + +#endif + +#else +#error "Cannot define getPeakRSS( ) or getCurrentRSS( ) for an unknown OS." +#endif + namespace arrow { using internal::checked_cast; @@ -1875,59 +1889,41 @@ uint64_t GetOptionalThreadId() { return (tid == 0) ? tid - 1 : tid; } -int parseLine(std::string line){ - // This assumes that a digit will be found and the line ends in " Kb". - uint64_t i = line.length(); - uint64_t index = 0; - while (line[index] <'0' || line[index] > '9') index++; - line[i-3] = '\0'; - i = atoi(&line[index]); - return i; -} - -int64_t GetMemoryUsedByProcess() { -#ifdef _WIN32 - PROCESS_MEMORY_COUNTERS_EX pmc; - PROCESS_MEMORY_COUNTERS_EX pmc; - GetProcessMemoryInfo(GetCurrentProcess(), (PROCESS_MEMORY_COUNTERS*)&pmc, sizeof(pmc)); - return pmc.WorkingSetSize; -#else - std::string line; - std::string name; - uint64_t result; - std::ifstream status_info("/proc/self/status", std::ios::in); - while (status_info) { - std::getline(status_info, line); - int64_t colon = line.find(':'); - if (colon != std::string::npos) { - name = TrimString(line.substr(0, colon - 1)); - if (name.compare("VmRSS") == 0){ - result = parseLine(line); - break; - } - } - } - if (status_info.is_open()) status_info.close(); - return result * 1024; -#endif -} +// Returns the current resident set size (physical memory use) measured +// in bytes, or zero if the value cannot be determined on this OS. +// See: https://stackoverflow.com/a/14927379 +int64_t GetCurrentRSS() { +#if defined(_WIN32) + // Windows -------------------------------------------------- + PROCESS_MEMORY_COUNTERS info; + GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info)); + return (int64_t)info.WorkingSetSize; + +#elif defined(__APPLE__) && defined(__MACH__) + // OSX ------------------------------------------------------ + struct mach_task_basic_info info; + mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT; + if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != + KERN_SUCCESS) + return (int64_t)0L; + return (int64_t)info.resident_size; + +#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__) + // Linux ---------------------------------------------------- + int64_t rss = 0L; + FILE* fp = NULL; + if ((fp = fopen("/proc/self/statm", "r")) == NULL) return (int64_t)0L; + if (fscanf(fp, "%*s%ld", &rss) != 1) { + fclose(fp); + return (int64_t)0L; + } + fclose(fp); + return (int64_t)rss * (int64_t)sysconf(_SC_PAGESIZE); -//TODO: Reutilize PR-11426 functions -int64_t GetMemoryUsed() { -#ifdef _WIN32 - MEMORYSTATUSEX memInfo; - memInfo.dwLength = sizeof(MEMORYSTATUSEX); - GlobalMemoryStatusEx(&memInfo); - return memInfo.ullTotalPhys - memInfo.ullAvailPhys; #else + // AIX, BSD, Solaris, and Unknown OS ------------------------ + return (int64_t)0L; // Unsupported. #endif - int64_t total_memory_size; - int64_t used_memory_size; - struct sysinfo si; - sysinfo(&si); - total_memory_size = static_cast(si.totalram); - used_memory_size = total_memory_size - static_cast(si.freeram); - return used_memory_size; } } // namespace internal diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 85650780158..be6b0578a66 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -346,17 +346,11 @@ int64_t GetRandomSeed(); ARROW_EXPORT uint64_t GetThreadId(); -/// \brief Get the current Memory used in bytes -/// -/// This function support Windows and Linux -ARROW_EXPORT -int64_t GetMemoryUsed(); - /// \brief Get the current memory used by current process in bytes /// /// This function support Windows and Linux ARROW_EXPORT -int64_t GetMemoryUsedByProcess(); +int64_t GetCurrentRSS(); } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 95b7c7fb3c1..372f703f156 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -20,13 +20,13 @@ #include "arrow/util/thread_pool.h" #include "arrow/util/tracing.h" +#include +#include +#include +#include #include #include #include -#include -#include -#include -#include #ifdef _MSC_VER #pragma warning(push) diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 77320eb2aec..bfc73066041 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -151,6 +151,9 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) +#define GET_MEMORY_POOL_INFO \ + { "memory_pool_bytes", default_memory_pool()->bytes_allocated() } + #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ std::move(generator)) @@ -181,6 +184,7 @@ class SpanImpl {}; #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) +#define GET_MEMORY_POOL_INFO #define PROPAGATE_SPAN_TO_GENERATOR(generator) #define WRAP_ASYNC_GENERATOR(generator) #define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name) From 968716ace73a97512f510513c53f32d564f5b49c Mon Sep 17 00:00:00 2001 From: Alvin Chunga Date: Tue, 12 Apr 2022 21:09:42 -0500 Subject: [PATCH 3/3] Create START_COMPUTE_SPAN which add the memory info to the current span atributes. --- cpp/src/arrow/compute/exec/aggregate_node.cc | 125 +++++++++---------- cpp/src/arrow/compute/exec/exec_plan.cc | 10 +- cpp/src/arrow/compute/exec/filter_node.cc | 19 ++- cpp/src/arrow/compute/exec/hash_join.cc | 9 +- cpp/src/arrow/compute/exec/hash_join_node.cc | 8 +- cpp/src/arrow/compute/exec/project_node.cc | 18 ++- cpp/src/arrow/compute/exec/sink_node.cc | 33 +++-- cpp/src/arrow/compute/exec/source_node.cc | 11 +- cpp/src/arrow/compute/exec/union_node.cc | 9 +- cpp/src/arrow/compute/function.cc | 9 +- cpp/src/arrow/util/io_util.cc | 51 ++++---- cpp/src/arrow/util/io_util.h | 4 +- cpp/src/arrow/util/io_util_test.cc | 12 ++ cpp/src/arrow/util/tracing_internal.cc | 4 - cpp/src/arrow/util/tracing_internal.h | 16 ++- 15 files changed, 159 insertions(+), 179 deletions(-) diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index 97a08fb1841..5bc8e3442f8 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -168,19 +168,17 @@ class ScalarAggregateNode : public ExecNode { Status DoConsume(const ExecBatch& batch, size_t thread_index) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Consume", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); @@ -193,11 +191,10 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto thread_index = get_thread_index_(); @@ -224,11 +221,10 @@ class ScalarAggregateNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); // Scalar aggregates will only output a single batch @@ -266,20 +262,18 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { util::tracing::Span span; - START_SPAN( - span, "Finish", - {{"aggregate", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Finish", + {{"aggregate", ToStringExtra()}, {"node.label", label()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext ctx{plan()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); @@ -399,11 +393,10 @@ class GroupByNode : public ExecNode { Status Consume(ExecBatch batch) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Consume", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); size_t thread_index = get_thread_index_(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -426,12 +419,11 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); @@ -448,9 +440,8 @@ class GroupByNode : public ExecNode { Status Merge() { util::tracing::Span span; - START_SPAN( - span, "Merge", - {{"group_by", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Merge", + {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state0 = &local_states_[0]; for (size_t i = 1; i < local_states_.size(); ++i) { ThreadLocalState* state = &local_states_[i]; @@ -464,12 +455,12 @@ class GroupByNode : public ExecNode { for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Merge"}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Merge"}}); KernelContext batch_ctx{ctx_}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); @@ -485,9 +476,8 @@ class GroupByNode : public ExecNode { Result Finalize() { util::tracing::Span span; - START_SPAN( - span, "Finalize", - {{"group_by", ToStringExtra()}, {"node.label", label()}, GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Finalize", + {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state = &local_states_[0]; // If we never got any batches, then state won't have been initialized @@ -499,12 +489,11 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext batch_ctx{ctx_}; batch_ctx.SetState(state->agg_states[i].get()); RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); @@ -562,11 +551,10 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); // bail if StopProducing was called if (finished_.is_finished()) return; @@ -602,11 +590,10 @@ class GroupByNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 235531e11e2..3ffd2a7626a 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -81,7 +81,7 @@ struct ExecPlanImpl : public ExecPlan { } Status StartProducing() { - START_SPAN(span_, "ExecPlan", {{"plan", ToString()}, GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); @@ -387,11 +387,9 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) { } Status MapNode::StartProducing() { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN( + span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 5d5b7cb3ddc..0c849cb0435 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -70,12 +70,10 @@ class FilterNode : public MapNode { SimplifyWithGuarantee(filter_, target.guarantee)); util::tracing::Span span; - - START_SPAN(span, "Filter", - {{"filter.expression", ToStringExtra()}, - {"filter.expression.simplified", simplified_filter.ToString()}, - {"filter.length", target.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Filter", + {{"filter.expression", ToStringExtra()}, + {"filter.expression.simplified", simplified_filter.ToString()}, + {"filter.length", target.length}}); ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, plan()->exec_context())); @@ -105,11 +103,10 @@ class FilterNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"filter", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"filter", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); auto result = DoFilter(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 30126a4be5e..3207bb96984 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -90,11 +90,10 @@ class HashJoinBasicImpl : public HashJoinImpl { TaskScheduler::ScheduleImpl schedule_task_callback) override { num_threads = std::max(num_threads, static_cast(1)); - START_SPAN(span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, - {"join.kind", ToString(join_type)}, - {"join.threads", static_cast(num_threads)}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, "HashJoinBasicImpl", + {{"detail", filter.ToString()}, + {"join.kind", ToString(join_type)}, + {"join.threads", static_cast(num_threads)}}); ctx_ = ctx; join_type_ = join_type; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 3e1c14148b2..103d4153352 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -526,7 +526,7 @@ class HashJoinNode : public ExecNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); util::tracing::Span span; START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); + {{"batch.length", batch.length}}); { Status status = impl_->InputReceived(thread_index, side, std::move(batch)); @@ -573,9 +573,9 @@ class HashJoinNode : public ExecNode { Status StartProducing() override { START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this); bool use_sync_execution = !(plan_->exec_context()->executor()); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 217208577e2..b8fb64c5d54 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,11 +80,10 @@ class ProjectNode : public MapNode { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, "Project", - {{"project.descr", exprs_[i].descr().ToString()}, - {"project.length", target.length}, - {"project.expression", exprs_[i].ToString()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, "Project", + {{"project.descr", exprs_[i].descr().ToString()}, + {"project.length", target.length}, + {"project.expression", exprs_[i].ToString()}}); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], target.guarantee)); @@ -99,11 +98,10 @@ class ProjectNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"project", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"project", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); auto result = DoProject(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index bf5c6f5c9c9..ec278b55e7f 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -76,11 +76,10 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); @@ -107,9 +106,9 @@ class SinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( + START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -176,11 +175,10 @@ class ConsumingSinkNode : public ExecNode { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); DCHECK_GT(inputs_.size(), 0); RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema())); finished_ = Future<>::Make(); @@ -207,9 +205,9 @@ class ConsumingSinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( + START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -358,9 +356,9 @@ struct OrderBySinkNode final : public SinkNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT( + START_COMPUTE_SPAN_WITH_PARENT( span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}, GET_MEMORY_POOL_INFO}); + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -397,8 +395,7 @@ struct OrderBySinkNode final : public SinkNode { void Finish() override { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "Finish", - {{"node.label", label()}, GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index cec483947ab..14095aafb1d 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -70,12 +70,11 @@ struct SourceNode : ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}}); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 810209eb9b8..c5bea52500a 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -112,11 +112,10 @@ class UnionNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 32a0e397946..1c18243f5fb 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -215,11 +215,10 @@ Result Function::Execute(const std::vector& args, } util::tracing::Span span; - START_SPAN(span, name(), - {{"function.name", name()}, - {"function.options", options ? options->ToString() : ""}, - {"function.kind", kind()}, - GET_MEMORY_POOL_INFO}); + START_COMPUTE_SPAN(span, name(), + {{"function.name", name()}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", kind()}}); // type-check Datum arguments here. Really we'd like to avoid this as much as // possible diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index a1e3fc996ca..be47b94ce00 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -97,7 +97,7 @@ #include "arrow/util/logging.h" // For filename conversion -#ifdef _WIN32 +#if defined(_WIN32) #include "arrow/util/utf8.h" #endif @@ -105,22 +105,11 @@ #include #include -#elif __unix__ || __unix || unix || (__APPLE__ && __MACH__) -#include - -#if __APPLE__ && __MACH__ +#elif __APPLE__ #include -#elif (_AIX || __TOS__AIX__) || (__sun__ || __sun || sun && (_SVR4 || __svr4__)) -#include - -#elif __linux__ || __linux || linux || __gnu_linux__ -#include - -#endif - -#else -#error "Cannot define getPeakRSS( ) or getCurrentRSS( ) for an unknown OS." +#elif __linux__ +#include #endif namespace arrow { @@ -1891,38 +1880,40 @@ uint64_t GetOptionalThreadId() { // Returns the current resident set size (physical memory use) measured // in bytes, or zero if the value cannot be determined on this OS. -// See: https://stackoverflow.com/a/14927379 int64_t GetCurrentRSS() { #if defined(_WIN32) // Windows -------------------------------------------------- PROCESS_MEMORY_COUNTERS info; GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info)); - return (int64_t)info.WorkingSetSize; + return static_cast(info.WorkingSetSize); -#elif defined(__APPLE__) && defined(__MACH__) +#elif defined(__APPLE__) // OSX ------------------------------------------------------ struct mach_task_basic_info info; mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT; if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) != - KERN_SUCCESS) - return (int64_t)0L; - return (int64_t)info.resident_size; + KERN_SUCCESS) { + ARROW_LOG(WARNING) << "Can't resolve RSS value"; + return 0; + } + return static_cast(info.resident_size); -#elif defined(__linux__) || defined(__linux) || defined(linux) || defined(__gnu_linux__) +#elif defined(__linux__) // Linux ---------------------------------------------------- int64_t rss = 0L; - FILE* fp = NULL; - if ((fp = fopen("/proc/self/statm", "r")) == NULL) return (int64_t)0L; - if (fscanf(fp, "%*s%ld", &rss) != 1) { - fclose(fp); - return (int64_t)0L; + + std::ifstream fp("/proc/self/statm"); + if (fp) { + fp >> rss; + return rss * sysconf(_SC_PAGESIZE); + } else { + ARROW_LOG(WARNING) << "Can't resolve RSS value from /proc/self/statm"; + return 0; } - fclose(fp); - return (int64_t)rss * (int64_t)sysconf(_SC_PAGESIZE); #else // AIX, BSD, Solaris, and Unknown OS ------------------------ - return (int64_t)0L; // Unsupported. + return 0; // Unsupported. #endif } diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index be6b0578a66..30dfb2ba677 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -346,9 +346,9 @@ int64_t GetRandomSeed(); ARROW_EXPORT uint64_t GetThreadId(); -/// \brief Get the current memory used by current process in bytes +/// \brief Get the current memory used by the current process in bytes /// -/// This function support Windows and Linux +/// This function supports Windows, Linux, and Mac and will return 0 otherwise ARROW_EXPORT int64_t GetCurrentRSS(); diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc index 0508b4c1902..efc4f2164b7 100644 --- a/cpp/src/arrow/util/io_util_test.cc +++ b/cpp/src/arrow/util/io_util_test.cc @@ -719,5 +719,17 @@ TEST(SendSignal, ToThread) { #endif } +TEST(Memory, GetRSS) { +#if defined(_WIN32) + ASSERT_GT(GetCurrentRSS(), 0); +#elif defined(__APPLE__) + ASSERT_GT(GetCurrentRSS(), 0); +#elif defined(__linux__) + ASSERT_GT(GetCurrentRSS(), 0); +#else + ASSERT_EQ(GetCurrentRSS(), 0); +#endif +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 372f703f156..904a1fd76a8 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -20,10 +20,6 @@ #include "arrow/util/thread_pool.h" #include "arrow/util/tracing.h" -#include -#include -#include -#include #include #include #include diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index bfc73066041..d0d6062e6e2 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -132,6 +132,16 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( ::arrow::internal::tracing::SpanOptionsWithParent(parent_span))}) \ .span) +#define START_COMPUTE_SPAN(target_span, ...) \ + START_SPAN(target_span, __VA_ARGS__); \ + target_span.Get().span->SetAttribute( \ + "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated()) + +#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \ + START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__); \ + target_span.Get().span->SetAttribute( \ + "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated()) + #define EVENT(target_span, ...) target_span.Get().span->AddEvent(__VA_ARGS__) #define MARK_SPAN(target_span, status) \ @@ -151,9 +161,6 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent( return st; \ }) -#define GET_MEMORY_POOL_INFO \ - { "memory_pool_bytes", default_memory_pool()->bytes_allocated() } - #define PROPAGATE_SPAN_TO_GENERATOR(generator) \ generator = ::arrow::internal::tracing::PropagateSpanThroughAsyncGenerator( \ std::move(generator)) @@ -180,11 +187,12 @@ class SpanImpl {}; #define START_SPAN(target_span, ...) #define START_SPAN_WITH_PARENT(target_span, parent_span, ...) +#define START_COMPUTE_SPAN(target_span, ...) +#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) #define MARK_SPAN(target_span, status) #define EVENT(target_span, ...) #define END_SPAN(target_span) #define END_SPAN_ON_FUTURE_COMPLETION(target_span, target_future, target_capture) -#define GET_MEMORY_POOL_INFO #define PROPAGATE_SPAN_TO_GENERATOR(generator) #define WRAP_ASYNC_GENERATOR(generator) #define WRAP_ASYNC_GENERATOR_WITH_CHILD_SPAN(generator, name)