From 4dce506d60f32330bfb9f723edb7713b443d32f8 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Thu, 1 Jul 2021 22:23:24 +0200 Subject: [PATCH] ARROW-13244: [C++] Add facility to get current thread id as uint64 Followup to https://github.com/apache/arrow/pull/10632 --- cpp/src/arrow/CMakeLists.txt | 1 + cpp/src/arrow/io/stdio.cc | 95 ++++++++++++++++++++++++++++ cpp/src/arrow/io/stdio.h | 82 ++++++++++++++++++++++++ cpp/src/arrow/ipc/file_to_stream.cc | 3 +- cpp/src/arrow/ipc/stream_to_file.cc | 3 +- cpp/src/arrow/util/async_generator.h | 37 +++-------- cpp/src/arrow/util/io_util.cc | 87 +++++-------------------- cpp/src/arrow/util/io_util.h | 68 ++------------------ 8 files changed, 214 insertions(+), 162 deletions(-) create mode 100644 cpp/src/arrow/io/stdio.cc create mode 100644 cpp/src/arrow/io/stdio.h diff --git a/cpp/src/arrow/CMakeLists.txt b/cpp/src/arrow/CMakeLists.txt index 56e734226b0..634d202623f 100644 --- a/cpp/src/arrow/CMakeLists.txt +++ b/cpp/src/arrow/CMakeLists.txt @@ -188,6 +188,7 @@ set(ARROW_SRCS io/interfaces.cc io/memory.cc io/slow.cc + io/stdio.cc io/transform.cc util/basic_decimal.cc util/bit_block_counter.cc diff --git a/cpp/src/arrow/io/stdio.cc b/cpp/src/arrow/io/stdio.cc new file mode 100644 index 00000000000..7ef4843a224 --- /dev/null +++ b/cpp/src/arrow/io/stdio.cc @@ -0,0 +1,95 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "arrow/io/stdio.h" + +#include + +#include "arrow/buffer.h" +#include "arrow/result.h" + +namespace arrow { +namespace io { + +// +// StdoutStream implementation +// + +StdoutStream::StdoutStream() : pos_(0) { set_mode(FileMode::WRITE); } + +Status StdoutStream::Close() { return Status::OK(); } + +bool StdoutStream::closed() const { return false; } + +Result StdoutStream::Tell() const { return pos_; } + +Status StdoutStream::Write(const void* data, int64_t nbytes) { + pos_ += nbytes; + std::cout.write(reinterpret_cast(data), nbytes); + return Status::OK(); +} + +// +// StderrStream implementation +// + +StderrStream::StderrStream() : pos_(0) { set_mode(FileMode::WRITE); } + +Status StderrStream::Close() { return Status::OK(); } + +bool StderrStream::closed() const { return false; } + +Result StderrStream::Tell() const { return pos_; } + +Status StderrStream::Write(const void* data, int64_t nbytes) { + pos_ += nbytes; + std::cerr.write(reinterpret_cast(data), nbytes); + return Status::OK(); +} + +// +// StdinStream implementation +// + +StdinStream::StdinStream() : pos_(0) { set_mode(FileMode::READ); } + +Status StdinStream::Close() { return Status::OK(); } + +bool StdinStream::closed() const { return false; } + +Result StdinStream::Tell() const { return pos_; } + +Result StdinStream::Read(int64_t nbytes, void* out) { + std::cin.read(reinterpret_cast(out), nbytes); + if (std::cin) { + pos_ += nbytes; + return nbytes; + } else { + return 0; + } +} + +Result> StdinStream::Read(int64_t nbytes) { + ARROW_ASSIGN_OR_RAISE(auto buffer, 
AllocateResizableBuffer(nbytes)); + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); + ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false)); + buffer->ZeroPadding(); + return std::move(buffer); +} + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/io/stdio.h b/cpp/src/arrow/io/stdio.h new file mode 100644 index 00000000000..9484ac77124 --- /dev/null +++ b/cpp/src/arrow/io/stdio.h @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include + +#include "arrow/io/interfaces.h" +#include "arrow/util/visibility.h" + +namespace arrow { +namespace io { + +// Output stream that just writes to stdout. +class ARROW_EXPORT StdoutStream : public OutputStream { + public: + StdoutStream(); + ~StdoutStream() override {} + + Status Close() override; + bool closed() const override; + + Result Tell() const override; + + Status Write(const void* data, int64_t nbytes) override; + + private: + int64_t pos_; +}; + +// Output stream that just writes to stderr. 
+class ARROW_EXPORT StderrStream : public OutputStream { + public: + StderrStream(); + ~StderrStream() override {} + + Status Close() override; + bool closed() const override; + + Result Tell() const override; + + Status Write(const void* data, int64_t nbytes) override; + + private: + int64_t pos_; +}; + +// Input stream that just reads from stdin. +class ARROW_EXPORT StdinStream : public InputStream { + public: + StdinStream(); + ~StdinStream() override {} + + Status Close() override; + bool closed() const override; + + Result Tell() const override; + + Result Read(int64_t nbytes, void* out) override; + + Result> Read(int64_t nbytes) override; + + private: + int64_t pos_; +}; + +} // namespace io +} // namespace arrow diff --git a/cpp/src/arrow/ipc/file_to_stream.cc b/cpp/src/arrow/ipc/file_to_stream.cc index c15eb6de21f..6ae6a4fa0c8 100644 --- a/cpp/src/arrow/ipc/file_to_stream.cc +++ b/cpp/src/arrow/ipc/file_to_stream.cc @@ -20,13 +20,12 @@ #include #include "arrow/io/file.h" +#include "arrow/io/stdio.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/result.h" #include "arrow/status.h" -#include "arrow/util/io_util.h" - namespace arrow { class RecordBatch; diff --git a/cpp/src/arrow/ipc/stream_to_file.cc b/cpp/src/arrow/ipc/stream_to_file.cc index 3a2a7fb49fe..40288b687cf 100644 --- a/cpp/src/arrow/ipc/stream_to_file.cc +++ b/cpp/src/arrow/ipc/stream_to_file.cc @@ -19,13 +19,12 @@ #include #include +#include "arrow/io/stdio.h" #include "arrow/ipc/reader.h" #include "arrow/ipc/writer.h" #include "arrow/record_batch.h" #include "arrow/status.h" -#include "arrow/util/io_util.h" - namespace arrow { namespace ipc { diff --git a/cpp/src/arrow/util/async_generator.h b/cpp/src/arrow/util/async_generator.h index 8c130c66193..c99bd865e45 100644 --- a/cpp/src/arrow/util/async_generator.h +++ b/cpp/src/arrow/util/async_generator.h @@ -21,11 +21,12 @@ #include #include #include +#include #include -#include #include "arrow/util/functional.h" 
#include "arrow/util/future.h" +#include "arrow/util/io_util.h" #include "arrow/util/iterator.h" #include "arrow/util/mutex.h" #include "arrow/util/optional.h" @@ -1247,6 +1248,8 @@ class BackgroundGenerator { } protected: + static constexpr uint64_t kUnlikelyThreadId{std::numeric_limits::max()}; + struct State { State(internal::Executor* io_executor, Iterator it, int max_q, int q_restart) : io_executor(io_executor), @@ -1255,9 +1258,7 @@ class BackgroundGenerator { it(std::move(it)), reading(false), finished(false), - should_shutdown(false) { - SetWorkerThreadId({}); // default-initialized thread id - } + should_shutdown(false) {} void ClearQueue() { while (!queue.empty()) { @@ -1316,28 +1317,11 @@ class BackgroundGenerator { return next; } - void SetWorkerThreadId(const std::thread::id tid) { - uint64_t equiv{0}; - // std::thread::id is trivially copyable as per C++ spec, - // so type punning as a uint64_t should work - static_assert(sizeof(std::thread::id) <= sizeof(uint64_t), - "std::thread::id can't fit into uint64_t"); - memcpy(&equiv, reinterpret_cast(&tid), sizeof(tid)); - worker_thread_id.store(equiv); - } - - std::thread::id GetWorkerThreadId() { - const auto equiv = worker_thread_id.load(); - std::thread::id tid; - memcpy(reinterpret_cast(&tid), &equiv, sizeof(tid)); - return tid; - } - internal::Executor* io_executor; const int max_q; const int q_restart; Iterator it; - std::atomic worker_thread_id; + std::atomic worker_thread_id{kUnlikelyThreadId}; // If true, the task is actively pumping items from the queue and does not need a // restart @@ -1346,8 +1330,7 @@ class BackgroundGenerator { bool finished; // Signal to the background task to end early because consumers have given up on it bool should_shutdown; - // If the queue is empty then the consumer will create a waiting future and wait for - // it + // If the queue is empty, the consumer will create a waiting future and wait for it std::queue> queue; util::optional> waiting_future; // Every 
background task is given a future to complete when it is entirely finished @@ -1365,7 +1348,7 @@ class BackgroundGenerator { /// /// It's a deadlock if we enter cleanup from /// the worker thread but it can happen if the consumer doesn't transfer away - assert(state->GetWorkerThreadId() != std::this_thread::get_id()); + assert(state->worker_thread_id.load() != ::arrow::internal::GetThreadId()); Future<> finish_fut; { auto lock = state->mutex.Lock(); @@ -1384,9 +1367,9 @@ class BackgroundGenerator { }; static void WorkerTask(std::shared_ptr state) { + state->worker_thread_id.store(::arrow::internal::GetThreadId()); // We need to capture the state to read while outside the mutex bool reading = true; - state->SetWorkerThreadId(std::this_thread::get_id()); while (reading) { auto next = state->it.Next(); // Need to capture state->waiting_future inside the mutex to mark finished outside @@ -1438,7 +1421,7 @@ class BackgroundGenerator { // reference it. We can safely transition to idle now. task_finished = state->task_finished; state->task_finished = Future<>(); - state->SetWorkerThreadId({}); // default-initialized thread id + state->worker_thread_id.store(kUnlikelyThreadId); } task_finished.MarkFinished(); } diff --git a/cpp/src/arrow/util/io_util.cc b/cpp/src/arrow/util/io_util.cc index 552417e5a13..f6566ea7e36 100644 --- a/cpp/src/arrow/util/io_util.cc +++ b/cpp/src/arrow/util/io_util.cc @@ -41,6 +41,7 @@ #include #include #include +#include #include #include @@ -99,76 +100,6 @@ namespace arrow { using internal::checked_cast; -namespace io { - -// -// StdoutStream implementation -// - -StdoutStream::StdoutStream() : pos_(0) { set_mode(FileMode::WRITE); } - -Status StdoutStream::Close() { return Status::OK(); } - -bool StdoutStream::closed() const { return false; } - -Result StdoutStream::Tell() const { return pos_; } - -Status StdoutStream::Write(const void* data, int64_t nbytes) { - pos_ += nbytes; - std::cout.write(reinterpret_cast(data), nbytes); - return 
Status::OK(); -} - -// -// StderrStream implementation -// - -StderrStream::StderrStream() : pos_(0) { set_mode(FileMode::WRITE); } - -Status StderrStream::Close() { return Status::OK(); } - -bool StderrStream::closed() const { return false; } - -Result StderrStream::Tell() const { return pos_; } - -Status StderrStream::Write(const void* data, int64_t nbytes) { - pos_ += nbytes; - std::cerr.write(reinterpret_cast(data), nbytes); - return Status::OK(); -} - -// -// StdinStream implementation -// - -StdinStream::StdinStream() : pos_(0) { set_mode(FileMode::READ); } - -Status StdinStream::Close() { return Status::OK(); } - -bool StdinStream::closed() const { return false; } - -Result StdinStream::Tell() const { return pos_; } - -Result StdinStream::Read(int64_t nbytes, void* out) { - std::cin.read(reinterpret_cast(out), nbytes); - if (std::cin) { - pos_ += nbytes; - return nbytes; - } else { - return 0; - } -} - -Result> StdinStream::Read(int64_t nbytes) { - ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateResizableBuffer(nbytes)); - ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buffer->mutable_data())); - ARROW_RETURN_NOT_OK(buffer->Resize(bytes_read, false)); - buffer->ZeroPadding(); - return std::move(buffer); -} - -} // namespace io - namespace internal { namespace { @@ -1734,5 +1665,21 @@ int64_t GetRandomSeed() { return static_cast(seed_gen()); } +uint64_t GetThreadId() { + uint64_t equiv{0}; + // std::thread::id is trivially copyable as per C++ spec, + // so type punning as a uint64_t should work + static_assert(sizeof(std::thread::id) <= sizeof(uint64_t), + "std::thread::id can't fit into uint64_t"); + const auto tid = std::this_thread::get_id(); + memcpy(&equiv, reinterpret_cast(&tid), sizeof(tid)); + return equiv; +} + +uint64_t GetOptionalThreadId() { + auto tid = GetThreadId(); + return (tid == 0) ? 
tid - 1 : tid; +} + } // namespace internal } // namespace arrow diff --git a/cpp/src/arrow/util/io_util.h b/cpp/src/arrow/util/io_util.h index 38bcdd4b41f..4255dd37105 100644 --- a/cpp/src/arrow/util/io_util.h +++ b/cpp/src/arrow/util/io_util.h @@ -30,73 +30,12 @@ #include // Needed for struct sigaction #endif -#include "arrow/io/interfaces.h" #include "arrow/status.h" #include "arrow/type_fwd.h" #include "arrow/util/macros.h" #include "arrow/util/windows_fixup.h" namespace arrow { - -class Buffer; - -namespace io { - -// Output stream that just writes to stdout. -class ARROW_EXPORT StdoutStream : public OutputStream { - public: - StdoutStream(); - ~StdoutStream() override {} - - Status Close() override; - bool closed() const override; - - Result Tell() const override; - - Status Write(const void* data, int64_t nbytes) override; - - private: - int64_t pos_; -}; - -// Output stream that just writes to stderr. -class ARROW_EXPORT StderrStream : public OutputStream { - public: - StderrStream(); - ~StderrStream() override {} - - Status Close() override; - bool closed() const override; - - Result Tell() const override; - - Status Write(const void* data, int64_t nbytes) override; - - private: - int64_t pos_; -}; - -// Input stream that just reads from stdin. -class ARROW_EXPORT StdinStream : public InputStream { - public: - StdinStream(); - ~StdinStream() override {} - - Status Close() override; - bool closed() const override; - - Result Tell() const override; - - Result Read(int64_t nbytes, void* out) override; - - Result> Read(int64_t nbytes) override; - - private: - int64_t pos_; -}; - -} // namespace io - namespace internal { // NOTE: 8-bit path strings on Windows are encoded using UTF-8. 
@@ -399,5 +338,12 @@ Status SendSignalToThread(int signum, uint64_t thread_id); ARROW_EXPORT int64_t GetRandomSeed(); +/// \brief Get the current thread id +/// +/// In addition to having the same properties as std::thread::id, the returned value +/// is a regular integer value, which is more convenient than an opaque type. +ARROW_EXPORT +uint64_t GetThreadId(); + } // namespace internal } // namespace arrow