From 43e5d47a18fba14cc717b9f22377da03e5674dd1 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 11 Feb 2022 17:05:24 -1000 Subject: [PATCH 1/9] ARROW-15604: Threads may end after the OT storage has been destroyed. This results in use-after-free. This commit binds the OT storage lifetime to our thread pool threads so that the storage will not be destroyed until all threads have ended. --- cpp/src/arrow/util/CMakeLists.txt | 1 + cpp/src/arrow/util/thread_pool.cc | 8 ++++ cpp/src/arrow/util/tracing_internal.cc | 15 ++++++++ cpp/src/arrow/util/tracing_internal.h | 11 ++++++ cpp/src/arrow/util/tracing_test.cc | 52 ++++++++++++++++++++++++++ 5 files changed, 87 insertions(+) create mode 100644 cpp/src/arrow/util/tracing_test.cc diff --git a/cpp/src/arrow/util/CMakeLists.txt b/cpp/src/arrow/util/CMakeLists.txt index 8662c319c99..cd1a8967eeb 100644 --- a/cpp/src/arrow/util/CMakeLists.txt +++ b/cpp/src/arrow/util/CMakeLists.txt @@ -67,6 +67,7 @@ add_arrow_test(utility-test tdigest_test.cc test_common.cc time_test.cc + tracing_test.cc trie_test.cc uri_test.cc utf8_util_test.cc diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index b7ad783866c..9e7331cf59d 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -29,6 +29,7 @@ #include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/mutex.h" +#include "arrow/util/tracing_internal.h" #include "arrow/util/tracing_internal.h" @@ -206,6 +207,13 @@ struct ThreadPool::State { // after the ThreadPool is destroyed. static void WorkerLoop(std::shared_ptr state, std::list::iterator it) { +#ifdef ARROW_WITH_OPENTELEMETRY + // The main thread may exit and start shutting down static state before + // this thread ends. This means that any calls to OpenTelemetry's static + // state will potentially access freed memory. By grabbing a handle we + // keep OpenTelemetry's static state alive until this thread ends. + internal::tracing::OtHandle handle = internal::tracing::Attach(); +#endif std::unique_lock lock(state->mutex_); // Since we hold the lock, `it` now points to the correct thread object diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 5dbb0f345e4..2b37e792875 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -176,8 +176,23 @@ otel::trace::TracerProvider* GetTracerProvider() { static nostd::shared_ptr provider = InitializeTracing(); return provider.get(); } + +struct StorageSingleton { + StorageSingleton() : storage_(otel::context::GetDefaultStorage()) { + otel::context::RuntimeContext::SetRuntimeContextStorage(storage_); + } + nostd::shared_ptr storage_; +}; + } // namespace +static StorageSingleton storage_singleton; + +OtHandle::OtHandle(nostd::shared_ptr handle) + : handle_(std::move(handle)){}; + +OtHandle Attach() { return OtHandle(storage_singleton.storage_); } + opentelemetry::trace::Tracer* GetTracer() { static nostd::shared_ptr tracer = GetTracerProvider()->GetTracer("arrow"); diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 77320eb2aec..5a3354aee65 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -106,6 +106,17 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } +class OtHandle { + public: + OtHandle(opentelemetry::nostd::shared_ptr + handle); + + private: + opentelemetry::nostd::shared_ptr handle_; +}; + +OtHandle Attach(); + class SpanImpl { public: opentelemetry::nostd::shared_ptr span; diff --git a/cpp/src/arrow/util/tracing_test.cc b/cpp/src/arrow/util/tracing_test.cc new file mode 100644 index 00000000000..e5189e3bf16 --- /dev/null +++ b/cpp/src/arrow/util/tracing_test.cc @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include + +#include "arrow/testing/gtest_util.h" +#include "arrow/util/tracing_internal.h" + +namespace arrow { +namespace util { +namespace tracing { + +#ifdef ARROW_WITH_OPENTELEMETRY + +TEST(Tracing, Attach) { + std::thread task([] { + // If this next line is commented out then the test will emit tsan + // errors because, when the main thread exits, the OT infrastructure + // will be torn down. By grabbing a handle we tie the lifetime of OT + // to the thread so that OT will not shutdown until after the thread. + auto handle = ::arrow::internal::tracing::Attach(); + Span span; + START_SPAN(span, "Test"); + SleepFor(0.1); + }); + // We don't detach our threads in Arrow but we don't control their + // lifetime since they are tied to static thread pools. So detaching + // here allows us to simulate that. + task.detach(); +} + +#endif + +} // namespace tracing +} // namespace util +} // namespace arrow \ No newline at end of file From 7433918a48677bfda78a8b02bceceb55829e754a Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 11 Feb 2022 17:06:47 -1000 Subject: [PATCH 2/9] Revert "MINOR: [CI][Python] Disable OpenTelemetry in the ubuntu-python-sdist-test build" This reverts commit d59dbbc36c7950e58332d081d47c2d43ea898215. --- docker-compose.yml | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 8d5dfcd717f..665aeef7a12 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -776,8 +776,6 @@ services: shm_size: *shm-size environment: <<: *ccache - # Bundled build of OpenTelemetry needs a git client - ARROW_WITH_OPENTELEMETRY: "OFF" PYARROW_VERSION: ${PYARROW_VERSION:-} volumes: *ubuntu-volumes command: > From d75f1b6cf3ceffab7a25f66e73fd4915fbf8cd5f Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Fri, 11 Feb 2022 17:13:09 -1000 Subject: [PATCH 3/9] ARROW-15604: Lint --- cpp/src/arrow/util/tracing_internal.cc | 2 +- cpp/src/arrow/util/tracing_test.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 2b37e792875..f3f76366359 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -189,7 +189,7 @@ struct StorageSingleton { static StorageSingleton storage_singleton; OtHandle::OtHandle(nostd::shared_ptr handle) - : handle_(std::move(handle)){}; + : handle_(std::move(handle)) {} OtHandle Attach() { return OtHandle(storage_singleton.storage_); } diff --git a/cpp/src/arrow/util/tracing_test.cc b/cpp/src/arrow/util/tracing_test.cc index e5189e3bf16..faaf88e45ec 100644 --- a/cpp/src/arrow/util/tracing_test.cc +++ b/cpp/src/arrow/util/tracing_test.cc @@ -49,4 +49,4 @@ TEST(Tracing, Attach) { } // namespace tracing } // namespace util -} // namespace arrow \ No newline at end of file +} // namespace arrow From 3e67623f6332fd4de97969feb8435d52b74ca994 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Feb 2022 08:55:22 -1000 Subject: [PATCH 4/9] ARROW-15604: Addressing PR comments --- cpp/src/arrow/util/thread_pool.cc | 21 ++++++++++------ cpp/src/arrow/util/thread_pool.h | 23 +++++++++++++++++ cpp/src/arrow/util/tracing_internal.cc | 34 ++++++++++++++------------ cpp/src/arrow/util/tracing_test.cc | 15 +++--------- 4 files changed, 59 insertions(+), 34 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 9e7331cf59d..37db045244e 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -29,7 +29,6 @@ #include "arrow/util/io_util.h" #include "arrow/util/logging.h" #include "arrow/util/mutex.h" -#include "arrow/util/tracing_internal.h" #include "arrow/util/tracing_internal.h" @@ -38,6 +37,11 @@ namespace internal { Executor::~Executor() = default; +// By default we do nothing here. Subclasses that expect to be allocated +// with static storage duration should override this and ensure any threads respect the +// lifetime of these resources. +void Executor::KeepAlive(std::shared_ptr resource) {} + namespace { struct Task { @@ -201,19 +205,14 @@ struct ThreadPool::State { // Are we shutting down? bool please_shutdown_ = false; bool quick_shutdown_ = false; + + std::vector> kept_alive_resources_; }; // The worker loop is an independent function so that it can keep running // after the ThreadPool is destroyed. static void WorkerLoop(std::shared_ptr state, std::list::iterator it) { -#ifdef ARROW_WITH_OPENTELEMETRY - // The main thread may exit and start shutting down static state before - // this thread ends. This means that any calls to OpenTelemetry's static - // state will potentially access freed memory. By grabbing a handle we - // keep OpenTelemetry's static state alive until this thread ends. - internal::tracing::OtHandle handle = internal::tracing::Attach(); -#endif std::unique_lock lock(state->mutex_); // Since we hold the lock, `it` now points to the correct thread object @@ -456,6 +455,12 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto return Status::OK(); } +void ThreadPool::KeepAlive(std::shared_ptr resource) { + // Seems unlikely but we might as well guard against concurrent calls to KeepAlive + std::lock_guard lk(state_->mutex_); + state_->kept_alive_resources_.push_back(std::move(resource)); +} + Result> ThreadPool::Make(int threads) { auto pool = std::shared_ptr(new ThreadPool()); RETURN_NOT_OK(pool->SetCapacity(threads)); diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 4b7002a6736..44a8df4078b 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -202,6 +202,27 @@ class ARROW_EXPORT Executor { // Executor. Returns false if this Executor does not support this property. virtual bool OwnsThisThread() { return false; } + /// \brief An interface to represent something with a custom destructor + /// + /// \see KeepAlive + class Resource { + public: + virtual ~Resource() = default; + }; + + /// \brief Keeps a resource alive until all executor threads have terminated + /// + /// Executors may have static storage duration. In particular, the CPU and I/O + /// executors are currently implemented this way. These threads may access other + /// objects with static storage duration such as the OpenTelemetry runtime context + /// the default memory pool, or other static executors. + /// + /// The order in which these objects are destroyed is difficult to control. In order + /// to ensure those objects remain alive until all threads have finished those objects + /// should be wrapped in a Resource object and passed into this method. The given + /// shared_ptr will be kept alive until all threads have finished their worker loops. + virtual void KeepAlive(std::shared_ptr resource); + protected: ARROW_DISALLOW_COPY_AND_ASSIGN(Executor); @@ -434,6 +455,8 @@ class ARROW_EXPORT ThreadPool : public Executor { // This is useful for sequencing tests void WaitForIdle(); + void KeepAlive(std::shared_ptr resource) override; + struct State; protected: diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index f3f76366359..254b75bf102 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -16,6 +16,8 @@ // under the License. #include "arrow/util/tracing_internal.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/tracing.h" #include @@ -134,7 +136,24 @@ std::unique_ptr InitializeExporter() { return nullptr; } +struct StorageSingleton : public Executor::Resource { + StorageSingleton() : storage_(otel::context::GetDefaultStorage()) { + otel::context::RuntimeContext::SetRuntimeContextStorage(storage_); + } + nostd::shared_ptr storage_; +}; + +std::shared_ptr GetStorageSingleton() { + static std::shared_ptr storage_singleton = + std::make_shared(); + return storage_singleton; +} + nostd::shared_ptr InitializeSdkTracerProvider() { + // Bind the lifetime of the OT runtime context to the CPU and I/O thread + // pools. This will keep OT alive until all thread tasks have finished. + internal::GetCpuThreadPool()->KeepAlive(GetStorageSingleton()); + io::default_io_context().executor()->KeepAlive(GetStorageSingleton()); auto exporter = InitializeExporter(); if (exporter) { sdktrace::BatchSpanProcessorOptions options; @@ -176,23 +195,8 @@ otel::trace::TracerProvider* GetTracerProvider() { static nostd::shared_ptr provider = InitializeTracing(); return provider.get(); } - -struct StorageSingleton { - StorageSingleton() : storage_(otel::context::GetDefaultStorage()) { - otel::context::RuntimeContext::SetRuntimeContextStorage(storage_); - } - nostd::shared_ptr storage_; -}; - } // namespace -static StorageSingleton storage_singleton; - -OtHandle::OtHandle(nostd::shared_ptr handle) - : handle_(std::move(handle)) {} - -OtHandle Attach() { return OtHandle(storage_singleton.storage_); } - opentelemetry::trace::Tracer* GetTracer() { static nostd::shared_ptr tracer = GetTracerProvider()->GetTracer("arrow"); diff --git a/cpp/src/arrow/util/tracing_test.cc b/cpp/src/arrow/util/tracing_test.cc index faaf88e45ec..cdd15e6d007 100644 --- a/cpp/src/arrow/util/tracing_test.cc +++ b/cpp/src/arrow/util/tracing_test.cc @@ -20,6 +20,7 @@ #include #include "arrow/testing/gtest_util.h" +#include "arrow/util/thread_pool.h" #include "arrow/util/tracing_internal.h" namespace arrow { @@ -29,20 +30,12 @@ namespace tracing { #ifdef ARROW_WITH_OPENTELEMETRY TEST(Tracing, Attach) { - std::thread task([] { - // If this next line is commented out then the test will emit tsan - // errors because, when the main thread exits, the OT infrastructure - // will be torn down. By grabbing a handle we tie the lifetime of OT - // to the thread so that OT will not shutdown until after the thread. - auto handle = ::arrow::internal::tracing::Attach(); + ASSERT_OK(::arrow::internal::GetCpuThreadPool()->Spawn([] { + // This thread will outlive the main test thread. Span span; START_SPAN(span, "Test"); SleepFor(0.1); - }); - // We don't detach our threads in Arrow but we don't control their - // lifetime since they are tied to static thread pools. So detaching - // here allows us to simulate that. - task.detach(); + })); } #endif From 3d668e7b288a6e5f34bb8cfddcdd47b6ac41746e Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Feb 2022 08:59:20 -1000 Subject: [PATCH 5/9] ARROW-15604: Added some explanation to the test --- cpp/src/arrow/util/tracing_test.cc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/util/tracing_test.cc b/cpp/src/arrow/util/tracing_test.cc index cdd15e6d007..08d737ddfd5 100644 --- a/cpp/src/arrow/util/tracing_test.cc +++ b/cpp/src/arrow/util/tracing_test.cc @@ -29,7 +29,15 @@ namespace tracing { #ifdef ARROW_WITH_OPENTELEMETRY -TEST(Tracing, Attach) { +// This test is a regression for ARROW-15604. OT has some static state that +// can be initialized after the CPU and I/O thread pools. We need to make +// sure OT's state persists for the lifetime of any threads in those thread +// pools. +// +// This test checks this by spawning a thread task that will outlive the unit +// test's lifetime and so teardown of static state should begin before this +// thread finishes. +TEST(Tracing, OtLifetime) { ASSERT_OK(::arrow::internal::GetCpuThreadPool()->Spawn([] { // This thread will outlive the main test thread. Span span; From 76ba74a2eaeff429d4ea993d6cbb90b2db1b91b3 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Feb 2022 09:01:00 -1000 Subject: [PATCH 6/9] ARROW-15604: Removed unintentional docker-compose change --- docker-compose.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index 665aeef7a12..8d5dfcd717f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -776,6 +776,8 @@ services: shm_size: *shm-size environment: <<: *ccache + # Bundled build of OpenTelemetry needs a git client + ARROW_WITH_OPENTELEMETRY: "OFF" PYARROW_VERSION: ${PYARROW_VERSION:-} volumes: *ubuntu-volumes command: > From 9c174011637da67e5b850c5dbabf634413244c07 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Wed, 23 Feb 2022 09:11:21 -1000 Subject: [PATCH 7/9] Revert "ARROW-15604: [C++][CI] Sporadic ThreadSanitizer failure with OpenTracing" This reverts commit 6aa30703a511451fc244f93f6a438e7cf659a030. --- ci/docker/fedora-35-cpp.dockerfile | 2 +- ci/docker/ubuntu-20.04-cpp.dockerfile | 2 +- ci/docker/ubuntu-22.04-cpp.dockerfile | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ci/docker/fedora-35-cpp.dockerfile b/ci/docker/fedora-35-cpp.dockerfile index 469ff59d7fd..b79ceb894bf 100644 --- a/ci/docker/fedora-35-cpp.dockerfile +++ b/ci/docker/fedora-35-cpp.dockerfile @@ -85,7 +85,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ diff --git a/ci/docker/ubuntu-20.04-cpp.dockerfile b/ci/docker/ubuntu-20.04-cpp.dockerfile index dfeb93bedd9..6e811ea2f71 100644 --- a/ci/docker/ubuntu-20.04-cpp.dockerfile +++ b/ci/docker/ubuntu-20.04-cpp.dockerfile @@ -142,7 +142,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ diff --git a/ci/docker/ubuntu-22.04-cpp.dockerfile b/ci/docker/ubuntu-22.04-cpp.dockerfile index 92d802f8760..a7cc5ff38ad 100644 --- a/ci/docker/ubuntu-22.04-cpp.dockerfile +++ b/ci/docker/ubuntu-22.04-cpp.dockerfile @@ -170,7 +170,7 @@ ENV ARROW_BUILD_TESTS=ON \ ARROW_WITH_BROTLI=ON \ ARROW_WITH_BZ2=ON \ ARROW_WITH_LZ4=ON \ - ARROW_WITH_OPENTELEMETRY=OFF \ + ARROW_WITH_OPENTELEMETRY=ON \ ARROW_WITH_SNAPPY=ON \ ARROW_WITH_ZLIB=ON \ ARROW_WITH_ZSTD=ON \ From 4e6a7e7f5f1e844c282bed2f7ab6f1c998ef7791 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Apr 2022 15:10:48 -1000 Subject: [PATCH 8/9] ARROW-15604: Addressing PR feedback. Upgrade OT to 1.3.0. Switch over to grabbing OT's static context instead of resetting it with our own. --- cpp/src/arrow/util/thread_pool.cc | 25 ++++++++++++++----------- cpp/src/arrow/util/thread_pool.h | 4 ++-- cpp/src/arrow/util/tracing_internal.cc | 7 +++---- cpp/src/arrow/util/tracing_internal.h | 11 ----------- cpp/thirdparty/versions.txt | 4 ++-- 5 files changed, 21 insertions(+), 30 deletions(-) diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 37db045244e..d9da841d3aa 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -424,19 +424,11 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto StopCallback&& stop_callback) { { ProtectAgainstFork(); - std::lock_guard lock(state_->mutex_); - if (state_->please_shutdown_) { - return Status::Invalid("operation forbidden during or after shutdown"); - } - CollectFinishedWorkersUnlocked(); - state_->tasks_queued_or_running_++; - if (static_cast(state_->workers_.size()) < state_->tasks_queued_or_running_ && - state_->desired_capacity_ > static_cast(state_->workers_.size())) { - // We can still spin up more workers so spin up a new worker - LaunchWorkersUnlocked(/*threads=*/1); - } #ifdef ARROW_WITH_OPENTELEMETRY // Wrap the task to propagate a parent tracing span to it + // This task-wrapping needs to be done before we grab the mutex because the + // first call to OT (whatever that happens to be) will attempt to grab this mutex + // when calling KeepAlive to keep the OT infrastructure alive. struct { void operator()() { auto scope = ::arrow::internal::tracing::GetTracer()->WithActiveSpan(activeSpan); @@ -448,6 +440,17 @@ Status ThreadPool::SpawnReal(TaskHints hints, FnOnce task, StopToken sto ::arrow::internal::tracing::GetTracer()->GetCurrentSpan()}; task = std::move(wrapper); #endif + std::lock_guard lock(state_->mutex_); + if (state_->please_shutdown_) { + return Status::Invalid("operation forbidden during or after shutdown"); + } + CollectFinishedWorkersUnlocked(); + state_->tasks_queued_or_running_++; + if (static_cast(state_->workers_.size()) < state_->tasks_queued_or_running_ && + state_->desired_capacity_ > static_cast(state_->workers_.size())) { + // We can still spin up more workers so spin up a new worker + LaunchWorkersUnlocked(/*threads=*/1); + } state_->pending_tasks_.push_back( {std::move(task), std::move(stop_token), std::move(stop_callback)}); } diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 44a8df4078b..cb23b22fcbd 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -205,12 +205,12 @@ class ARROW_EXPORT Executor { /// \brief An interface to represent something with a custom destructor /// /// \see KeepAlive - class Resource { + class ARROW_EXPORT Resource { public: virtual ~Resource() = default; }; - /// \brief Keeps a resource alive until all executor threads have terminated + /// \brief Keep a resource alive until all executor threads have terminated /// /// Executors may have static storage duration. In particular, the CPU and I/O /// executors are currently implemented this way. These threads may access other diff --git a/cpp/src/arrow/util/tracing_internal.cc b/cpp/src/arrow/util/tracing_internal.cc index 254b75bf102..904a1fd76a8 100644 --- a/cpp/src/arrow/util/tracing_internal.cc +++ b/cpp/src/arrow/util/tracing_internal.cc @@ -137,10 +137,9 @@ std::unique_ptr InitializeExporter() { } struct StorageSingleton : public Executor::Resource { - StorageSingleton() : storage_(otel::context::GetDefaultStorage()) { - otel::context::RuntimeContext::SetRuntimeContextStorage(storage_); - } - nostd::shared_ptr storage_; + StorageSingleton() + : storage_(otel::context::RuntimeContext::GetConstRuntimeContextStorage()) {} + nostd::shared_ptr storage_; }; std::shared_ptr GetStorageSingleton() { diff --git a/cpp/src/arrow/util/tracing_internal.h b/cpp/src/arrow/util/tracing_internal.h index 5a3354aee65..77320eb2aec 100644 --- a/cpp/src/arrow/util/tracing_internal.h +++ b/cpp/src/arrow/util/tracing_internal.h @@ -106,17 +106,6 @@ AsyncGenerator PropagateSpanThroughAsyncGenerator(AsyncGenerator wrapped) return PropagateSpanThroughAsyncGenerator(std::move(wrapped), std::move(span)); } -class OtHandle { - public: - OtHandle(opentelemetry::nostd::shared_ptr - handle); - - private: - opentelemetry::nostd::shared_ptr handle_; -}; - -OtHandle Attach(); - class SpanImpl { public: opentelemetry::nostd::shared_ptr span; diff --git a/cpp/thirdparty/versions.txt b/cpp/thirdparty/versions.txt index 4e90f4c9682..7acecd1a530 100644 --- a/cpp/thirdparty/versions.txt +++ b/cpp/thirdparty/versions.txt @@ -66,8 +66,8 @@ ARROW_MIMALLOC_BUILD_VERSION=v1.7.3 ARROW_MIMALLOC_BUILD_SHA256_CHECKSUM=0f987bda01ca9df87ec90e9d98c63fa893ee61f3cca565e5ca5ed744fdcc5109 ARROW_NLOHMANN_JSON_BUILD_VERSION=v3.10.2 ARROW_NLOHMANN_JSON_BUILD_SHA256_CHECKSUM=081ed0f9f89805c2d96335c3acfa993b39a0a5b4b4cef7edb68dd2210a13458c -ARROW_OPENTELEMETRY_BUILD_VERSION=v1.2.0 -ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=7a6420f9e4fa44b81a5b06e30e5e116da71decc9086e5cc4f126e1efc8a397c2 +ARROW_OPENTELEMETRY_BUILD_VERSION=v1.3.0 +ARROW_OPENTELEMETRY_BUILD_SHA256_CHECKSUM=6a4c43b9c9f753841ebc0fe2717325271f02e2a1d5ddd0b52735c35243629ab3 ARROW_OPENTELEMETRY_PROTO_BUILD_VERSION=v0.11.0 ARROW_OPENTELEMETRY_PROTO_BUILD_SHA256_CHECKSUM=985367f8905e91018e636cbf0d83ab3f834b665c4f5899a27d10cae9657710e2 ARROW_ORC_BUILD_VERSION=1.7.3 From f9c069d9ed8d547231b784a3e37dced06abcb735 Mon Sep 17 00:00:00 2001 From: Weston Pace Date: Tue, 12 Apr 2022 15:38:23 -1000 Subject: [PATCH 9/9] ARROW-15604: OT 1.3 changed the naming of the curl lib --- cpp/cmake_modules/ThirdpartyToolchain.cmake | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/cmake_modules/ThirdpartyToolchain.cmake b/cpp/cmake_modules/ThirdpartyToolchain.cmake index 3a0353bc7db..9d9376bb8f2 100644 --- a/cpp/cmake_modules/ThirdpartyToolchain.cmake +++ b/cpp/cmake_modules/ThirdpartyToolchain.cmake @@ -4096,7 +4096,7 @@ macro(build_opentelemetry) # N.B. OTel targets and libraries don't follow any consistent naming scheme if(_OPENTELEMETRY_LIB STREQUAL "http_client_curl") set(_OPENTELEMETRY_STATIC_LIBRARY - "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}" + "${OPENTELEMETRY_PREFIX}/lib/${CMAKE_STATIC_LIBRARY_PREFIX}opentelemetry_${_OPENTELEMETRY_LIB}${CMAKE_STATIC_LIBRARY_SUFFIX}" ) elseif(_OPENTELEMETRY_LIB STREQUAL "ostream_span_exporter") set(_OPENTELEMETRY_STATIC_LIBRARY