Skip to content
Closed
2 changes: 1 addition & 1 deletion ci/docker/fedora-35-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_WITH_BROTLI=ON \
ARROW_WITH_BZ2=ON \
ARROW_WITH_LZ4=ON \
ARROW_WITH_OPENTELEMETRY=OFF \
ARROW_WITH_OPENTELEMETRY=ON \
ARROW_WITH_SNAPPY=ON \
ARROW_WITH_ZLIB=ON \
ARROW_WITH_ZSTD=ON \
Expand Down
2 changes: 1 addition & 1 deletion ci/docker/ubuntu-20.04-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_WITH_BROTLI=ON \
ARROW_WITH_BZ2=ON \
ARROW_WITH_LZ4=ON \
ARROW_WITH_OPENTELEMETRY=OFF \
ARROW_WITH_OPENTELEMETRY=ON \
ARROW_WITH_SNAPPY=ON \
ARROW_WITH_ZLIB=ON \
ARROW_WITH_ZSTD=ON \
Expand Down
2 changes: 1 addition & 1 deletion ci/docker/ubuntu-22.04-cpp.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ ENV ARROW_BUILD_TESTS=ON \
ARROW_WITH_BROTLI=ON \
ARROW_WITH_BZ2=ON \
ARROW_WITH_LZ4=ON \
ARROW_WITH_OPENTELEMETRY=OFF \
ARROW_WITH_OPENTELEMETRY=ON \
ARROW_WITH_SNAPPY=ON \
ARROW_WITH_ZLIB=ON \
ARROW_WITH_ZSTD=ON \
Expand Down
2 changes: 1 addition & 1 deletion cpp/cmake_modules/ThirdpartyToolchain.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -4096,7 +4096,7 @@ macro(build_opentelemetry)
# N.B. OTel targets and libraries don't follow any consistent naming scheme
if(_OPENTELEMETRY_LIB STREQUAL "http_client_curl")
set(_OPENTELEMETRY_STATIC_LIBRARY
"${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}"
"${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}"
)
elseif(_OPENTELEMETRY_LIB STREQUAL "ostream_span_exporter")
set(_OPENTELEMETRY_STATIC_LIBRARY
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ add_arrow_test(utility-test
tdigest_test.cc
test_common.cc
time_test.cc
tracing_test.cc
trie_test.cc
uri_test.cc
utf8_util_test.cc
Expand Down
38 changes: 27 additions & 11 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ namespace internal {

Executor::~Executor() = default;

// Base-class behaviour for KeepAlive. Executors that expect to be allocated with
// static storage duration should override this so that their threads respect the
// lifetime of the resources passed in.
void Executor::KeepAlive(std::shared_ptr<Resource> /*resource*/) {
  // Intentionally empty: the default executor does not retain the resource, so the
  // by-value argument is simply released when this call returns.
}

namespace {

struct Task {
Expand Down Expand Up @@ -200,6 +205,8 @@ struct ThreadPool::State {
// Are we shutting down?
bool please_shutdown_ = false;
bool quick_shutdown_ = false;

std::vector<std::shared_ptr<Resource>> kept_alive_resources_;
};

// The worker loop is an independent function so that it can keep running
Expand Down Expand Up @@ -417,19 +424,11 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken sto
StopCallback&& stop_callback) {
{
ProtectAgainstFork();
std::lock_guard<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
CollectFinishedWorkersUnlocked();
state_->tasks_queued_or_running_++;
if (static_cast<int>(state_->workers_.size()) < state_->tasks_queued_or_running_ &&
state_->desired_capacity_ > static_cast<int>(state_->workers_.size())) {
// We can still spin up more workers so spin up a new worker
LaunchWorkersUnlocked(/*threads=*/1);
}
#ifdef ARROW_WITH_OPENTELEMETRY
// Wrap the task to propagate a parent tracing span to it
// This task-wrapping needs to be done before we grab the mutex because the
// first call to OT (whatever that happens to be) will attempt to grab this mutex
// when calling KeepAlive to keep the OT infrastructure alive.
struct {
void operator()() {
auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan);
Expand All @@ -441,13 +440,30 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce<void()> task, StopToken sto
::arrow::internal::tracing::GetTracer()->GetCurrentSpan()};
task = std::move(wrapper);
#endif
std::lock_guard<std::mutex> lock(state_->mutex_);
if (state_->please_shutdown_) {
return Status::Invalid("operation forbidden during or after shutdown");
}
CollectFinishedWorkersUnlocked();
state_->tasks_queued_or_running_++;
if (static_cast<int>(state_->workers_.size()) < state_->tasks_queued_or_running_ &&
state_->desired_capacity_ > static_cast<int>(state_->workers_.size())) {
// We can still spin up more workers so spin up a new worker
LaunchWorkersUnlocked(/*threads=*/1);
}
state_->pending_tasks_.push_back(
{std::move(task), std::move(stop_token), std::move(stop_callback)});
}
state_->cv_.notify_one();
return Status::OK();
}

void ThreadPool::KeepAlive(std::shared_ptr<Executor::Resource> resource) {
  // Concurrent KeepAlive calls are unlikely, but the list lives in the shared pool
  // state, so take the state mutex before appending to it.
  auto& pool_state = *state_;
  std::lock_guard<std::mutex> guard(pool_state.mutex_);
  pool_state.kept_alive_resources_.emplace_back(std::move(resource));
}

Result<std::shared_ptr<ThreadPool>> ThreadPool::Make(int threads) {
auto pool = std::shared_ptr<ThreadPool>(new ThreadPool());
RETURN_NOT_OK(pool->SetCapacity(threads));
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,27 @@ class ARROW_EXPORT Executor {
// Executor. Returns false if this Executor does not support this property.
virtual bool OwnsThisThread() { return false; }

/// \brief An interface to represent something with a custom destructor
///
/// The interface declares nothing except a virtual destructor, so derived
/// objects can be owned and destroyed through a pointer to Resource. Derive
/// from it to wrap state that must be handed to Executor::KeepAlive.
///
/// \see KeepAlive
class ARROW_EXPORT Resource {
public:
// Virtual so that destruction through a Resource pointer runs the derived
// class's destructor.
virtual ~Resource() = default;
};

/// \brief Keep a resource alive until all executor threads have terminated
///
/// Executors may have static storage duration. In particular, the CPU and I/O
/// executors are currently implemented this way. These threads may access other
/// objects with static storage duration such as the OpenTelemetry runtime context,
/// the default memory pool, or other static executors.
///
/// The order in which these objects are destroyed is difficult to control. In order
/// to ensure those objects remain alive until all threads have finished, those
/// objects should be wrapped in a Resource object and passed into this method. The
/// given shared_ptr will be kept alive until all threads have finished their worker
/// loops.
///
/// The default implementation does nothing; executors that need this guarantee
/// must override it.
///
/// \param resource shared ownership of the object to keep alive
virtual void KeepAlive(std::shared_ptr<Resource> resource);

protected:
ARROW_DISALLOW_COPY_AND_ASSIGN(Executor);

Expand Down Expand Up @@ -434,6 +455,8 @@ class ARROW_EXPORT ThreadPool : public Executor {
// This is useful for sequencing tests
void WaitForIdle();

/// \brief Retain a resource in this pool's shared state
///
/// The resource is appended, under the pool mutex, to a list held by the pool's
/// State object. Safe to call concurrently.
void KeepAlive(std::shared_ptr<Executor::Resource> resource) override;

struct State;

protected:
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/arrow/util/tracing_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
// under the License.

#include "arrow/util/tracing_internal.h"
#include "arrow/io/interfaces.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing.h"

#include <iostream>
Expand Down Expand Up @@ -134,7 +136,23 @@ std::unique_ptr<sdktrace::SpanExporter> InitializeExporter() {
return nullptr;
}

struct StorageSingleton : public Executor::Resource {
StorageSingleton()
: storage_(otel::context::RuntimeContext::GetConstRuntimeContextStorage()) {}
nostd::shared_ptr<const otel::context::RuntimeContextStorage> storage_;
};

std::shared_ptr<Executor::Resource> GetStorageSingleton() {
static std::shared_ptr<StorageSingleton> storage_singleton =
std::make_shared<StorageSingleton>();
return storage_singleton;
}

nostd::shared_ptr<sdktrace::TracerProvider> InitializeSdkTracerProvider() {
// Bind the lifetime of the OT runtime context to the CPU and I/O thread
// pools. This will keep OT alive until all thread tasks have finished.
internal::GetCpuThreadPool()->KeepAlive(GetStorageSingleton());
io::default_io_context().executor()->KeepAlive(GetStorageSingleton());
auto exporter = InitializeExporter();
if (exporter) {
sdktrace::BatchSpanProcessorOptions options;
Expand Down
53 changes: 53 additions & 0 deletions cpp/src/arrow/util/tracing_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include <thread>

#include <gtest/gtest.h>

#include "arrow/testing/gtest_util.h"
#include "arrow/util/thread_pool.h"
#include "arrow/util/tracing_internal.h"

namespace arrow {
namespace util {
namespace tracing {

#ifdef ARROW_WITH_OPENTELEMETRY

// Regression test for ARROW-15604.
//
// OpenTelemetry keeps static state that may be initialized after the CPU and I/O
// thread pools. That state has to stay valid for as long as any thread in those
// pools is still running.
//
// To exercise this, the test hands the CPU pool a task that is still running when
// the test body returns, so static teardown should begin before the task's thread
// has finished.
TEST(Tracing, OtLifetime) {
  auto cpu_pool = ::arrow::internal::GetCpuThreadPool();
  auto traced_task = [] {
    // Runs on a pool thread and outlives the main test thread.
    Span span;
    START_SPAN(span, "Test");
    SleepFor(0.1);
  };
  ASSERT_OK(cpu_pool->Spawn(std::move(traced_task)));
}

#endif

} // namespace tracing
} // namespace util
} // namespace arrow
4 changes: 2 additions & 2 deletions cpp/thirdparty/versions.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ ARROW_MIMALLOC_BUILD_VERSION=v1.7.3
ARROW_MIMALLOC_BUILD_SHA256_CHECKSUM=0f987bda01ca9df87ec90e9d98c63fa893ee61f3cca565e5ca5ed744fdcc5109
ARROW_NLOHMANN_JSON_BUILD_VERSION=v3.10.2
ARROW_NLOHMANN_JSON_BUILD_SHA256_CHECKSUM=081ed0f9f89805c2d96335c3acfa993b39a0a5b4b4cef7edb68dd2210a13458c
ARROW_OPENTELEMETRY_BUILD_VERSION=v1.2.0
ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=7a6420f9e4fa44b81a5b06e30e5e116da71decc9086e5cc4f126e1efc8a397c2
ARROW_OPENTELEMETRY_BUILD_VERSION=v1.3.0
ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=6a4c43b9c9f753841ebc0fe2717325271f02e2a1d5ddd0b52735c35243629ab3
ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.11.0
ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=985367f8905e91018e636cbf0d83ab3f834b665c4f5899a27d10cae9657710e2
ARROW_ORC_BUILD_VERSION=1.7.3
Expand Down