diff --git a/be/src/common/status.h b/be/src/common/status.h index b447fb58baeefb..3651576e119583 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -67,7 +67,7 @@ class Status { return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); } static Status MinimumReservationUnavailable(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); + return Status(TStatusCode::MINIMUM_RESERVATION_UNAVAILABLE, msg, precise_code, msg2); } static Status Corruption(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { return Status(TStatusCode::CORRUPTION, msg, precise_code, msg2); @@ -126,6 +126,21 @@ class Status { static Status TooManyTasks(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { return Status(TStatusCode::TOO_MANY_TASKS, msg, precise_code, msg2); } + static Status ServiceUnavailable(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::SERVICE_UNAVAILABLE, msg, precise_code, msg2); + } + static Status Uninitialized(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::UNINITIALIZED, msg, precise_code, msg2); + } + static Status Aborted(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::ABORTED, msg, precise_code, msg2); + } bool ok() const { return _state == nullptr; } @@ -137,6 +152,18 @@ class Status { bool is_already_exist() const { return code() == TStatusCode::ALREADY_EXIST; } bool is_io_error() const {return code() == TStatusCode::IO_ERROR; } + /// @return @c true iff the status indicates Uninitialized. + bool is_uninitialized() const { return code() == TStatusCode::UNINITIALIZED; } + + // @return @c true iff the status indicates an Aborted error. 
+ bool is_aborted() const { return code() == TStatusCode::ABORTED; } + + /// @return @c true iff the status indicates an InvalidArgument error. + bool is_invalid_argument() const { return code() == TStatusCode::INVALID_ARGUMENT; } + + // @return @c true iff the status indicates ServiceUnavailable. + bool is_service_unavailable() const { return code() == TStatusCode::SERVICE_UNAVAILABLE; } + // Convert into TStatus. Call this if 'status_container' contains an optional // TStatus field named 'status'. This also sets __isset.status. template diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index c7f1f5c4ee3c48..ed4e1812d142f0 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -92,6 +92,8 @@ set(UTIL_FILES monotime.cpp mutex.cpp condition_variable.cpp + thread.cpp + threadpool.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/barrier.h b/be/src/util/barrier.h new file mode 100644 index 00000000000000..6ee77f4e0b2462 --- /dev/null +++ b/be/src/util/barrier.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+#ifndef DORIS_BE_SRC_UTIL_BARRIER_H
+#define DORIS_BE_SRC_UTIL_BARRIER_H
+
+#include "gutil/macros.h"
+#include "olap/olap_define.h"
+#include "util/condition_variable.h"
+#include "util/mutex.h"
+
+namespace doris {
+
+// Implementation of pthread-style Barriers (reusable across cycles).
+class Barrier {
+public:
+    // Initialize the barrier with the given initial count (must be > 0).
+    explicit Barrier(int count) :
+        _cond(&_mutex),
+        _count(count),
+        _initial_count(count) {
+        DCHECK_GT(count, 0);
+    }
+
+    ~Barrier() {}
+
+    // Wait until all threads have reached the barrier.
+    // Once all threads have reached the barrier, the barrier is reset
+    // to the initial count and the cycle counter advances to release waiters.
+    void wait() {
+        MutexLock l(&_mutex);
+        if (--_count == 0) {
+            _count = _initial_count;
+            _cycle_count++;
+            _cond.notify_all();
+            return;
+        }
+
+        const uint32_t initial_cycle = _cycle_count;
+        while (_cycle_count == initial_cycle) {
+            _cond.wait();
+        }
+    }
+
+private:
+    Mutex _mutex;
+    ConditionVariable _cond;
+    int _count;
+    uint32_t _cycle_count = 0;
+    const int _initial_count;
+    DISALLOW_COPY_AND_ASSIGN(Barrier);
+};
+
+} // namespace doris
+
+#endif //DORIS_BE_SRC_UTIL_BARRIER_H
diff --git a/be/src/util/question b/be/src/util/question
new file mode 100644
index 00000000000000..24aef982f616c2
--- /dev/null
+++ b/be/src/util/question
@@ -0,0 +1,3 @@
+Regression race of lock
+timeout of condition_variable
+deadlock
diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp
new file mode 100644
index 00000000000000..9bec08a45369fd
--- /dev/null
+++ b/be/src/util/thread.cpp
@@ -0,0 +1,387 @@
+#include "thread.h"
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include "common/logging.h"
+#include "gutil/atomicops.h"
+#include "gutil/once.h"
+#include "gutil/dynamic_annotations.h"
+#include "gutil/strings/substitute.h"
+#include "olap/olap_define.h"
+#include "util/mutex.h"
+#include "util/scoped_cleanup.h"
+
+namespace doris {
+
+class ThreadMgr;
+
+__thread Thread* Thread::_tls = NULL;
+
+// Singleton 
instance of ThreadMgr. Only visible in this file, used only by Thread. +// // The Thread class adds a reference to thread_manager while it is supervising a thread so +// // that a race between the end of the process's main thread (and therefore the destruction +// // of thread_manager) and the end of a thread that tries to remove itself from the +// // manager after the destruction can be avoided. +static std::shared_ptr thread_manager; +// +// Controls the single (lazy) initialization of thread_manager. +static GoogleOnceType once = GOOGLE_ONCE_INIT; + +// A singleton class that tracks all live threads, and groups them together for easy +// auditing. Used only by Thread. +class ThreadMgr { +public: + ThreadMgr() + : _threads_started_metric(0), + _threads_running_metric(0) {} + + ~ThreadMgr() { + MutexLock lock(&_lock); + _thread_categories.clear(); + } + + static void set_thread_name(const std::string& name, int64_t tid); + + // not the system TID, since pthread_t is less prone to being recycled. + void add_thread(const pthread_t& pthread_id, const std::string& name, + const std::string& category, int64_t tid); + + // Removes a thread from the supplied category. If the thread has + // already been removed, this is a no-op. + void remove_thread(const pthread_t& pthread_id, const std::string& category); + +private: + + // Container class for any details we want to capture about a thread + // TODO: Add start-time. + // TODO: Track fragment ID. 
+    class ThreadDescriptor {
+    public:
+        ThreadDescriptor() { }
+        ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
+            : _name(std::move(name)),
+              _category(std::move(category)),
+              _thread_id(thread_id) {}
+
+        const std::string& name() const { return _name; }
+        const std::string& category() const { return _category; }
+        int64_t thread_id() const { return _thread_id; }
+
+    private:
+        std::string _name;
+        std::string _category;
+        int64_t _thread_id = 0;
+    };
+
+    // A ThreadCategory is a set of threads that are logically related.
+    // TODO: unordered_map is incompatible with pthread_t, but would be more
+    // efficient here.
+    typedef std::map<pthread_t, ThreadDescriptor> ThreadCategory;
+
+    // All thread categories, keyed on the category name.
+    typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
+
+    // Protects _thread_categories and thread metrics.
+    Mutex _lock;
+
+    // All thread categories that ever contained a thread, even if empty
+    ThreadCategoryMap _thread_categories;
+
+    // Counters to track all-time total number of threads, and the
+    // current number of running threads.
+    uint64_t _threads_started_metric;
+    uint64_t _threads_running_metric;
+
+    DISALLOW_COPY_AND_ASSIGN(ThreadMgr);
+};
+
+void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
+    if (tid == getpid()) {
+        return;
+    }
+    int err = prctl(PR_SET_NAME, name.c_str());
+    if (err < 0 && errno != EPERM) {
+        PLOG(ERROR) << "set_thread_name";
+    }
+}
+
+void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
+                           const std::string& category, int64_t tid) {
+    // These annotations cause TSAN to ignore the synchronization on _lock
+    // without causing the subsequent mutations to be treated as data races
+    // in and of themselves (that's what IGNORE_READS_AND_WRITES does).
+    //
+    // Why do we need them here and in supervise_thread()? TSAN operates by
+    // observing synchronization events and using them to establish "happens
+    // before" relationships between threads. 
Where these relationships are
+    // not built, shared state access constitutes a data race. The
+    // synchronization events here, in remove_thread(), and in
+    // supervise_thread() may cause TSAN to establish a "happens before"
+    // relationship between thread functors, ignoring potential data races.
+    // The annotations prevent this from happening.
+    ANNOTATE_IGNORE_SYNC_BEGIN();
+    ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+    {
+        MutexLock l(&_lock);
+        _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
+        _threads_running_metric++;
+        _threads_started_metric++;
+    }
+    ANNOTATE_IGNORE_SYNC_END();
+    ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) {
+    ANNOTATE_IGNORE_SYNC_BEGIN();
+    ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN();
+    {
+        MutexLock l(&_lock);
+        auto category_it = _thread_categories.find(category);
+        DCHECK(category_it != _thread_categories.end());
+        const bool erased = category_it != _thread_categories.end() && category_it->second.erase(pthread_id) > 0;
+        if (erased) _threads_running_metric--;
+    }
+    ANNOTATE_IGNORE_SYNC_END();
+    ANNOTATE_IGNORE_READS_AND_WRITES_END();
+}
+
+Thread::~Thread() {
+    if (_joinable) {
+        int ret = pthread_detach(_thread);
+        CHECK_EQ(ret, 0);
+    }
+}
+
+void Thread::join() {
+    ThreadJoiner(this).join();
+}
+
+int64_t Thread::tid() const {
+    int64_t t = base::subtle::Acquire_Load(&_tid);
+    if (t != PARENT_WAITING_TID) {
+        return t;
+    }
+    return wait_for_tid();
+}
+
+pthread_t Thread::pthread_id() const {
+    return _thread;
+}
+
+const std::string& Thread::name() const {
+    return _name;
+}
+
+const std::string& Thread::category() const {
+    return _category;
+}
+
+std::string Thread::to_string() const {
+    return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
+}
+
+Thread* Thread::current_thread() {
+    return _tls;
+}
+
+int64_t Thread::unique_thread_id() {
+    return static_cast<int64_t>(pthread_self());
+}
+
+int64_t Thread::current_thread_id() {
+    return 
syscall(SYS_gettid); +} + +int64_t Thread::wait_for_tid() const { + int loop_count = 0; + while (true) { + int64_t t = Acquire_Load(&_tid); + if (t != PARENT_WAITING_TID) return t; + boost::detail::yield(loop_count++); + } +} + +Status Thread::start_thread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr* holder) { + GoogleOnceInit(&once, &init_threadmgr); + + // Temporary reference for the duration of this function. + scoped_refptr t(new Thread(category, name, functor)); + + // Optional, and only set if the thread was successfully created. + // + // We have to set this before we even start the thread because it's + // allowed for the thread functor to access 'holder'. + if (holder) { + *holder = t; + } + + t->_tid = PARENT_WAITING_TID; + + // Add a reference count to the thread since SuperviseThread() needs to + // access the thread object, and we have no guarantee that our caller + // won't drop the reference as soon as we return. This is dereferenced + // in FinishThread(). + t->AddRef(); + + auto cleanup = MakeScopedCleanup([&]() { + // If we failed to create the thread, we need to undo all of our prep work. + t->_tid = INVALID_TID; + t->Release(); + }); + + int ret = pthread_create(&t->_thread, NULL, &Thread::supervise_thread, t.get()); + if (ret) { + return Status::RuntimeError("Could not create thread", ret, strerror(ret)); + } + + // The thread has been created and is now joinable. + // + // Why set this in the parent and not the child? Because only the parent + // (or someone communicating with the parent) can join, so joinable must + // be set before the parent returns. 
+    t->_joinable = true;
+    cleanup.cancel();
+
+    VLOG(3) << "Started thread " << t->tid() << " - " << category << ":" << name;
+    return Status::OK();
+}
+
+void* Thread::supervise_thread(void* arg) {
+    Thread* t = static_cast<Thread*>(arg);
+    int64_t system_tid = Thread::current_thread_id();
+    PCHECK(system_tid != -1);
+
+    // Take an additional reference to the thread manager, which we'll need below.
+    ANNOTATE_IGNORE_SYNC_BEGIN();
+    std::shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
+    ANNOTATE_IGNORE_SYNC_END();
+
+    // Set up the TLS.
+    //
+    // We could store a scoped_refptr in the TLS itself, but as its
+    // lifecycle is poorly defined, we'll use a bare pointer. We
+    // already incremented the reference count in start_thread().
+    Thread::_tls = t;
+
+    // Publish our tid to '_tid', which unblocks any callers waiting in
+    // wait_for_tid().
+    Release_Store(&t->_tid, system_tid);
+
+    std::string name = strings::Substitute("$0-$1", t->name(), system_tid);
+    thread_mgr_ref->set_thread_name(name, t->_tid);
+    thread_mgr_ref->add_thread(pthread_self(), name, t->category(), t->_tid);
+
+    // finish_thread() is guaranteed to run (even if _functor throws an
+    // exception) because pthread_cleanup_push() creates a scoped object
+    // whose destructor invokes the provided callback.
+    pthread_cleanup_push(&Thread::finish_thread, t);
+    t->_functor();
+    pthread_cleanup_pop(true);
+
+    return NULL;
+}
+
+void Thread::finish_thread(void* arg) {
+    Thread* t = static_cast<Thread*>(arg);
+
+    // We're here either because of the explicit pthread_cleanup_pop() in
+    // supervise_thread() or through pthread_exit(). In either case,
+    // thread_manager is guaranteed to be live because thread_mgr_ref in
+    // supervise_thread() is still live.
+    thread_manager->remove_thread(pthread_self(), t->category());
+
+    // Signal any Joiner that we're done. 
+ t->_done.count_down(); + + VLOG(2) << "Ended thread " << t->_tid << " - " << t->category() << ":" << t->name(); + t->Release(); + // NOTE: the above 'Release' call could be the last reference to 'this', + // so 'this' could be destructed at this point. Do not add any code + // following here! +} + +void Thread::init_threadmgr() { + thread_manager.reset(new ThreadMgr()); +} + +ThreadJoiner::ThreadJoiner(Thread* thr) + : _thread(CHECK_NOTNULL(thr)), + _warn_after_ms(kDefaultWarnAfterMs), + _warn_every_ms(kDefaultWarnEveryMs), + _give_up_after_ms(kDefaultGiveUpAfterMs) {} + +ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { + _warn_after_ms = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) { + _warn_every_ms = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { + _give_up_after_ms = ms; + return *this; +} + +Status ThreadJoiner::join() { + if (Thread::current_thread() && + Thread::current_thread()->tid() == _thread->tid()) { + return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name); + } + + // Early exit: double join is a no-op. 
+ if (!_thread->_joinable) { + return Status::OK(); + } + + int waited_ms = 0; + bool keep_trying = true; + while (keep_trying) { + if (waited_ms >= _warn_after_ms) { + LOG(WARNING) << strings::Substitute("Waited for $0ms trying to join with $1 (tid $2)", + waited_ms, _thread->_name, _thread->_tid); + } + + int remaining_before_giveup = std::numeric_limits::max(); + if (_give_up_after_ms != -1) { + remaining_before_giveup = _give_up_after_ms - waited_ms; + } + + int remaining_before_next_warn = _warn_every_ms; + if (waited_ms < _warn_after_ms) { + remaining_before_next_warn = _warn_after_ms - waited_ms; + } + + if (remaining_before_giveup < remaining_before_next_warn) { + keep_trying = false; + } + + int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); + + if (_thread->_done.wait_for(MonoDelta::FromMilliseconds(wait_for))) { + // Unconditionally join before returning, to guarantee that any TLS + // has been destroyed (pthread_key_create() destructors only run + // after a pthread's user method has returned). 
+ int ret = pthread_join(_thread->_thread, NULL); + CHECK_EQ(ret, 0); + _thread->_joinable = false; + return Status::OK(); + } + waited_ms += wait_for; + } + return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", + waited_ms, _thread->_name)); +} + +} // namespace doris diff --git a/be/src/util/thread.h b/be/src/util/thread.h new file mode 100644 index 00000000000000..d9a6cd788b6d36 --- /dev/null +++ b/be/src/util/thread.h @@ -0,0 +1,270 @@ + +#ifndef DORIS_BE_SRC_UTIL_THREAD_H +#define DORIS_BE_SRC_UTIL_THREAD_H + +#include +#include +#include + +#include "common/status.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" + +namespace doris { + +class Thread : public RefCountedThreadSafe { +public: + enum CreateFlags { + NO_FLAGS = 0, + NO_STACK_WATCHDOG = 1 + }; + + template + static Status create_with_flags(const std::string& category, const std::string& name, + const F& f, uint64_t flags, + scoped_refptr* holder) { + return start_thread(category, name, f, flags, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + scoped_refptr* holder) { + return start_thread(category, name, f, NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3), NO_FLAGS, holder); + } + + template + static Status create(const 
std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, + scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + const A6& a6, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder); + } + + ~Thread(); + + // Blocks until this thread finishes execution. Once this method returns, the thread + // will be unregistered with the ThreadMgr and will not appear in the debug UI. + void join(); + + // The thread ID assigned to this thread by the operating system. If the thread + // has not yet started running, returns INVALID_TID. + // + // NOTE: this may block for a short amount of time if the thread has just been + // started. + int64_t tid() const; + + // Returns the thread's pthread ID. + pthread_t pthread_id() const; + + const std::string& name() const; + const std::string& category() const; + std::string to_string() const; + + // The current thread of execution, or NULL if the current thread isn't a kudu::Thread. + // This call is signal-safe. + static Thread* current_thread(); + + // Returns a unique, stable identifier for this thread. Note that this is a static + // method and thus can be used on any thread, including the main thread of the + // process. + // + // In general, this should be used when a value is required that is unique to + // a thread and must work on any thread including the main process thread. 
+    //
+    // NOTE: this is _not_ the TID, but rather a unique value assigned by the
+    // thread implementation. So, this value should not be presented to the user
+    // in log messages, etc.
+    static int64_t unique_thread_id();
+
+    // Returns the system thread ID (tid on Linux) for the current thread. Note
+    // that this is a static method and thus can be used from any thread,
+    // including the main thread of the process. This is in contrast to
+    // Thread::tid(), which only works on doris::Threads.
+    //
+    // Thread::tid() will return the same value, but the value is cached in the
+    // Thread object, so will be faster to call.
+    //
+    // Thread::unique_thread_id() (or Thread::tid()) should be preferred for
+    // performance sensitive code, however it is only guaranteed to return a
+    // unique and stable thread ID, not necessarily the system thread ID.
+    static int64_t current_thread_id();
+
+private:
+    friend class ThreadJoiner;
+
+    enum {
+        INVALID_TID = -1,
+        PARENT_WAITING_TID = -2,
+    };
+
+    // User function to be executed by this thread.
+    typedef std::function<void()> ThreadFunctor;
+    Thread(const std::string& category, const std::string& name, ThreadFunctor functor)
+        : _thread(0),
+          _tid(INVALID_TID),
+          _functor(std::move(functor)),
+          _category(category),
+          _name(name),
+          _done(1),
+          _joinable(false)
+    {}
+
+
+    // Library-specific thread ID.
+    pthread_t _thread;
+
+    // OS-specific thread ID. Once the constructor finishes start_thread(),
+    // guaranteed to be set either to a non-negative integer, or to INVALID_TID.
+    //
+    // The _tid member goes through the following states:
+    // 1. INVALID_TID: the thread has not been started, or has already exited.
+    // 2. PARENT_WAITING_TID: the parent has started the thread, but the
+    //    thread has not yet begun running. Therefore the TID is not yet known
+    //    but it will be set once the thread starts.
+    // 3. <system tid>: the thread is running. 
+ int64_t _tid; + + const ThreadFunctor _functor; + + const std::string _category; + const std::string _name; + + // Joiners wait on this latch to be notified if the thread is done. + // + // Note that Joiners must additionally pthread_join(), otherwise certain + // resources that callers expect to be destroyed (like TLS) may still be + // alive when a Joiner finishes. + CountDownLatch _done; + + bool _joinable; + + // Thread local pointer to the current thread of execution. Will be NULL if the current + // thread is not a Thread. + static __thread Thread* _tls; + + // Wait for the running thread to publish its tid. + int64_t wait_for_tid() const; + + // Starts the thread running supervise_thread(), and returns once that thread has + // initialised and its TID has been read. Waits for notification from the started + // thread that initialisation is complete before returning. On success, stores a + // reference to the thread in holder. + static Status start_thread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr* holder); + + // Wrapper for the user-supplied function. Invoked from the new thread, + // with the Thread as its only argument. Executes _functor, but before + // doing so registers with the global ThreadMgr and reads the thread's + // system ID. After _functor terminates, unregisters with the ThreadMgr. + // Always returns NULL. + // + // supervise_thread() notifies start_thread() when thread initialisation is + // completed via the _tid, which is set to the new thread's system ID. + // By that point in time supervise_thread() has also taken a reference to + // the Thread object, allowing it to safely refer to it even after the + // caller drops its reference. + // + // Additionally, start_thread() notifies supervise_thread() when the actual + // Thread object has been assigned (supervise_thread() is spinning during + // this time). 
Without this, the new thread may reference the actual
+    // Thread object before it has been assigned by start_thread(). See
+    // KUDU-11 for more details.
+    static void* supervise_thread(void* arg);
+
+    // Invoked when the user-supplied function finishes or in the case of an
+    // abrupt exit (i.e. pthread_exit()). Cleans up after supervise_thread().
+    static void finish_thread(void* arg);
+
+    static void init_threadmgr();
+};
+
+// Utility to join on a thread, printing warning messages if it
+// takes too long. For example:
+//
+//   ThreadJoiner(&my_thread)
+//     .warn_after_ms(1000)
+//     .warn_every_ms(5000)
+//     .join();
+//
+// TODO: would be nice to offer a way to use ptrace() or signals to
+// dump the stack trace of the thread we're trying to join on if it
+// gets stuck. But, after looking for 20 minutes or so, it seems
+// pretty complicated to get right.
+class ThreadJoiner {
+public:
+    explicit ThreadJoiner(Thread* thread);
+
+    // Start emitting warnings after this many milliseconds.
+    //
+    // Default: 1000 ms.
+    ThreadJoiner& warn_after_ms(int ms);
+
+    // After the warnings have started, emit another warning at the
+    // given interval.
+    //
+    // Default: 1000 ms.
+    ThreadJoiner& warn_every_ms(int ms);
+
+    // If the thread has not stopped after this number of milliseconds, give up
+    // joining on it and return Status::Aborted.
+    //
+    // -1 (the default) means to wait forever trying to join.
+    ThreadJoiner& give_up_after_ms(int ms);
+
+    // Join the thread, subject to the above parameters. Returns InvalidArgument
+    // if called from the thread being joined. If it times out, returns
+    // Aborted. 
+ Status join(); + +private: + enum { + kDefaultWarnAfterMs = 1000, + kDefaultWarnEveryMs = 1000, + kDefaultGiveUpAfterMs = -1 // forever + }; + + Thread* _thread; + + int _warn_after_ms; + int _warn_every_ms; + int _give_up_after_ms; + + DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); +}; + + +} //namespace doris + +#endif //DORIS_BE_SRC_UTIL_THREAD_H diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp new file mode 100644 index 00000000000000..cada3c2e7a6099 --- /dev/null +++ b/be/src/util/threadpool.cpp @@ -0,0 +1,620 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "util/threadpool.h" + +#include +#include +#include + +#include "common/logging.h" +#include "gutil/macros.h" +#include "gutil/map-util.h" +#include "gutil/strings/substitute.h" +#include "gutil/sysinfo.h" +#include "util/scoped_cleanup.h" +#include "util/thread.h" + +namespace doris { + +using std::string; +using strings::Substitute; + +class FunctionRunnable : public Runnable { +public: + explicit FunctionRunnable(boost::function func) + : _func(std::move(func)) {} + + void run() OVERRIDE { + _func(); + } + +private: + boost::function _func; +}; + +ThreadPoolBuilder::ThreadPoolBuilder(string name) + : _name(std::move(name)), + _min_threads(0), + _max_threads(base::NumCPUs()), + _max_queue_size(std::numeric_limits::max()), + _idle_timeout(MonoDelta::FromMilliseconds(500)) {} + +ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { + CHECK_GE(min_threads, 0); + _min_threads = min_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { + CHECK_GT(max_threads, 0); + _max_threads = max_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { + _max_queue_size = max_queue_size; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) { + _idle_timeout = idle_timeout; + return *this; +} + +Status ThreadPoolBuilder::build(std::unique_ptr* pool) const { + pool->reset(new ThreadPool(*this)); + RETURN_IF_ERROR((*pool)->init()); + return Status::OK(); +} + +ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, + ThreadPool::ExecutionMode mode) + : _mode(mode), + _pool(pool), + _state(State::IDLE), + _not_running_cond(&pool->_lock), + _active_threads(0) {} + +ThreadPoolToken::~ThreadPoolToken() { + shutdown(); + _pool->release_token(this); +} + +Status ThreadPoolToken::submit(std::shared_ptr r) { + return _pool->do_submit(std::move(r), this); +} + +Status ThreadPoolToken::submit_func(boost::function 
f) { + return submit(std::make_shared(std::move(f))); +} + +void ThreadPoolToken::shutdown() { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + + // Clear the queue under the lock, but defer the releasing of the tasks + // outside the lock, in case there are concurrent threads wanting to access + // the ThreadPool. The task's destructors may acquire locks, etc, so this + // also prevents lock inversions. + std::deque to_release = std::move(_entries); + _pool->_total_queued_tasks -= to_release.size(); + + switch (state()) { + case State::IDLE: + // There were no tasks outstanding; we can quiesce the token immediately. + transition(State::QUIESCED); + break; + case State::RUNNING: + // There were outstanding tasks. If any are still running, switch to + // QUIESCING and wait for them to finish (the worker thread executing + // the token's last task will switch the token to QUIESCED). Otherwise, + // we can quiesce the token immediately. + + // Note: this is an O(n) operation, but it's expected to be infrequent. + // Plus doing it this way (rather than switching to QUIESCING and waiting + // for a worker thread to process the queue entry) helps retain state + // transition symmetry with ThreadPool::shutdown. + for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) { + if (*it == this) { + it = _pool->_queue.erase(it); + } else { + it++; + } + } + + if (_active_threads == 0) { + transition(State::QUIESCED); + break; + } + transition(State::QUIESCING); + FALLTHROUGH_INTENDED; + case State::QUIESCING: + // The token is already quiescing. Just wait for a worker thread to + // switch it to QUIESCED. 
+ while (state() != State::QUIESCED) { + _not_running_cond.wait(); + } + break; + default: + break; + } +} + +void ThreadPoolToken::wait() { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + while (is_active()) { + _not_running_cond.wait(); + } +} + +bool ThreadPoolToken::wait_until(const MonoTime& until) { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + while (is_active()) { + if (!_not_running_cond.wait_until(until)) { + return false; + } + } + return true; +} + +bool ThreadPoolToken::wait_for(const MonoDelta& delta) { + return wait_until(MonoTime::Now() + delta); +} + +void ThreadPoolToken::transition(State new_state) { +#ifndef NDEBUG + CHECK_NE(_state, new_state); + + switch (_state) { + case State::IDLE: + CHECK(new_state == State::RUNNING || + new_state == State::QUIESCED); + if (new_state == State::RUNNING) { + CHECK(!_entries.empty()); + } else { + CHECK(_entries.empty()); + CHECK_EQ(_active_threads, 0); + } + break; + case State::RUNNING: + CHECK(new_state == State::IDLE || + new_state == State::QUIESCING || + new_state == State::QUIESCED); + CHECK(_entries.empty()); + if (new_state == State::QUIESCING) { + CHECK_GT(_active_threads, 0); + } + break; + case State::QUIESCING: + CHECK(new_state == State::QUIESCED); + CHECK_EQ(_active_threads, 0); + break; + case State::QUIESCED: + CHECK(false); // QUIESCED is a terminal state + break; + default: + LOG(FATAL) << "Unknown token state: " << _state; + } +#endif + + // Take actions based on the state we're entering. 
+ switch (new_state) { + case State::IDLE: + case State::QUIESCED: + _not_running_cond.notify_all(); + break; + default: + break; + } + + _state = new_state; +} + +const char* ThreadPoolToken::state_to_string(State s) { + switch (s) { + case State::IDLE: return "IDLE"; break; + case State::RUNNING: return "RUNNING"; break; + case State::QUIESCING: return "QUIESCING"; break; + case State::QUIESCED: return "QUIESCED"; break; + } + return ""; +} + +ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) + : _name(builder._name), + _min_threads(builder._min_threads), + _max_threads(builder._max_threads), + _max_queue_size(builder._max_queue_size), + _idle_timeout(builder._idle_timeout), + _pool_status(Status::Uninitialized("The pool was not initialized.")), + _idle_cond(&_lock), + _no_threads_cond(&_lock), + _num_threads(0), + _num_threads_pending_start(0), + _active_threads(0), + _total_queued_tasks(0), + _tokenless(new_token(ExecutionMode::CONCURRENT)) +{} + +ThreadPool::~ThreadPool() { + // There should only be one live token: the one used in tokenless submission. + CHECK_EQ(1, _tokens.size()) << Substitute( + "Threadpool $0 destroyed with $1 allocated tokens", + _name, _tokens.size()); + shutdown(); +} + +Status ThreadPool::init() { + if (!_pool_status.is_uninitialized()) { + return Status::NotSupported("The thread pool is already initialized"); + } + _pool_status = Status::OK(); + _num_threads_pending_start = _min_threads; + for (int i = 0; i < _min_threads; i++) { + Status status = create_thread(); + if (!status.ok()) { + shutdown(); + return status; + } + } + return Status::OK(); +} + +void ThreadPool::shutdown() { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + + // Note: this is the same error seen at submission if the pool is at + // capacity, so clients can't tell them apart. 
This isn't really a practical + // concern though because shutting down a pool typically requires clients to + // be quiesced first, so there's no danger of a client getting confused. + _pool_status = Status::ServiceUnavailable("The pool has been shut down."); + + // Clear the various queues under the lock, but defer the releasing + // of the tasks outside the lock, in case there are concurrent threads + // wanting to access the ThreadPool. The task's destructors may acquire + // locks, etc, so this also prevents lock inversions. + _queue.clear(); + std::deque> to_release; + for (auto* t : _tokens) { + if (!t->_entries.empty()) { + to_release.emplace_back(std::move(t->_entries)); + } + switch (t->state()) { + case ThreadPoolToken::State::IDLE: + // The token is idle; we can quiesce it immediately. + t->transition(ThreadPoolToken::State::QUIESCED); + break; + case ThreadPoolToken::State::RUNNING: + // The token has tasks associated with it. If they're merely queued + // (i.e. there are no active threads), the tasks will have been removed + // above and we can quiesce immediately. Otherwise, we need to wait for + // the threads to finish. + t->transition(t->_active_threads > 0 ? + ThreadPoolToken::State::QUIESCING : + ThreadPoolToken::State::QUIESCED); + break; + default: + break; + } + } + + // The queues are empty. Wake any sleeping worker threads and wait for all + // of them to exit. Some worker threads will exit immediately upon waking, + // while others will exit after they finish executing an outstanding task. + _total_queued_tasks = 0; + while (!_idle_threads.empty()) { + _idle_threads.front().not_empty.notify_one(); + _idle_threads.pop_front(); + } + while (_num_threads + _num_threads_pending_start > 0) { + _no_threads_cond.wait(); + } + + // All the threads have exited. Check the state of each token. 
+ for (auto* t : _tokens) { + DCHECK(t->state() == ThreadPoolToken::State::IDLE || + t->state() == ThreadPoolToken::State::QUIESCED); + } +} + +std::unique_ptr ThreadPool::new_token(ExecutionMode mode) { + MutexLock unique_lock(&_lock); + std::unique_ptr t(new ThreadPoolToken(this,mode)); + InsertOrDie(&_tokens, t.get()); + return t; +} + +void ThreadPool::release_token(ThreadPoolToken* t) { + MutexLock unique_lock(&_lock); + CHECK(!t->is_active()) << Substitute("Token with state $0 may not be released", + ThreadPoolToken::state_to_string(t->state())); + CHECK_EQ(1, _tokens.erase(t)); +} + +Status ThreadPool::submit(std::shared_ptr r) { + return do_submit(std::move(r), _tokenless.get()); +} + +Status ThreadPool::submit_func(boost::function f) { + return submit(std::make_shared(std::move(f))); +} + +Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token) { + DCHECK(token); + MonoTime submit_time = MonoTime::Now(); + + MutexLock unique_lock(&_lock); + if (PREDICT_FALSE(!_pool_status.ok())) { + return _pool_status; + } + + if (PREDICT_FALSE(!token->may_submit_new_tasks())) { + return Status::ServiceUnavailable("Thread pool token was shut down"); + } + + // Size limit check. + int64_t capacity_remaining = static_cast(_max_threads) + - _active_threads + + static_cast(_max_queue_size) + - _total_queued_tasks; + if (capacity_remaining < 1) { + return Status::ServiceUnavailable( + Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)", + _num_threads + _num_threads_pending_start, _max_threads, + _total_queued_tasks, _max_queue_size)); + } + + // Should we create another thread? + + // We assume that each current inactive thread will grab one item from the + // queue. If it seems like we'll need another thread, we create one. + // + // Rather than creating the thread here, while holding the lock, we defer + // it to down below. 
This is because thread creation can be rather slow + // (hundreds of milliseconds in some cases) and we'd like to allow the + // existing threads to continue to process tasks while we do so. + // + // In theory, a currently active thread could finish immediately after this + // calculation but before our new worker starts running. This would mean we + // created a thread we didn't really need. However, this race is unavoidable + // and harmless. + // + // Of course, we never create more than _max_threads threads no matter what. + int threads_from_this_submit = + token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1; + int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads; + int additional_threads = static_cast(_queue.size()) + + threads_from_this_submit + - inactive_threads; + bool need_a_thread = false; + if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) { + need_a_thread = true; + _num_threads_pending_start++; + } + + Task task; + task.runnable = std::move(r); + task.submit_time = submit_time; + + // Add the task to the token's queue. + ThreadPoolToken::State state = token->state(); + DCHECK(state == ThreadPoolToken::State::IDLE || + state == ThreadPoolToken::State::RUNNING); + token->_entries.emplace_back(std::move(task)); + if (state == ThreadPoolToken::State::IDLE || + token->mode() == ExecutionMode::CONCURRENT) { + _queue.emplace_back(token); + if (state == ThreadPoolToken::State::IDLE) { + token->transition(ThreadPoolToken::State::RUNNING); + } + } + _total_queued_tasks++; + + // Wake up an idle thread for this task. Choosing the thread at the front of + // the list ensures LIFO semantics as idling threads are also added to the front. + // + // If there are no idle threads, the new task remains on the queue and is + // processed by an active thread (or a thread we're about to create) at some + // point in the future. 
+ if (!_idle_threads.empty()) { + _idle_threads.front().not_empty.notify_one(); + _idle_threads.pop_front(); + } + unique_lock.unlock(); + + if (need_a_thread) { + Status status = create_thread(); + if (!status.ok()) { + unique_lock.lock(); + _num_threads_pending_start--; + if (_num_threads + _num_threads_pending_start == 0) { + // If we have no threads, we can't do any work. + return status; + } + // If we failed to create a thread, but there are still some other + // worker threads, log the error and continue. + LOG(ERROR) << "Thread pool failed to create thread: " + << status.to_string(); + } + } + + return Status::OK(); +} + +void ThreadPool::wait() { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + while (_total_queued_tasks > 0 || _active_threads > 0) { + _idle_cond.wait(); + } +} + +bool ThreadPool::wait_until(const MonoTime& until) { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + while (_total_queued_tasks > 0 || _active_threads > 0) { + if (!_idle_cond.wait_until(until)) { + return false; + } + } + return true; +} + +bool ThreadPool::wait_for(const MonoDelta& delta) { + return wait_until(MonoTime::Now() + delta); +} + +void ThreadPool::dispatch_thread() { + MutexLock unique_lock(&_lock); + InsertOrDie(&_threads, Thread::current_thread()); + DCHECK_GT(_num_threads_pending_start, 0); + _num_threads++; + _num_threads_pending_start--; + // If we are one of the first '_min_threads' to start, we must be + // a "permanent" thread. + bool permanent = _num_threads <= _min_threads; + + // Owned by this worker thread and added/removed from _idle_threads as needed. + IdleThread me(&_lock); + + while (true) { + // Note: any non-OK pool status makes the worker exit; shutdown() sets + // Status::ServiceUnavailable to request a normal exit. + if (!_pool_status.ok()) { + VLOG(2) << "DispatchThread exiting: " << _pool_status.to_string(); + break; + } + + if (_queue.empty()) { + // There's no work to do, let's go idle.
+ // + // Note: if FIFO behavior is desired, it's as simple as changing this to push_back(). + _idle_threads.push_front(me); + SCOPED_CLEANUP({ + // For some wake ups (i.e. shutdown or do_submit) this thread is + // guaranteed to be unlinked after being awakened. In others (i.e. + // spurious wake-up or Wait timeout), it'll still be linked. + if (me.is_linked()) { + _idle_threads.erase(_idle_threads.iterator_to(me)); + } + }); + if (permanent) { + me.not_empty.wait(); + } else { + if (!me.not_empty.wait_for(_idle_timeout)) { + // After much investigation, it appears that pthread condition variables have + // a weird behavior in which they can return ETIMEDOUT from timed_wait even if + // another thread did in fact signal. Apparently after a timeout there is some + // brief period during which another thread may actually grab the internal mutex + // protecting the state, signal, and release again before we get the mutex. So, + // we'll recheck the empty queue case regardless. + if (_queue.empty()) { + VLOG(3) << "Releasing worker thread from pool " << _name << " after " + << _idle_timeout.ToMilliseconds() << "ms of idle time."; + break; + } + } + } + continue; + } + + // Get the next token and task to execute. + ThreadPoolToken* token = _queue.front(); + _queue.pop_front(); + DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); + DCHECK(!token->_entries.empty()); + Task task = std::move(token->_entries.front()); + token->_entries.pop_front(); + token->_active_threads++; + --_total_queued_tasks; + ++_active_threads; + + unique_lock.unlock(); + + // Execute the task + task.runnable->run(); + + // Destruct the task while we do not hold the lock. + // + // The task's destructor may be expensive if it has a lot of bound + // objects, and we don't want to block submission of the threadpool. + // In the worst case, the destructor might even try to do something + // with this threadpool, and produce a deadlock. 
+ task.runnable.reset(); + unique_lock.lock(); + + // Possible outcomes once the token's last active thread finishes: + // 1. The token was shut down while we ran its task. Transition to QUIESCED. + // 2. The token has no more queued tasks. Transition back to IDLE. + // 3. The token has more tasks. It stays RUNNING; a SERIAL token is requeued + // here (do_submit() already queued one entry per task for CONCURRENT tokens). + ThreadPoolToken::State state = token->state(); + DCHECK(state == ThreadPoolToken::State::RUNNING || + state == ThreadPoolToken::State::QUIESCING); + if (--token->_active_threads == 0) { + if (state == ThreadPoolToken::State::QUIESCING) { + DCHECK(token->_entries.empty()); + token->transition(ThreadPoolToken::State::QUIESCED); + } else if (token->_entries.empty()) { + token->transition(ThreadPoolToken::State::IDLE); + } else if (token->mode() == ExecutionMode::SERIAL) { + _queue.emplace_back(token); + } + } + if (--_active_threads == 0) { + _idle_cond.notify_all(); + } + } + + // It's important that we hold the lock between exiting the loop and dropping + // _num_threads. Otherwise it's possible someone else could come along here + // and add a new task just as the last running thread is about to exit. + CHECK(unique_lock.own_lock()); + + CHECK_EQ(_threads.erase(Thread::current_thread()), 1); + _num_threads--; + if (_num_threads + _num_threads_pending_start == 0) { + _no_threads_cond.notify_all(); + + // Sanity check: if we're the last thread exiting, the queue ought to be + // empty. Otherwise it will never get processed.
+ CHECK(_queue.empty()); + DCHECK_EQ(0, _total_queued_tasks); + } +} + +Status ThreadPool::create_thread() { + return Thread::create("thread pool", Substitute("$0 [worker]", _name), + &ThreadPool::dispatch_thread, this, nullptr); +} + +void ThreadPool::check_not_pool_thread_unlocked() { + Thread* current = Thread::current_thread(); + if (ContainsKey(_threads, current)) { + LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with " + "name '$1' called pool function that would result in deadlock", + _name, current->name()); + } +} + +std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { + return o << ThreadPoolToken::state_to_string(s); +} + +} // namespace doris diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h new file mode 100644 index 00000000000000..b13f396ca854fe --- /dev/null +++ b/be/src/util/threadpool.h @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef DORIS_BE_SRC_UTIL_THREAD_POOL_H +#define DORIS_BE_SRC_UTIL_THREAD_POOL_H + +#include +#include +#include +#include +#include + +#include "boost/function.hpp" +#include +#include + +#include "common/status.h" +#include "gutil/ref_counted.h" +#include "util/condition_variable.h" +#include "util/mutex.h" +#include "util/monotime.h" + +namespace doris { + +class Thread; +class ThreadPool; +class ThreadPoolToken; + +class Runnable { +public: + virtual void run() = 0; + virtual ~Runnable() {} +}; + +// ThreadPool takes a lot of arguments. We provide sane defaults with a builder. +// +// name: Used for debugging output and default names of the worker threads. +// Since thread names are limited to 16 characters on Linux, it's good to +// choose a short name here. +// Required. +// +// trace_metric_prefix: used to prefix the names of TraceMetric counters. +// When a task on a thread pool has an associated trace, the thread pool +// implementation will increment TraceMetric counters to indicate the +// amount of time spent waiting in the queue as well as the amount of wall +// and CPU time spent executing. By default, these counters are prefixed +// with the name of the thread pool. For example, if the pool is named +// 'apply', then counters such as 'apply.queue_time_us' will be +// incremented. +// +// The TraceMetrics implementation relies on the number of distinct counter +// names being small. Thus, if the thread pool name itself is dynamically +// generated, the default behavior described above would result in an +// unbounded number of distinct counter names. The 'trace_metric_prefix' +// setting can be used to override the prefix used in generating the trace +// metric names. +// +// For example, the Raft thread pools are named "-raft" which +// has unbounded cardinality (a server may have thousands of different +// tablet IDs over its lifetime). In that case, setting the prefix to +// "raft" will avoid any issues. 
+// +// min_threads: Minimum number of threads we'll have at any time. +// Default: 0. +// +// max_threads: Maximum number of threads we'll have at any time. +// Default: Number of CPUs detected on the system. +// +// max_queue_size: Maximum number of items to enqueue before returning a +// Status::ServiceUnavailable message from submit(). +// Default: INT_MAX. +// +// idle_timeout: How long we'll keep around an idle thread before timing it out. +// We always keep at least min_threads. +// Default: 500 milliseconds. +// +// metrics: Histograms, counters, etc. to update on various threadpool events. +// Default: not set. +// NOTE: the builder below declares no setter for this option. +// +class ThreadPoolBuilder { +public: + explicit ThreadPoolBuilder(std::string name); + + // Note: We violate the style guide by returning mutable references here + // in order to provide traditional Builder pattern conveniences. + ThreadPoolBuilder& set_min_threads(int min_threads); + ThreadPoolBuilder& set_max_threads(int max_threads); + ThreadPoolBuilder& set_max_queue_size(int max_queue_size); + ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); + + // Instantiate a new ThreadPool with the existing builder arguments. + Status build(std::unique_ptr* pool) const; + +private: + friend class ThreadPool; + const std::string _name; + int _min_threads; + int _max_threads; + int _max_queue_size; + MonoDelta _idle_timeout; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder); +}; + +// Thread pool with a variable number of threads. +// +// Tasks submitted directly to the thread pool enter a FIFO queue and are +// dispatched to a worker thread when one becomes free. Tasks may also be +// submitted via ThreadPoolTokens. The token wait() and shutdown() functions +// can then be used to block on logical groups of tasks. +// +// A token operates in one of two ExecutionModes, determined at token +// construction time: +// 1. SERIAL: submitted tasks are run one at a time. +// 2. CONCURRENT: submitted tasks may be run in parallel.
This isn't unlike +// tasks submitted without a token, but the logical grouping that tokens +// impart can be useful when a pool is shared by many contexts (e.g. to +// safely shut down one context, to derive context-specific metrics, etc.). +// +// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are +// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are +// processed in a round-robin fashion, one task at a time. This prevents them +// from starving one another. However, tokenless (and CONCURRENT token-based) +// tasks can starve SERIAL token-based tasks. +// +// Usage Example: +// static void Func(int n) { ... } +// class Task : public Runnable { ... } +// +// std::unique_ptr<ThreadPool> thread_pool; +// CHECK_OK( +// ThreadPoolBuilder("my_pool") +// .set_min_threads(0) +// .set_max_threads(5) +// .set_max_queue_size(10) +// .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) +// .build(&thread_pool)); +// thread_pool->submit(std::make_shared<Task>()); +// thread_pool->submit_func(boost::bind(&Func, 10)); +class ThreadPool { +public: + ~ThreadPool(); + + // Wait for the running tasks to complete and then shutdown the threads. + // All the other pending tasks in the queue will be removed. + // NOTE: The user may implement external abort logic for the runnables, + // which must be invoked before shutdown(), if the system should know + // about the non-execution of these tasks, or if a runnable requires + // an explicit "abort" notification to exit from its run loop. + void shutdown(); + + // Submits a Runnable class. + Status submit(std::shared_ptr r); + + // Submits a function bound using boost::bind(&FuncName, args...). + Status submit_func(boost::function f); + + // Waits until all the tasks are completed. + void wait(); + + // Waits for the pool to reach the idle state, or until 'until' time is reached. + // Returns true if the pool reached the idle state, false otherwise.
+ bool wait_until(const MonoTime& until); + + // Waits for the pool to reach the idle state, or until 'delta' time elapses. + // Returns true if the pool reached the idle state, false otherwise. + bool wait_for(const MonoDelta& delta); + + // Allocates a new token for use in token-based task submission. All tokens + // must be destroyed before their ThreadPool is destroyed. + // + // There is no limit on the number of tokens that may be allocated. + enum class ExecutionMode { + // Tasks submitted via this token will be executed serially. + SERIAL, + + // Tasks submitted via this token may be executed concurrently. + CONCURRENT, + }; + std::unique_ptr new_token(ExecutionMode mode); + + // Return the number of threads currently running (or in the process of starting up) + // for this thread pool. + int num_threads() const { + MutexLock l(&_lock); + return _num_threads + _num_threads_pending_start; + } + +private: + friend class ThreadPoolBuilder; + friend class ThreadPoolToken; + + // Client-provided task to be executed by this pool. + struct Task { + std::shared_ptr runnable; + + // Time at which the entry was submitted to the pool. + MonoTime submit_time; + }; + + // Creates a new thread pool using a builder. + explicit ThreadPool(const ThreadPoolBuilder& builder); + + // Initializes the thread pool by starting the minimum number of threads. + Status init(); + + // Dispatcher responsible for dequeueing and executing the tasks + void dispatch_thread(); + + // Create new thread. + // + // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call. + // NOTE: For performance reasons, _lock should not be held. + Status create_thread(); + + // Aborts if the current thread is a member of this thread pool. + void check_not_pool_thread_unlocked(); + + // Submits a task to be run via token. + Status do_submit(std::shared_ptr r, ThreadPoolToken* token); + + // Releases token 't' and invalidates it. 
+ void release_token(ThreadPoolToken* t); + + const std::string _name; + const int _min_threads; + const int _max_threads; + const int _max_queue_size; + const MonoDelta _idle_timeout; + + // Overall status of the pool. Set to an error when the pool is shut down. + // + // Protected by '_lock'. + Status _pool_status; + + // Synchronizes many of the members of the pool and all of its + // condition variables. + mutable Mutex _lock; + + // Condition variable for "pool is idling". Waiters wake up when + // _active_threads reaches zero. + ConditionVariable _idle_cond; + + // Condition variable for "pool has no threads". Waiters wake up when + // _num_threads and _num_threads_pending_start are both 0. + ConditionVariable _no_threads_cond; + + // Number of threads currently running. + // + // Protected by _lock. + int _num_threads; + + // Number of threads which are in the process of starting. + // When these threads start, they will decrement this counter and + // accordingly increment '_num_threads'. + // + // Protected by _lock. + int _num_threads_pending_start; + + // Number of threads currently running and executing client tasks. + // + // Protected by _lock. + int _active_threads; + + // Total number of client tasks queued, either directly (_queue) or + // indirectly (_tokens). + // + // Protected by _lock. + int _total_queued_tasks; + + // All allocated tokens. + // + // Protected by _lock. + std::unordered_set _tokens; + + // FIFO of tokens from which tasks should be executed. Does not own the + // tokens; they are owned by clients and are removed from the FIFO on shutdown. + // + // Protected by _lock. + std::deque _queue; + + // Pointers to all running threads. Raw pointers are safe because a Thread + // may only go out of scope after being removed from _threads. + // + // Protected by _lock. + std::unordered_set _threads; + + // List of all threads currently waiting for work.
+ // + // A thread is added to the front of the list when it goes idle and is + // removed from the front and signaled when new work arrives. This produces a + // LIFO usage pattern that is more efficient than idling on a single + // ConditionVariable (which yields FIFO semantics). + // + // Protected by _lock. + struct IdleThread : public boost::intrusive::list_base_hook<> { + explicit IdleThread(Mutex* m) + : not_empty(m) {} + + // Condition variable for "queue is not empty". Waiters wake up when a new + // task is queued. + ConditionVariable not_empty; + + DISALLOW_COPY_AND_ASSIGN(IdleThread); + }; + boost::intrusive::list _idle_threads; // NOLINT(build/include_what_you_use) + + // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. + std::unique_ptr _tokenless; + + DISALLOW_COPY_AND_ASSIGN(ThreadPool); +}; + +// Entry point for token-based task submission and blocking for a particular +// thread pool. Tokens can only be created via ThreadPool::new_token(). +// +// All functions are thread-safe. Mutable members are protected via the +// ThreadPool's lock. +class ThreadPoolToken { +public: + // Destroys the token. + // + // May be called on a token with outstanding tasks, as shutdown() will be + // called first to take care of them. + ~ThreadPoolToken(); + + // Submits a Runnable class. + Status submit(std::shared_ptr r); + + // Submits a function bound using boost::bind(&FuncName, args...). + Status submit_func(boost::function f); + + // Marks the token as unusable for future submissions. Any queued tasks not + // yet running are destroyed. If tasks are in flight, shutdown() will wait + // on their completion before returning. + void shutdown(); + + // Waits until all the tasks submitted via this token are completed. + void wait(); + + // Waits until all submissions using this token are complete, or until 'until' + // time is reached. + // + // Returns true if all submissions are complete, false otherwise.
+ bool wait_until(const MonoTime& until); + + // Waits until all submissions using this token are complete, or until 'delta' + // time elapses. + // + // Returns true if all submissions are complete, false otherwise. + bool wait_for(const MonoDelta& delta); + +private: + // All possible token states. Legal state transitions: + // IDLE -> RUNNING: task is submitted via token + // IDLE -> QUIESCED: token or pool is shut down + // RUNNING -> IDLE: worker thread finishes executing a task and + // there are no more tasks queued to the token + // RUNNING -> QUIESCING: token or pool is shut down while worker thread + // is executing a task + // RUNNING -> QUIESCED: token or pool is shut down while the token's + // tasks are only queued (no worker thread is + // executing one) + // QUIESCING -> QUIESCED: worker thread finishes executing a task + // belonging to a shut down token or pool + enum class State { + // Token has no queued or running tasks. + IDLE, + + // The token has at least one task that is queued or being executed by a + // worker thread. + RUNNING, + + // No new tasks may be submitted to the token. A worker thread is still + // running a previously queued task. + QUIESCING, + + // No new tasks may be submitted to the token. There are no active tasks + // either. At this state, the token may only be destroyed. + QUIESCED, + }; + + // Writes a textual representation of the token state in 's' to 'o'. + friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); + + friend class ThreadPool; + + // Returns a textual representation of 's' suitable for debugging. + static const char* state_to_string(State s); + + // Constructs a new token. + // + // The token may not outlive its thread pool ('pool'). + ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode); + + // Changes this token's state to 'new_state' taking actions as needed. + void transition(State new_state); + + // Returns true if this token has a task queued and ready to run, or if a + // task belonging to this token is already running.
+ bool is_active() const { + return _state == State::RUNNING || + _state == State::QUIESCING; + } + + // Returns true if new tasks may be submitted to this token. + bool may_submit_new_tasks() const { + return _state != State::QUIESCING && + _state != State::QUIESCED; + } + + State state() const { return _state; } + ThreadPool::ExecutionMode mode() const { return _mode; } + + // Token's configured execution mode. + const ThreadPool::ExecutionMode _mode; + + // Pointer to the token's thread pool. + ThreadPool* _pool; + + // Token state machine. + State _state; + + // Queued client tasks. + std::deque _entries; + + // Condition variable for "token is idle". Waiters wake up when the token + // transitions to IDLE or QUIESCED. + ConditionVariable _not_running_cond; + + // Number of worker threads currently executing tasks belonging to this + // token. + int _active_threads; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken); +}; + +} // namespace doris + +#endif //DORIS_BE_SRC_UTIL_THREAD_POOL_H diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 85d34f4fb70da3..6aba7715236657 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -59,5 +59,8 @@ ADD_BE_TEST(path_util_test) ADD_BE_TEST(file_cache_test) ADD_BE_TEST(file_manager_test) ADD_BE_TEST(parse_util_test) +ADD_BE_TEST(countdown_latch_test) ADD_BE_TEST(monotime_test) ADD_BE_TEST(scoped_cleanup_test) +ADD_BE_TEST(thread_test) +ADD_BE_TEST(threadpool_test) diff --git a/be/test/util/countdown_latch_test.cpp b/be/test/util/countdown_latch_test.cpp index 0bac03fa4365c2..e0174e61b8e6b6 100644 --- a/be/test/util/countdown_latch_test.cpp +++ b/be/test/util/countdown_latch_test.cpp @@ -18,11 +18,9 @@ #include #include -#include "gutil/std::unique_ptr.h" #include "gutil/ref_counted.h" #include "util/countdown_latch.h" #include "util/monotime.h" -#include "util/test_macros.h" #include "util/thread.h" #include "util/threadpool.h" @@ -41,19 +39,19 @@ static void 
decrement_latch(CountDownLatch* latch, int amount) { TEST(TestCountDownLatch, TestLatch) { std::unique_ptr pool; - ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).build(&pool)); + ASSERT_TRUE(ThreadPoolBuilder("cdl-test").set_max_threads(1).build(&pool).ok()); CountDownLatch latch(1000); // Decrement the count by 1 in another thread, this should not fire the // latch. - ASSERT_OK(pool->submit_func(std::bind(decrement_latch, &latch, 1))); + ASSERT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1)).ok()); ASSERT_FALSE(latch.wait_for(MonoDelta::FromMilliseconds(200))); ASSERT_EQ(999, latch.count()); // Now decrement by 1000 this should decrement to 0 and fire the latch // (even though 1000 is one more than the current count). - ASSERT_OK(pool->submit_func(std::bind(decrement_latch, &latch, 1000))); + ASSERT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1000)).ok()); latch.wait(); ASSERT_EQ(0, latch.count()); } @@ -63,11 +61,11 @@ TEST(TestCountDownLatch, TestLatch) { TEST(TestCountDownLatch, TestResetToZero) { CountDownLatch cdl(100); scoped_refptr t; - ASSERT_OK(Thread::create("test", "cdl-test", &CountDownLatch::wait, &cdl, &t)); + ASSERT_TRUE(Thread::create("test", "cdl-test", &CountDownLatch::wait, &cdl, &t).ok()); // Sleep for a bit until it's likely the other thread is waiting on the latch. SleepFor(MonoDelta::FromMilliseconds(10)); - cdl.Reset(0); + cdl.reset(0); t->join(); } diff --git a/be/test/util/thread_test.cpp b/be/test/util/thread_test.cpp new file mode 100644 index 00000000000000..3adf513ff94816 --- /dev/null +++ b/be/test/util/thread_test.cpp @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/thread.h" + +#include +#include + +#include +#include +#include + +#include + +#include "common/status.h" +#include "common/logging.h" +#include "gutil/basictypes.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" + +using std::string; + +namespace doris { + +class ThreadTest : public ::testing::Test { +public: + virtual void SetUp() {} + virtual void TearDown() {} +}; + +// Join with a thread and emit warnings while waiting to join. +// This has to be manually verified. +TEST_F(ThreadTest, TestJoinAndWarn) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 1000*1000, &holder); + ASSERT_TRUE(status.ok()); + status = ThreadJoiner(holder.get()) + .warn_after_ms(10) + .warn_every_ms(100) + .join(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(ThreadTest, TestFailedJoin) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 1000*1000, &holder); + ASSERT_TRUE(status.ok()); + status = ThreadJoiner(holder.get()) + .give_up_after_ms(50) + .join(); + ASSERT_TRUE(status.is_aborted()); +} + +static void TryJoinOnSelf() { + Status s = ThreadJoiner(Thread::current_thread()).join(); + // Use CHECK instead of ASSERT because gtest isn't thread-safe. + CHECK(s.is_invalid_argument()); +} + +// Try to join on the thread that is currently running. 
+TEST_F(ThreadTest, TestJoinOnSelf) { + scoped_refptr holder; + ASSERT_TRUE(Thread::create("test", "test", TryJoinOnSelf, &holder).ok()); + holder->join(); + // Actual assertion is done by the thread spawned above. +} + +TEST_F(ThreadTest, TestDoubleJoinIsNoOp) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 0, &holder); + ASSERT_TRUE(status.ok()); + ThreadJoiner joiner(holder.get()); + status = joiner.join(); + ASSERT_TRUE(status.ok()); + status = joiner.join(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(ThreadTest, ThreadStartBenchmark) { + std::vector> threads(1000); + { + int64_t thread_creation_ns = 0; + SCOPED_RAW_TIMER(&thread_creation_ns); + for (auto& t : threads) { + Status status = Thread::create("test", "TestCallOnExit", usleep, 0, &t); + ASSERT_TRUE(status.ok()); + } + std::cout << "create 1000 threads use:" + << thread_creation_ns << "ns" << std::endl; + } + { + int64_t thread_publish_tid_ns = 0; + SCOPED_RAW_TIMER(&thread_publish_tid_ns); + for (auto& t : threads) { + t->tid(); + } + std::cout << "1000 threads publish TIDS use:" + << thread_publish_tid_ns << "ns" << std::endl; + } + + for (auto& t : threads) { + t->join(); + } +} + +} // namespace doris + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp new file mode 100644 index 00000000000000..b752de3cd2a6c6 --- /dev/null +++ b/be/test/util/threadpool_test.cpp @@ -0,0 +1,796 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include "common/logging.h" +#include "common/status.h" +#include "gutil/atomicops.h" +#include "gutil/port.h" +#include "gutil/ref_counted.h" +#include "gutil/strings/substitute.h" +#include "gutil/sysinfo.h" +#include "gutil/walltime.h" +#include "util/barrier.h" +#include "util/countdown_latch.h" +#include "util/spinlock.h" +#include "util/metrics.h" +#include "util/monotime.h" +#include "util/random.h" +#include "util/scoped_cleanup.h" +#include "util/threadpool.h" + +using std::atomic; +using std::shared_ptr; +using std::string; +using std::thread; +using std::unique_ptr; +using std::vector; + +using strings::Substitute; + +DECLARE_int32(thread_inject_start_latency_ms); + +namespace doris { + +static const char* kDefaultPoolName = "test"; + +class ThreadPoolTest : public ::testing::Test { +public: + virtual void SetUp() override { + ASSERT_TRUE(ThreadPoolBuilder(kDefaultPoolName).build(&_pool).ok()); + } + + Status rebuild_pool_with_builder(const ThreadPoolBuilder& builder) { + return builder.build(&_pool); + } + + Status rebuild_pool_with_min_max(int min_threads, int max_threads) { + return ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_pool); + } + +protected: + unique_ptr _pool; +}; + +TEST_F(ThreadPoolTest, TestNoTaskOpenClose) { + ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok()); + _pool->shutdown(); +} + 
+static void simple_task_method(int n, std::atomic* counter) { + while (n--) { + (*counter)++; + boost::detail::yield(n); + } +} + +class SimpleTask : public Runnable { +public: + SimpleTask(int n, std::atomic* counter) + : _n(n), _counter(counter) {} + + void run() override { + simple_task_method(_n, _counter); + } + +private: + int _n; + std::atomic* _counter; +}; + +TEST_F(ThreadPoolTest, TestSimpleTasks) { + ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok()); + + std::atomic counter(0); + std::shared_ptr task(new SimpleTask(15, &counter)); + + ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok()); + ASSERT_TRUE(_pool->submit(task).ok()); + ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok()); + ASSERT_TRUE(_pool->submit(task).ok()); + _pool->wait(); + ASSERT_EQ(10 + 15 + 20 + 15, counter.load()); + _pool->shutdown(); +} + +class SlowTask : public Runnable { +public: + explicit SlowTask(CountDownLatch* latch) + : _latch(latch) {} + + void run() override { + _latch->wait(); + } + + static shared_ptr new_slow_task(CountDownLatch* latch) { + return std::make_shared(latch); + } + +private: + CountDownLatch* _latch; +}; + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok()); + + // There are no threads to start with. + ASSERT_TRUE(_pool->num_threads() == 0); + // We get up to 3 threads when submitting work. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.count_down(); + }); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(2, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + // The 4th piece of work gets queued. 
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + // Finish all work + latch.count_down(); + _pool->wait(); + ASSERT_EQ(0, _pool->_active_threads); + _pool->shutdown(); + ASSERT_EQ(0, _pool->num_threads()); +} + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) { + // By default a threadpool's max_threads is set to the number of CPUs, so + // this test submits more tasks than that to ensure that the number of CPUs + // isn't some kind of upper bound. + const int kNumCPUs = base::NumCPUs(); + + // Build a threadpool with no limit on the maximum number of threads. + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(std::numeric_limits::max())).ok()); + CountDownLatch latch(1); + auto cleanup_latch = MakeScopedCleanup([&]() { + latch.count_down(); + }); + + // submit tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs * 2; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 2), _pool->num_threads()); + + // submit tasks on two tokens. Only two threads should be created. + unique_ptr t1 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + unique_ptr t2 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + for (int i = 0; i < kNumCPUs * 2; i++) { + ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get(); + ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 2) + 2, _pool->num_threads()); + + // submit more tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 3) + 2, _pool->num_threads()); + + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +// Regression test for a bug where a task is submitted exactly +// as a thread is about to exit. Previously this could hang forever. 
+TEST_F(ThreadPoolTest, TestRace) { + alarm(60); + auto cleanup = MakeScopedCleanup([]() { + alarm(0); // Disable alarm on test exit. + }); + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(1) + .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok()); + + for (int i = 0; i < 500; i++) { + CountDownLatch l(1); + ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok()); + l.wait(); + // Sleeping a different amount in each iteration makes it more likely to hit + // the bug. + SleepFor(MonoDelta::FromMicroseconds(i)); + } +} + +TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(4) + .set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok()); + + // There is 1 thread to start with. + ASSERT_EQ(1, _pool->num_threads()); + // We get up to 4 threads when submitting work. + CountDownLatch latch(1); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(1, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(2, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(4, _pool->num_threads()); + // The 5th piece of work gets queued. 
+ ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(4, _pool->num_threads()); + // Finish all work + latch.count_down(); + _pool->wait(); + ASSERT_EQ(0, _pool->_active_threads); + _pool->shutdown(); + ASSERT_EQ(0, _pool->num_threads()); +} + +TEST_F(ThreadPoolTest, TestMaxQueueSize) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1)).ok()); + + CountDownLatch latch(1); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + Status s = _pool->submit(SlowTask::new_slow_task(&latch)); + CHECK(s.is_service_unavailable()) << "Expected failure due to queue blowout:" + << s.to_string(); + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +// Test that when we specify a zero-sized queue, the maximum number of threads +// running is used for enforcement. +TEST_F(ThreadPoolTest, TestZeroQueueSize) { + const int kMaxThreads = 4; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_queue_size(0) + .set_max_threads(kMaxThreads)).ok()); + + CountDownLatch latch(1); + for (int i = 0; i < kMaxThreads; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + Status s = _pool->submit(SlowTask::new_slow_task(&latch)); + ASSERT_TRUE(s.is_service_unavailable()) << s.to_string(); + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +/* +// Test that a thread pool will crash if asked to run its own blocking +// functions in a pool thread. +// +// In a multi-threaded application, TSAN is unsafe to use following a fork(). +// After a fork(), TSAN will: +// 1. Disable verification, expecting an exec() soon anyway, and +// 2. Die on future thread creation. 
+// For some reason, this test triggers behavior #2. We could disable it with +// the TSAN option die_after_fork=0, but this can (supposedly) lead to +// deadlocks, so we'll disable the entire test instead. +#ifndef THREAD_SANITIZER +TEST_F(ThreadPoolTest, TestDeadlocks) { + const char* death_msg = "called pool function that would result in deadlock"; + ASSERT_DEATH({ + ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); + ASSERT_TRUE(_pool->submit_func( + Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok()); + _pool->wait(); + }, death_msg); + + ASSERT_DEATH({ + ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); + ASSERT_TRUE(_pool->submit_func( + Bind(&ThreadPool::wait, Unretained(_pool.get()))).ok()); + _pool->wait(); + }, death_msg); +} +#endif +*/ + +class SlowDestructorRunnable : public Runnable { +public: + void run() override {} + + virtual ~SlowDestructorRunnable() { + SleepFor(MonoDelta::FromMilliseconds(100)); + } +}; + +// Test that if a task's destructor is slow, it doesn't cause serialization of the tasks +// in the queue. +TEST_F(ThreadPoolTest, TestSlowDestructor) { + ASSERT_TRUE(rebuild_pool_with_min_max(1, 20).ok()); + MonoTime start = MonoTime::Now(); + for (int i = 0; i < 100; i++) { + shared_ptr task(new SlowDestructorRunnable()); + ASSERT_TRUE(_pool->submit(std::move(task)).ok()); + } + _pool->wait(); + ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5); +} + +// For test cases that should run with both kinds of tokens.
+class ThreadPoolTestTokenTypes : public ThreadPoolTest, + public testing::WithParamInterface {}; + +INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes, + ::testing::Values(ThreadPool::ExecutionMode::SERIAL, + ThreadPool::ExecutionMode::CONCURRENT)); + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) { + unique_ptr t = _pool->new_token(GetParam()); + int i = 0; + Status status = t->submit_func([&]() { + SleepFor(MonoDelta::FromMilliseconds(1)); + i++; + }); + ASSERT_TRUE(status.ok()); + t->wait(); + ASSERT_EQ(1, i); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) { + unique_ptr t = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + string result; + for (char c = 'a'; c < 'f'; c++) { + // Sleep a little first so that there's a higher chance of out-of-order + // appends if the submissions did execute in parallel. + int sleep_ms = r.Next() % 5; + Status status= t->submit_func([&result, c, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + result += c; + }); + ASSERT_TRUE(status.ok()); + } + t->wait(); + ASSERT_EQ("abcde", result); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) { + const int kNumTokens = 5; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumTokens)).ok()); + vector> tokens; + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + std::shared_ptr b = std::make_shared(kNumTokens + 1); + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(_pool->new_token(GetParam())); + ASSERT_TRUE(tokens.back()->submit_func([b]() { + b->wait(); + }).ok()); + } + + // This will deadlock if the above tasks weren't all running concurrently. 
+ b->wait(); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) { + const int kNumSubmissions = 5; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumSubmissions)).ok()); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + shared_ptr b = std::make_shared(kNumSubmissions + 1); + unique_ptr t = _pool->new_token(ThreadPool::ExecutionMode::CONCURRENT); + for (int i = 0; i < kNumSubmissions; i++) { + ASSERT_TRUE(t->submit_func([b]() { + b->wait(); + }).ok()); + } + + // This will deadlock if the above tasks weren't all running concurrently. + b->wait(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(4)).ok()); + + unique_ptr t1(_pool->new_token(GetParam())); + unique_ptr t2(_pool->new_token(GetParam())); + CountDownLatch l1(1); + CountDownLatch l2(1); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + + for (int i = 0; i < 3; i++) { + ASSERT_TRUE(t1->submit_func([&]() { + l1.wait(); + }).ok()); + } + for (int i = 0; i < 3; i++) { + ASSERT_TRUE(t2->submit_func([&]() { + l2.wait(); + }).ok()); + } + + // Unblock all of t1's tasks, but not t2's tasks. + l1.count_down(); + + // If this also waited for t2's tasks, it would deadlock. + t1->shutdown(); + + // We can no longer submit to t1 but we can still submit to t2. + ASSERT_TRUE(t1->submit_func([](){}).is_service_unavailable()); + ASSERT_TRUE(t2->submit_func([](){}).ok()); + + // Unblock t2's tasks. 
+ l2.count_down(); + t2->shutdown(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) { + const int kNumTokens = 3; + const int kNumSubmissions = 20; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + vector> tokens; + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(_pool->new_token(GetParam())); + } + + atomic v(0); + for (int i = 0; i < kNumSubmissions; i++) { + // Sleep a little first to raise the likelihood of the test thread + // reaching wait() before the submissions finish. + int sleep_ms = r.Next() % 5; + + auto task = [&v, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + v++; + }; + + // Half of the submissions will be token-less, and half will use a token. + if (i % 2 == 0) { + ASSERT_TRUE(_pool->submit_func(task).ok()); + } else { + int token_idx = r.Next() % tokens.size(); + ASSERT_TRUE(tokens[token_idx]->submit_func(task).ok()); + } + } + _pool->wait(); + ASSERT_EQ(kNumSubmissions, v); +} + +TEST_F(ThreadPoolTest, TestFuzz) { + const int kNumOperations = 1000; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + vector> tokens; + + for (int i = 0; i < kNumOperations; i++) { + // Operation distribution: + // + // - submit without a token: 40% + // - submit with a randomly selected token: 35% + // - Allocate a new token: 10% + // - Wait on a randomly selected token: 7% + // - shutdown a randomly selected token: 4% + // - Deallocate a randomly selected token: 2% + // - Wait for all submissions: 2% + int op = r.Next() % 100; + if (op < 40) { + // submit without a token. + int sleep_ms = r.Next() % 5; + ASSERT_TRUE(_pool->submit_func([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }).ok()); + } else if (op < 75) { + // submit with a randomly selected token. 
+ if (tokens.empty()) { + continue; + } + int sleep_ms = r.Next() % 5; + int token_idx = r.Next() % tokens.size(); + Status s = tokens[token_idx]->submit_func([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + ASSERT_TRUE(s.ok() || s.is_service_unavailable()); + } else if (op < 85) { + // Allocate a token with a randomly selected policy. + ThreadPool::ExecutionMode mode = r.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens.emplace_back(_pool->new_token(mode)); + } else if (op < 92) { + // Wait on a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->wait(); + } else if (op < 96) { + // shutdown a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->shutdown(); + } else if (op < 98) { + // Deallocate a randomly selected token. + if (tokens.empty()) { + continue; + } + auto it = tokens.begin(); + int token_idx = r.Next() % tokens.size(); + std::advance(it, token_idx); + tokens.erase(it); + } else { + // Wait on everything. + ASSERT_LT(op, 100); + ASSERT_GE(op, 98); + _pool->wait(); + } + } + + // Some test runs will shut down the pool before the tokens, and some won't. + // Either way should be safe. + if (r.Next() % 2 == 0) { + _pool->shutdown(); + } +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1)).ok()); + + CountDownLatch latch(1); + unique_ptr t = _pool->new_token(GetParam()); + SCOPED_CLEANUP({ + latch.count_down(); + }); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. 
+ ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + Status s = t->submit(SlowTask::new_slow_task(&latch)); + ASSERT_TRUE(s.is_service_unavailable()); +} + +TEST_F(ThreadPoolTest, TestTokenConcurrency) { + const int kNumTokens = 20; + const int kTestRuntimeSecs = 1; + const int kCycleThreads = 2; + const int kShutdownThreads = 2; + const int kWaitThreads = 2; + const int kSubmitThreads = 8; + + vector> tokens; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random rng(seed); + + // Protects 'tokens' and 'rng'. + SpinLock lock; + + // Fetch a token from 'tokens' at random. + auto GetRandomToken = [&]() -> shared_ptr { + std::lock_guard l(lock); + int idx = rng.Uniform(kNumTokens); + return tokens[idx]; + }; + + // Preallocate all of the tokens. + for (int i = 0; i < kNumTokens; i++) { + ThreadPool::ExecutionMode mode; + { + std::lock_guard l(lock); + mode = rng.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + } + tokens.emplace_back(_pool->new_token(mode).release()); + } + + atomic total_num_tokens_cycled(0); + atomic total_num_tokens_shutdown(0); + atomic total_num_tokens_waited(0); + atomic total_num_tokens_submitted(0); + + CountDownLatch latch(1); + vector threads; + + for (int i = 0; i < kCycleThreads; i++) { + // Pick a token at random and replace it. + // + // The replaced token is only destroyed when the last ref is dropped, + // possibly by another thread. + threads.emplace_back([&]() { + int num_tokens_cycled = 0; + while (latch.count()) { + { + std::lock_guard l(lock); + int idx = rng.Uniform(kNumTokens); + ThreadPool::ExecutionMode mode = rng.Next() % 2 ? 
+ ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens[idx] = shared_ptr(_pool->new_token(mode).release()); + } + num_tokens_cycled++; + + // Sleep a bit, otherwise this thread outpaces the other threads and + // nothing interesting happens to most tokens. + SleepFor(MonoDelta::FromMicroseconds(10)); + } + total_num_tokens_cycled += num_tokens_cycled; + }); + } + + for (int i = 0; i < kShutdownThreads; i++) { + // Pick a token at random and shut it down. Submitting a task to a shut + // down token will return a ServiceUnavailable error. + threads.emplace_back([&]() { + int num_tokens_shutdown = 0; + while (latch.count()) { + GetRandomToken()->shutdown(); + num_tokens_shutdown++; + } + total_num_tokens_shutdown += num_tokens_shutdown; + }); + } + + for (int i = 0; i < kWaitThreads; i++) { + // Pick a token at random and wait for any outstanding tasks. + threads.emplace_back([&]() { + int num_tokens_waited = 0; + while (latch.count()) { + GetRandomToken()->wait(); + num_tokens_waited++; + } + total_num_tokens_waited += num_tokens_waited; + }); + } + + for (int i = 0; i < kSubmitThreads; i++) { + // Pick a token at random and submit a task to it. + threads.emplace_back([&]() { + int num_tokens_submitted = 0; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random rng(seed); + while (latch.count()) { + int sleep_ms = rng.Next() % 5; + Status s = GetRandomToken()->submit_func([sleep_ms]() { + // Sleep a little first so that tasks are running during other events. 
+ SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + CHECK(s.ok() || s.is_service_unavailable()); + num_tokens_submitted++; + } + total_num_tokens_submitted += num_tokens_submitted; + }); + } + + SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs)); + latch.count_down(); + for (auto& t : threads) { + t.join(); + } + + LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1", + kCycleThreads, total_num_tokens_cycled.load()); + LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1", + kShutdownThreads, total_num_tokens_shutdown.load()); + LOG(INFO) << Substitute("Tokens waited ($0 threads): $1", + kWaitThreads, total_num_tokens_waited.load()); + LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1", + kSubmitThreads, total_num_tokens_submitted.load()); +} + +/* +TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) { + const int kNumThreads = 10; + + // Test with a pool that allows for kNumThreads concurrent threads. + ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumThreads)).ok()); + + // Submit kNumThreads slow tasks and unblock them, in order to produce + // kNumThreads worker threads. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.CountDown(); + }); + for (int i = 0; i < kNumThreads; i++) { + ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ(kNumThreads, _pool->num_threads()); + latch.count_down(); + pool_->wait(); + + // The kNumThreads threads are idle and waiting for the idle timeout. + + // Submit a slow trickle of lightning fast tasks. + // + // If the threads are woken up in FIFO order, this trickle is enough to + // prevent all of them from idling and the AssertEventually will time out. + // + // If LIFO order is used, the same thread will be reused for each task and + // the other threads will eventually time out. 
+ AssertEventually([&]() { + ASSERT_OK(_pool->submit_func([](){}).ok()); + SleepFor(MonoDelta::FromMilliseconds(10)); + ASSERT_EQ(1, _pool->num_threads()); + }, MonoDelta::FromSeconds(10), AssertBackoff::NONE); + NO_PENDING_FATALS(); +} +*/ + +} // namespace doris + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 26f4719e8fed58..5d67c5b760dddb 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -170,8 +170,11 @@ ${DORIS_TEST_BINARY_DIR}/util/path_util_test ${DORIS_TEST_BINARY_DIR}/util/file_cache_test ${DORIS_TEST_BINARY_DIR}/util/file_manager_test ${DORIS_TEST_BINARY_DIR}/util/parse_util_test +${DORIS_TEST_BINARY_DIR}/util/countdown_latch_test ${DORIS_TEST_BINARY_DIR}/util/monotime_test ${DORIS_TEST_BINARY_DIR}/util/scoped_cleanup_test +${DORIS_TEST_BINARY_DIR}/util/thread_test +${DORIS_TEST_BINARY_DIR}/util/threadpool_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test