diff --git a/cpp/src/arrow/compute/exec/aggregate_node.cc b/cpp/src/arrow/compute/exec/aggregate_node.cc index b4982ef111f..5bc8e3442f8 100644 --- a/cpp/src/arrow/compute/exec/aggregate_node.cc +++ b/cpp/src/arrow/compute/exec/aggregate_node.cc @@ -168,17 +168,17 @@ class ScalarAggregateNode : public ExecNode { Status DoConsume(const ExecBatch& batch, size_t thread_index) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN(span, "Consume", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext batch_ctx{plan()->exec_context()}; batch_ctx.SetState(states_[i][thread_index].get()); @@ -191,10 +191,10 @@ class ScalarAggregateNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"aggregate", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"aggregate", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); auto thread_index = get_thread_index_(); @@ -221,10 +221,10 @@ class ScalarAggregateNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); // Scalar aggregates will only output a single batch @@ -262,17 +262,18 @@ class ScalarAggregateNode : public ExecNode { private: Status Finish() { util::tracing::Span span; - START_SPAN(span, "Finish", {{"aggregate", ToStringExtra()}, {"node.label", label()}}); + START_COMPUTE_SPAN(span, "Finish", + {{"aggregate", ToStringExtra()}, {"node.label", label()}}); ExecBatch batch{{}, 1}; batch.values.resize(kernels_.size()); for (size_t i = 0; i < kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext ctx{plan()->exec_context()}; ARROW_ASSIGN_OR_RAISE(auto merged, ScalarAggregateKernel::MergeAll( kernels_[i], &ctx, std::move(states_[i]))); @@ -392,10 +393,10 @@ class GroupByNode : public ExecNode { Status Consume(ExecBatch batch) { util::tracing::Span span; - START_SPAN(span, "Consume", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN(span, "Consume", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); size_t thread_index = get_thread_index_(); if (thread_index >= local_states_.size()) { return Status::IndexError("thread index ", thread_index, " is out of range [0, ", @@ -418,11 +419,11 @@ class GroupByNode : public ExecNode { // Execute aggregate kernels for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Consume"}}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Consume"}}); KernelContext kernel_ctx{ctx_}; kernel_ctx.SetState(state->agg_states[i].get()); @@ -439,7 +440,8 @@ class GroupByNode : public ExecNode { Status Merge() { util::tracing::Span span; - START_SPAN(span, "Merge", {{"group_by", ToStringExtra()}, {"node.label", label()}}); + START_COMPUTE_SPAN(span, "Merge", + {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state0 = &local_states_[0]; for (size_t i = 1; i < local_states_.size(); ++i) { ThreadLocalState* state = &local_states_[i]; @@ -453,11 +455,12 @@ class GroupByNode : public ExecNode { for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Merge"}}); + START_COMPUTE_SPAN( + span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Merge"}}); KernelContext batch_ctx{ctx_}; DCHECK(state0->agg_states[i]); batch_ctx.SetState(state0->agg_states[i].get()); @@ -473,8 +476,8 @@ class GroupByNode : public ExecNode { Result Finalize() { util::tracing::Span span; - START_SPAN(span, "Finalize", - {{"group_by", ToStringExtra()}, {"node.label", label()}}); + START_COMPUTE_SPAN(span, "Finalize", + {{"group_by", ToStringExtra()}, {"node.label", label()}}); ThreadLocalState* state = &local_states_[0]; // If we never got any batches, then state won't have been initialized @@ -486,11 +489,11 @@ class GroupByNode : public ExecNode { // Aggregate fields come before key fields to match the behavior of GroupBy function for (size_t i = 0; i < agg_kernels_.size(); ++i) { util::tracing::Span span; - START_SPAN(span, aggs_[i].function, - {{"function.name", aggs_[i].function}, - {"function.options", - aggs_[i].options ? aggs_[i].options->ToString() : ""}, - {"function.kind", std::string(kind_name()) + "::Finalize"}}); + START_COMPUTE_SPAN(span, aggs_[i].function, + {{"function.name", aggs_[i].function}, + {"function.options", + aggs_[i].options ? 
aggs_[i].options->ToString() : ""}, + {"function.kind", std::string(kind_name()) + "::Finalize"}}); KernelContext batch_ctx{ctx_}; batch_ctx.SetState(state->agg_states[i].get()); RETURN_NOT_OK(agg_kernels_[i]->finalize(&batch_ctx, &out_data.values[i])); @@ -548,10 +551,10 @@ class GroupByNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"group_by", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"group_by", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); // bail if StopProducing was called if (finished_.is_finished()) return; @@ -587,10 +590,10 @@ class GroupByNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); diff --git a/cpp/src/arrow/compute/exec/exec_plan.cc b/cpp/src/arrow/compute/exec/exec_plan.cc index 7a1c387c1aa..3ffd2a7626a 100644 --- a/cpp/src/arrow/compute/exec/exec_plan.cc +++ b/cpp/src/arrow/compute/exec/exec_plan.cc @@ -81,7 +81,7 @@ struct ExecPlanImpl : public ExecPlan { } Status StartProducing() { - START_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); + START_COMPUTE_SPAN(span_, "ExecPlan", {{"plan", ToString()}}); #ifdef ARROW_WITH_OPENTELEMETRY if (HasMetadata()) { auto pairs = metadata().get()->sorted_pairs(); @@ -387,7 +387,7 @@ void MapNode::InputFinished(ExecNode* input, int total_batches) { } Status 
MapNode::StartProducing() { - START_SPAN( + START_COMPUTE_SPAN( span_, std::string(kind_name()) + ":" + label(), {{"node.label", label()}, {"node.detail", ToString()}, {"node.kind", kind_name()}}); finished_ = Future<>::Make(); diff --git a/cpp/src/arrow/compute/exec/filter_node.cc b/cpp/src/arrow/compute/exec/filter_node.cc index 6f15e44653f..0c849cb0435 100644 --- a/cpp/src/arrow/compute/exec/filter_node.cc +++ b/cpp/src/arrow/compute/exec/filter_node.cc @@ -70,10 +70,10 @@ class FilterNode : public MapNode { SimplifyWithGuarantee(filter_, target.guarantee)); util::tracing::Span span; - START_SPAN(span, "Filter", - {{"filter.expression", ToStringExtra()}, - {"filter.expression.simplified", simplified_filter.ToString()}, - {"filter.length", target.length}}); + START_COMPUTE_SPAN(span, "Filter", + {{"filter.expression", ToStringExtra()}, + {"filter.expression.simplified", simplified_filter.ToString()}, + {"filter.length", target.length}}); ARROW_ASSIGN_OR_RAISE(Datum mask, ExecuteScalarExpression(simplified_filter, target, plan()->exec_context())); @@ -103,10 +103,10 @@ class FilterNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"filter", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"filter", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); auto result = DoFilter(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 56d02dd3b11..3207bb96984 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -90,10 +90,10 @@ class HashJoinBasicImpl : public HashJoinImpl { TaskScheduler::ScheduleImpl schedule_task_callback) override { num_threads = 
std::max(num_threads, static_cast(1)); - START_SPAN(span_, "HashJoinBasicImpl", - {{"detail", filter.ToString()}, - {"join.kind", ToString(join_type)}, - {"join.threads", static_cast(num_threads)}}); + START_COMPUTE_SPAN(span_, "HashJoinBasicImpl", + {{"detail", filter.ToString()}, + {"join.kind", ToString(join_type)}, + {"join.threads", static_cast(num_threads)}}); ctx_ = ctx; join_type_ = join_type; diff --git a/cpp/src/arrow/compute/exec/hash_join_node.cc b/cpp/src/arrow/compute/exec/hash_join_node.cc index 060d40a078c..103d4153352 100644 --- a/cpp/src/arrow/compute/exec/hash_join_node.cc +++ b/cpp/src/arrow/compute/exec/hash_join_node.cc @@ -525,8 +525,8 @@ class HashJoinNode : public ExecNode { EVENT(span_, "InputReceived", {{"batch.length", batch.length}, {"side", side}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"batch.length", batch.length}}); { Status status = impl_->InputReceived(thread_index, side, std::move(batch)); @@ -572,10 +572,10 @@ class HashJoinNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); END_SPAN_ON_FUTURE_COMPLETION(span_, finished(), this); bool use_sync_execution = !(plan_->exec_context()->executor()); diff --git a/cpp/src/arrow/compute/exec/project_node.cc b/cpp/src/arrow/compute/exec/project_node.cc index 79eddeb15e2..b8fb64c5d54 100644 --- a/cpp/src/arrow/compute/exec/project_node.cc +++ b/cpp/src/arrow/compute/exec/project_node.cc @@ -80,10 +80,10 @@ class ProjectNode : public MapNode { std::vector values{exprs_.size()}; for (size_t i = 0; i < exprs_.size(); ++i) { 
util::tracing::Span span; - START_SPAN(span, "Project", - {{"project.descr", exprs_[i].descr().ToString()}, - {"project.length", target.length}, - {"project.expression", exprs_[i].ToString()}}); + START_COMPUTE_SPAN(span, "Project", + {{"project.descr", exprs_[i].descr().ToString()}, + {"project.length", target.length}, + {"project.expression", exprs_[i].ToString()}}); ARROW_ASSIGN_OR_RAISE(Expression simplified_expr, SimplifyWithGuarantee(exprs_[i], target.guarantee)); @@ -98,10 +98,10 @@ class ProjectNode : public MapNode { DCHECK_EQ(input, inputs_[0]); auto func = [this](ExecBatch batch) { util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"project", ToStringExtra()}, - {"node.label", label()}, - {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "InputReceived", + {{"project", ToStringExtra()}, + {"node.label", label()}, + {"batch.length", batch.length}}); auto result = DoProject(std::move(batch)); MARK_SPAN(span, result.status()); END_SPAN(span); diff --git a/cpp/src/arrow/compute/exec/sink_node.cc b/cpp/src/arrow/compute/exec/sink_node.cc index e981de38996..ec278b55e7f 100644 --- a/cpp/src/arrow/compute/exec/sink_node.cc +++ b/cpp/src/arrow/compute/exec/sink_node.cc @@ -76,10 +76,10 @@ class SinkNode : public ExecNode { const char* kind_name() const override { return "SinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); @@ -106,8 +106,9 @@ class SinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); 
util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -174,10 +175,10 @@ class ConsumingSinkNode : public ExecNode { const char* kind_name() const override { return "ConsumingSinkNode"; } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); DCHECK_GT(inputs_.size(), 0); RETURN_NOT_OK(consumer_->Init(inputs_[0]->output_schema())); finished_ = Future<>::Make(); @@ -204,8 +205,9 @@ class ConsumingSinkNode : public ExecNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -354,8 +356,9 @@ struct OrderBySinkNode final : public SinkNode { void InputReceived(ExecNode* input, ExecBatch batch) override { EVENT(span_, "InputReceived", {{"batch.length", batch.length}}); util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "InputReceived", - {{"node.label", label()}, {"batch.length", batch.length}}); + START_COMPUTE_SPAN_WITH_PARENT( + span, span_, "InputReceived", + {{"node.label", label()}, {"batch.length", batch.length}}); DCHECK_EQ(input, inputs_[0]); @@ -392,7 +395,7 @@ struct OrderBySinkNode final : public SinkNode { void Finish() override { 
util::tracing::Span span; - START_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); + START_COMPUTE_SPAN_WITH_PARENT(span, span_, "Finish", {{"node.label", label()}}); Status st = DoFinish(); if (ErrorIfNotOk(st)) { producer_.Push(std::move(st)); diff --git a/cpp/src/arrow/compute/exec/source_node.cc b/cpp/src/arrow/compute/exec/source_node.cc index 777faf22192..14095aafb1d 100644 --- a/cpp/src/arrow/compute/exec/source_node.cc +++ b/cpp/src/arrow/compute/exec/source_node.cc @@ -70,11 +70,11 @@ struct SourceNode : ExecNode { [[noreturn]] void InputFinished(ExecNode*, int) override { NoInputs(); } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.kind", kind_name()}, - {"node.label", label()}, - {"node.output_schema", output_schema()->ToString()}, - {"node.detail", ToString()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.kind", kind_name()}, + {"node.label", label()}, + {"node.output_schema", output_schema()->ToString()}, + {"node.detail", ToString()}}); { // If another exec node encountered an error during its StartProducing call // it might have already called StopProducing on all of its inputs (including this diff --git a/cpp/src/arrow/compute/exec/union_node.cc b/cpp/src/arrow/compute/exec/union_node.cc index 9232516cc6d..c5bea52500a 100644 --- a/cpp/src/arrow/compute/exec/union_node.cc +++ b/cpp/src/arrow/compute/exec/union_node.cc @@ -112,10 +112,10 @@ class UnionNode : public ExecNode { } Status StartProducing() override { - START_SPAN(span_, std::string(kind_name()) + ":" + label(), - {{"node.label", label()}, - {"node.detail", ToString()}, - {"node.kind", kind_name()}}); + START_COMPUTE_SPAN(span_, std::string(kind_name()) + ":" + label(), + {{"node.label", label()}, + {"node.detail", ToString()}, + {"node.kind", kind_name()}}); finished_ = Future<>::Make(); END_SPAN_ON_FUTURE_COMPLETION(span_, finished_, this); return Status::OK(); diff --git 
a/cpp/src/arrow/compute/function.cc b/cpp/src/arrow/compute/function.cc index 1a7f36862dd..1c18243f5fb 100644 --- a/cpp/src/arrow/compute/function.cc +++ b/cpp/src/arrow/compute/function.cc @@ -215,10 +215,10 @@ Result Function::Execute(const std::vector& args, } util::tracing::Span span; - START_SPAN(span, name(), - {{"function.name", name()}, - {"function.options", options ? options->ToString() : ""}, - {"function.kind", kind()}}); + START_COMPUTE_SPAN(span, name(), + {{"function.name", name()}, + {"function.options", options ? options->ToString() : ""}, + {"function.kind", kind()}}); // type-check Datum arguments here. Really we'd like to avoid this as much as // possible diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 398e65a3c93..be47b94ce00 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -101,6 +101,17 @@ #include "arrow/util/utf8.h" #endif +#ifdef _WIN32 +#include +#include + +#elif __APPLE__ +#include + +#elif __linux__ +#include +#endif + namespace arrow { using internal::checked_cast; @@ -1867,5 +1878,44 @@ uint64_t GetOptionalThreadId() { return (tid == 0) ? tid - 1 : tid; } +// Returns the current resident set size (physical memory use) measured +// in bytes, or zero if the value cannot be determined on this OS. 
+int64_t GetCurrentRSS() {
+#if defined(_WIN32)
+  // Windows: report the working-set size; `info` is only filled in on success.
+  PROCESS_MEMORY_COUNTERS info;
+  if (!GetProcessMemoryInfo(GetCurrentProcess(), &info, sizeof(info))) {
+    ARROW_LOG(WARNING) << "Can't resolve RSS value";
+    return 0;
+  }
+  return static_cast<int64_t>(info.WorkingSetSize);
+
+#elif defined(__APPLE__)
+  // macOS: report resident_size from the Mach task info.
+  struct mach_task_basic_info info;
+  mach_msg_type_number_t infoCount = MACH_TASK_BASIC_INFO_COUNT;
+  if (task_info(mach_task_self(), MACH_TASK_BASIC_INFO, (task_info_t)&info, &infoCount) !=
+      KERN_SUCCESS) {
+    ARROW_LOG(WARNING) << "Can't resolve RSS value";
+    return 0;
+  }
+  return static_cast<int64_t>(info.resident_size);
+
+#elif defined(__linux__)
+  // Linux: statm lists "size resident shared ..." in pages, so RSS is field 2.
+  int64_t vm_pages = 0, rss_pages = 0;
+  std::ifstream fp("/proc/self/statm");
+  if (fp >> vm_pages >> rss_pages) {
+    return rss_pages * sysconf(_SC_PAGESIZE);
+  }
+  ARROW_LOG(WARNING) << "Can't resolve RSS value from /proc/self/statm";
+  return 0;
+
+#else
+  // AIX, BSD, Solaris, and Unknown OS ------------------------
+  return 0;  // Unsupported.
+#endif
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h
index c094727a647..30dfb2ba677 100644
--- a/cpp/src/arrow/util/io_util.h
+++ b/cpp/src/arrow/util/io_util.h
@@ -346,5 +346,11 @@ int64_t GetRandomSeed();
 ARROW_EXPORT
 uint64_t GetThreadId();
 
+/// \brief Get the resident set size (physical memory in use) of this process, in bytes
+///
+/// Supported on Windows, Linux, and macOS; returns 0 elsewhere or if the query fails
+ARROW_EXPORT
+int64_t GetCurrentRSS();
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/io_util_test.cc b/cpp/src/arrow/util/io_util_test.cc
index 0508b4c1902..efc4f2164b7 100644
--- a/cpp/src/arrow/util/io_util_test.cc
+++ b/cpp/src/arrow/util/io_util_test.cc
@@ -719,5 +719,17 @@ TEST(SendSignal, ToThread) {
 #endif
 }
 
+TEST(Memory, GetRSS) {
+#if defined(_WIN32)
+  ASSERT_GT(GetCurrentRSS(), 0);
+#elif defined(__APPLE__)
+  ASSERT_GT(GetCurrentRSS(), 0);
+#elif defined(__linux__)
+  ASSERT_GT(GetCurrentRSS(), 0);
+#else
+  ASSERT_EQ(GetCurrentRSS(), 0);
+#endif
+}
+
 }  // namespace internal
 }  // namespace arrow
diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h
index 77320eb2aec..d0d6062e6e2 100644
--- a/cpp/src/arrow/util/tracing_internal.h
+++ b/cpp/src/arrow/util/tracing_internal.h
@@ -132,6 +132,21 @@ opentelemetry::trace::StartSpanOptions SpanOptionsWithParent(
                   ::arrow::internal::tracing::SpanOptionsWithParent(parent_span))}) \
           .span)
 
+// Like START_SPAN, but also records the default memory pool's bytes_allocated()
+// as the "arrow.memory_pool_bytes" span attribute. Expands to more than one
+// statement, so only use it as a full statement inside a braced scope.
+// NOTE(review): always reads default_memory_pool(), whatever pool the caller uses.
+#define START_COMPUTE_SPAN(target_span, ...) \
+  START_SPAN(target_span, __VA_ARGS__);      \
+  target_span.Get().span->SetAttribute(      \
+      "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated())
+
+// START_SPAN_WITH_PARENT counterpart of START_COMPUTE_SPAN (same caveats).
+#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...) \
+  START_SPAN_WITH_PARENT(target_span, parent_span, __VA_ARGS__);      \
+  target_span.Get().span->SetAttribute(                               \
+      "arrow.memory_pool_bytes", ::arrow::default_memory_pool()->bytes_allocated())
+
 #define EVENT(target_span, ...) target_span.Get().span->AddEvent(__VA_ARGS__)
 
 #define MARK_SPAN(target_span, status)  \
@@ -177,6 +192,8 @@ class SpanImpl {};
 
 #define START_SPAN(target_span, ...)
 #define START_SPAN_WITH_PARENT(target_span, parent_span, ...)
+#define START_COMPUTE_SPAN(target_span, ...)
+#define START_COMPUTE_SPAN_WITH_PARENT(target_span, parent_span, ...)
 #define MARK_SPAN(target_span, status)
 #define EVENT(target_span, ...)
 #define END_SPAN(target_span)