diff --git a/ci/docker/fedora-35-cpp.dockerfile b/ci/docker/fedora-35-cpp.dockerfile index 469ff59d7fd..b79ceb894bf 100644 --- a/ci/docker/fedora-35-cpp.dockerfile +++ b/ci/docker/fedora-35-cpp.dockerfile @@ -85,7 +85,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile index dfeb93bedd9..6e811ea2f71 100644 --- a/ci/docker/ubuntu-20.04-cpp.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp.dockerfile @@ -142,7 +142,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index 92d802f8760..a7cc5ff38ad 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -170,7 +170,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 3a0353bc7db..9d9376bb8f2 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -4096,7 +4096,7 @@ macro(build_opentelemetry) # N.B. 
OTel targets and libraries don't follow any consistent naming scheme if(_OPENTELEMETRY_LIB STREQUAL "http_client_curl") set(_OPENTELEMETRY_STATIC_LIBRARY - "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}" + "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}" ) elseif(_OPENTELEMETRY_LIB STREQUAL "ostream_span_exporter") set(_OPENTELEMETRY_STATIC_LIBRARY diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 8662c319c99..cd1a8967eeb 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -67,6 +67,7 @@ add_arrow_test(utility-test tdigest_test.cc test_common.cc time_test.cc + tracing_test.cc trie_test.cc uri_test.cc utf8_util_test.cc diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index b7ad783866c..d9da841d3aa 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -37,6 +37,11 @@ namespace internal { Executor::~Executor() = default; +// By default we do nothing here. Subclasses that expect to be allocated +// with static storage duration should override this and ensure any threads respect the +// lifetime of these resources. +void Executor::KeepAlive(std::shared_ptr resource) {} + namespace { struct Task { @@ -200,6 +205,8 @@ struct ThreadPool::State { // Are we shutting down? 
bool please_shutdown_ = false; bool quick_shutdown_ = false; + + std::vector> kept_alive_resources_; }; // The worker loop is an independent function so that it can keep running @@ -417,19 +424,11 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto StopCallback&& stop_callback) { { ProtectAgainstFork(); - std::lock_guard lock(state_->mutex_); - if (state_->please_shutdown_) { - return Status::Invalid("operation forbidden during or after shutdown"); - } - CollectFinishedWorkersUnlocked(); - state_->tasks_queued_or_running_++; - if (static_cast(state_->workers_.size()) < state_->tasks_queued_or_running_ && - state_->desired_capacity_ > static_cast(state_->workers_.size())) { - // We can still spin up more workers so spin up a new worker - LaunchWorkersUnlocked(/*threads=*/1); - } #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it + // This task-wrapping needs to be done before we grab the mutex because the + // first call to OT (whatever that happens to be) will attempt to grab this mutex + // when calling KeepAlive to keep the OT infrastructure alive. 
struct { void operator()() { auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); @@ -441,6 +440,17 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; task = std::move(wrapper); #endif + std::lock_guard lock(state_->mutex_); + if (state_->please_shutdown_) { + return Status::Invalid("operation forbidden during or after shutdown"); + } + CollectFinishedWorkersUnlocked(); + state_->tasks_queued_or_running_++; + if (static_cast(state_->workers_.size()) < state_->tasks_queued_or_running_ && + state_->desired_capacity_ > static_cast(state_->workers_.size())) { + // We can still spin up more workers so spin up a new worker + LaunchWorkersUnlocked(/*threads=*/1); + } state_->pending_tasks_.push_back( {std::move(task), std::move(stop_token), std::move(stop_callback)}); } @@ -448,6 +458,12 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto return Status::OK(); } +void ThreadPool::KeepAlive(std::shared_ptr resource) { + // Seems unlikely but we might as well guard against concurrent calls to KeepAlive + std::lock_guard lk(state_->mutex_); + state_->kept_alive_resources_.push_back(std::move(resource)); +} + Result> ThreadPool::Make(int threads) { auto pool = std::shared_ptr(new ThreadPool()); RETURN_NOT_OK(pool->SetCapacity(threads)); diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 4b7002a6736..cb23b22fcbd 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -202,6 +202,27 @@ class ARROW_EXPORT Executor { // Executor. Returns false if this Executor does not support this property. 
 virtual bool OwnsThisThread() { return false; } + /// \brief An interface to represent something with a custom destructor + /// + /// \see KeepAlive + class ARROW_EXPORT Resource { + public: + virtual ~Resource() = default; + }; + + /// \brief Keep a resource alive until all executor threads have terminated + /// + /// Executors may have static storage duration. In particular, the CPU and I/O + /// executors are currently implemented this way. These threads may access other + /// objects with static storage duration such as the OpenTelemetry runtime context, + /// the default memory pool, or other static executors. + /// + /// The order in which these objects are destroyed is difficult to control. In order + /// to ensure those objects remain alive until all threads have finished, those objects + /// should be wrapped in a Resource object and passed into this method. The given + /// shared_ptr will be kept alive until all threads have finished their worker loops. + virtual void KeepAlive(std::shared_ptr resource); + protected: ARROW_DISALLOW_COPY_AND_ASSIGN(Executor); @@ -434,6 +455,8 @@ class ARROW_EXPORT ThreadPool : public Executor { // This is useful for sequencing tests void WaitForIdle(); + void KeepAlive(std::shared_ptr resource) override; + struct State; protected: diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 5dbb0f345e4..904a1fd76a8 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -16,6 +16,8 @@ // under the License. 
#include "arrow/util/tracing_internal.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/tracing.h" #include @@ -134,7 +136,23 @@ std::unique_ptr InitializeExporter() { return nullptr; } +struct StorageSingleton : public Executor::Resource { + StorageSingleton() + : storage_(otel::context::RuntimeContext::GetConstRuntimeContextStorage()) {} + nostd::shared_ptr storage_; +}; + +std::shared_ptr GetStorageSingleton() { + static std::shared_ptr storage_singleton = + std::make_shared(); + return storage_singleton; +} + nostd::shared_ptr InitializeSdkTracerProvider() { + // Bind the lifetime of the OT runtime context to the CPU and I/O thread + // pools. This will keep OT alive until all thread tasks have finished. + internal::GetCpuThreadPool()->KeepAlive(GetStorageSingleton()); + io::default_io_context().executor()->KeepAlive(GetStorageSingleton()); auto exporter = InitializeExporter(); if (exporter) { sdktrace::BatchSpanProcessorOptions options; diff --git a/cpp/src/arrow/util/tracing_test.cc b/cpp/src/arrow/util/tracing_test.cc new file mode 100644 index 00000000000..08d737ddfd5 --- /dev/null +++ b/cpp/src/arrow/util/tracing_test.cc @@ -0,0 +1,53 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/thread_pool.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { +namespace util { +namespace tracing { + +#ifdef ARROW_WITH_OPENTELEMETRY + +// This test is a regression test for ARROW-15604. OT has some static state that +// can be initialized after the CPU and I/O thread pools. We need to make +// sure OT's state persists for the lifetime of any threads in those thread +// pools. +// +// This test checks this by spawning a thread task that will outlive the unit +// test's lifetime, and so teardown of static state should begin before this +// thread finishes. +TEST(Tracing, OtLifetime) { + ASSERT_OK(::arrow::internal::GetCpuThreadPool()->Spawn([] { + // This thread will outlive the main test thread. + Span span; + START_SPAN(span, "Test"); + SleepFor(0.1); + })); +} + +#endif + +} // namespace tracing +} // namespace util +} // namespace arrow diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 4e90f4c9682..7acecd1a530 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -66,8 +66,8 @@ ARROW_MIMALLOC_BUILD_VERSION=v1.7.3 ARROW_MIMALLOC_BUILD_SHA256_CHECKSUM=0f987bda01ca9df87ec90e9d98c63fa893ee61f3cca565e5ca5ed744fdcc5109 ARROW_NLOHMANN_JSON_BUILD_VERSION=v3.10.2 ARROW_NLOHMANN_JSON_BUILD_SHA256_CHECKSUM=081ed0f9f89805c2d96335c3acfa993b39a0a5b4b4cef7edb68dd2210a13458c -ARROW_OPENTELEMETRY_BUILD_VERSION=v1.2.0 -ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=7a6420f9e4fa44b81a5b06e30e5e116da71decc9086e5cc4f126e1efc8a397c2 +ARROW_OPENTELEMETRY_BUILD_VERSION=v1.3.0 +ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=6a4c43b9c9f753841ebc0fe2717325271f02e2a1d5ddd0b52735c35243629ab3 ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.11.0 
ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=985367f8905e91018e636cbf0d83ab3f834b665c4f5899a27d10cae9657710e2 ARROW_ORC_BUILD_VERSION=1.7.3