From 169a575d0ab9d245f3e9cffcd5f8b60fc931b8c3 Mon Sep 17 00:00:00 2001 From: chaoyli Date: Sun, 16 Feb 2020 21:17:05 +0800 Subject: [PATCH] [util] Import ThreadPool and Thread from KUDU Thread pool design point: All tasks submitted directly to the thread pool enter a FIFO queue and are dispatched to a worker thread when one becomes free. Tasks may also be submitted via ThreadPoolTokens. The token wait() and shutdown() functions can then be used to block on logical groups of tasks. A token operates in one of two ExecutionModes, determined at token construction time: 1. SERIAL: submitted tasks are run one at a time. 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike submitted without a token, but the logical grouping that tokens impart can be useful when a pool is shared by many contexts (e.g. to safely shut down one context, to derive context-specific metrics, etc.). Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are processed in a round-robin fashion, one task at a time. This prevents them from starving one another. However, tokenless (and CONCURRENT token-based) tasks can starve SERIAL token-based tasks. Thread design point: 1. It is a thin wrapper around pthread that can register itself with the singleton ThreadMgr (a private class implemented in thread.cpp entirely, which tracks all live threads so that they may be monitored via the debug webpages). This class has a limited subset of boost::thread's API. Construction is almost the same, but clients must supply a category and a name for each thread so that they can be identified in the debug web UI. Otherwise, join() is the only supported method from boost::thread. 2. Each Thread object knows its operating system thread ID (TID), which can be used to attach debuggers to specific threads, to retrieve resource-usage statistics from the operating system, and to assign threads to resource control groups. 3. Threads are shared objects, but in a degenerate way. They may only have up to two referents: the caller that created the thread (parent), and the thread itself (child). Moreover, the only two methods to mutate state (join() and the destructor) are constrained: the child may not join() on itself, and the destructor is only run when there's one referent left. These constraints allow us to access thread internals without any locks. --- be/src/common/status.h | 29 +- be/src/util/CMakeLists.txt | 2 + be/src/util/barrier.h | 70 +++ be/src/util/question | 3 + be/src/util/thread.cpp | 387 +++++++++++++ be/src/util/thread.h | 270 +++++++++ be/src/util/threadpool.cpp | 620 ++++++++++++++++++++ be/src/util/threadpool.h | 451 +++++++++++++++ be/test/util/CMakeLists.txt | 3 + be/test/util/countdown_latch_test.cpp | 12 +- be/test/util/thread_test.cpp | 126 ++++ be/test/util/threadpool_test.cpp | 796 ++++++++++++++++++++++++++ run-ut.sh | 3 + 13 files changed, 2764 insertions(+), 8 deletions(-) create mode 100644 be/src/util/barrier.h create mode 100644 be/src/util/question create mode 100644 be/src/util/thread.cpp create mode 100644 be/src/util/thread.h create mode 100644 be/src/util/threadpool.cpp create mode 100644 be/src/util/threadpool.h create mode 100644 be/test/util/thread_test.cpp create mode 100644 be/test/util/threadpool_test.cpp diff --git a/be/src/common/status.h b/be/src/common/status.h index b447fb58baeefb..3651576e119583 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -67,7 +67,7 @@ class Status { return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); } static Status MinimumReservationUnavailable(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { - return Status(TStatusCode::INVALID_ARGUMENT, msg, precise_code, msg2); + return Status(TStatusCode::MINIMUM_RESERVATION_UNAVAILABLE, msg, precise_code, msg2); } static Status Corruption(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { return Status(TStatusCode::CORRUPTION, msg, precise_code, msg2); @@ -126,6 +126,21 @@ class Status { static Status TooManyTasks(const Slice& msg, int16_t precise_code = 1, const Slice& msg2 = Slice()) { return Status(TStatusCode::TOO_MANY_TASKS, msg, precise_code, msg2); } + static Status ServiceUnavailable(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::SERVICE_UNAVAILABLE, msg, precise_code, msg2); + } + static Status Uninitialized(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::UNINITIALIZED, msg, precise_code, msg2); + } + static Status Aborted(const Slice& msg, + int16_t precise_code = -1, + const Slice& msg2 = Slice()) { + return Status(TStatusCode::ABORTED, msg, precise_code, msg2); + } bool ok() const { return _state == nullptr; } @@ -137,6 +152,18 @@ class Status { bool is_already_exist() const { return code() == TStatusCode::ALREADY_EXIST; } bool is_io_error() const {return code() == TStatusCode::IO_ERROR; } + /// @return @c true iff the status indicates Uninitialized. + bool is_uninitialized() const { return code() == TStatusCode::UNINITIALIZED; } + + // @return @c true iff the status indicates an Aborted error. + bool is_aborted() const { return code() == TStatusCode::ABORTED; } + + /// @return @c true iff the status indicates an InvalidArgument error. + bool is_invalid_argument() const { return code() == TStatusCode::INVALID_ARGUMENT; } + + // @return @c true iff the status indicates ServiceUnavailable. + bool is_service_unavailable() const { return code() == TStatusCode::SERVICE_UNAVAILABLE; } + // Convert into TStatus. Call this if 'status_container' contains an optional // TStatus field named 'status'. This also sets __isset.status. template diff --git a/be/src/util/CMakeLists.txt b/be/src/util/CMakeLists.txt index c7f1f5c4ee3c48..ed4e1812d142f0 100644 --- a/be/src/util/CMakeLists.txt +++ b/be/src/util/CMakeLists.txt @@ -92,6 +92,8 @@ set(UTIL_FILES monotime.cpp mutex.cpp condition_variable.cpp + thread.cpp + threadpool.cpp ) if (WITH_MYSQL) diff --git a/be/src/util/barrier.h b/be/src/util/barrier.h new file mode 100644 index 00000000000000..6ee77f4e0b2462 --- /dev/null +++ b/be/src/util/barrier.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_UTIL_BARRIER_H +#define DORIS_BE_SRC_UTIL_BARRIER_H + +#include "gutil/macros.h" +#include "olap/olap_define.h" +#include "util/condition_variable.h" +#include "util/mutex.h" + +namespace doris { + +// Implementation of pthread-style Barriers. +class Barrier { +public: + // Initialize the barrier with the given initial count. + explicit Barrier(int count) : + _cond(&_mutex), + _count(count), + _initial_count(count) { + DCHECK_GT(count, 0); + } + + ~Barrier() {} + + // wait until all threads have reached the barrier. + // Once all threads have reached the barrier, the barrier is reset + // to the initial count. + void wait() { + MutexLock l(&_mutex); + if (--_count == 0) { + _count = _initial_count; + _cycle_count++; + _cond.notify_all(); + return; + } + + int initial_cycle = _cycle_count; + while (_cycle_count == initial_cycle) { + _cond.wait(); + } + } + +private: + Mutex _mutex; + ConditionVariable _cond; + int _count; + uint32_t _cycle_count = 0; + const int _initial_count; + DISALLOW_COPY_AND_ASSIGN(Barrier); +}; + +#endif //DORIS_BE_SRC_UTIL_BARRIER_H + +} // namespace doris diff --git a/be/src/util/question b/be/src/util/question new file mode 100644 index 00000000000000..24aef982f616c2 --- /dev/null +++ b/be/src/util/question @@ -0,0 +1,3 @@ +Regression race of lock +timeout of condition_variable +deadlock diff --git a/be/src/util/thread.cpp b/be/src/util/thread.cpp new file mode 100644 index 00000000000000..9bec08a45369fd --- /dev/null +++ b/be/src/util/thread.cpp @@ -0,0 +1,387 @@ +#include "thread.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "gutil/atomicops.h" +#include "gutil/once.h" +#include "gutil/dynamic_annotations.h" +#include "gutil/strings/substitute.h" +#include "olap/olap_define.h" +#include "util/mutex.h" +#include "util/scoped_cleanup.h" + +namespace doris { + +class ThreadMgr; + +__thread Thread* Thread::_tls = NULL; + +// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread. +// // The Thread class adds a reference to thread_manager while it is supervising a thread so +// // that a race between the end of the process's main thread (and therefore the destruction +// // of thread_manager) and the end of a thread that tries to remove itself from the +// // manager after the destruction can be avoided. +static std::shared_ptr thread_manager; +// +// Controls the single (lazy) initialization of thread_manager. +static GoogleOnceType once = GOOGLE_ONCE_INIT; + +// A singleton class that tracks all live threads, and groups them together for easy +// auditing. Used only by Thread. +class ThreadMgr { +public: + ThreadMgr() + : _threads_started_metric(0), + _threads_running_metric(0) {} + + ~ThreadMgr() { + MutexLock lock(&_lock); + _thread_categories.clear(); + } + + static void set_thread_name(const std::string& name, int64_t tid); + + // not the system TID, since pthread_t is less prone to being recycled. + void add_thread(const pthread_t& pthread_id, const std::string& name, + const std::string& category, int64_t tid); + + // Removes a thread from the supplied category. If the thread has + // already been removed, this is a no-op. + void remove_thread(const pthread_t& pthread_id, const std::string& category); + +private: + + // Container class for any details we want to capture about a thread + // TODO: Add start-time. + // TODO: Track fragment ID. + class ThreadDescriptor { + public: + ThreadDescriptor() { } + ThreadDescriptor(std::string category, std::string name, int64_t thread_id) + : _name(std::move(name)), + _category(std::move(category)), + _thread_id(thread_id) {} + + const std::string& name() const { return _name; } + const std::string& category() const { return _category; } + int64_t thread_id() const { return _thread_id; } + + private: + std::string _name; + std::string _category; + int64_t _thread_id; + }; + + // A ThreadCategory is a set of threads that are logically related. + // TODO: unordered_map is incompatible with pthread_t, but would be more + // efficient here. + typedef std::map ThreadCategory; + + // All thread categorys, keyed on the category name. + typedef std::map ThreadCategoryMap; + + // Protects _thread_categories and thread metrics. + Mutex _lock; + + // All thread categorys that ever contained a thread, even if empty + ThreadCategoryMap _thread_categories; + + // Counters to track all-time total number of threads, and the + // current number of running threads. + uint64_t _threads_started_metric; + uint64_t _threads_running_metric; + + DISALLOW_COPY_AND_ASSIGN(ThreadMgr); +}; + +void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) { + if (tid == getpid()) { + return ; + } + int err = prctl(PR_SET_NAME, name.c_str()); + if (err < 0 && errno != EPERM) { + LOG(ERROR) << "set_thread_name"; + } +} + +void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name, + const std::string& category, int64_t tid) { + // These annotations cause TSAN to ignore the synchronization on lock_ + // without causing the subsequent mutations to be treated as data races + // in and of themselves (that's what IGNORE_READS_AND_WRITES does). + // + // Why do we need them here and in SuperviseThread()? TSAN operates by + // observing synchronization events and using them to establish "happens + // before" relationships between threads. Where these relationships are + // not built, shared state access constitutes a data race. The + // synchronization events here, in RemoveThread(), and in + // SuperviseThread() may cause TSAN to establish a "happens before" + // relationship between thread functors, ignoring potential data races. + // The annotations prevent this from happening. + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(&_lock); + _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid); + _threads_running_metric++; + _threads_started_metric++; + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) { + ANNOTATE_IGNORE_SYNC_BEGIN(); + ANNOTATE_IGNORE_READS_AND_WRITES_BEGIN(); + { + MutexLock l(&_lock); + auto category_it = _thread_categories.find(category); + DCHECK(category_it != _thread_categories.end()); + category_it->second.erase(pthread_id); + _threads_running_metric--; + } + ANNOTATE_IGNORE_SYNC_END(); + ANNOTATE_IGNORE_READS_AND_WRITES_END(); +} + +Thread::~Thread() { + if (_joinable) { + int ret = pthread_detach(_thread); + CHECK_EQ(ret, 0); + } +} + +void Thread::join() { + ThreadJoiner(this).join(); +} + +int64_t Thread::tid() const { + int64_t t = base::subtle::Acquire_Load(&_tid); + if (t != PARENT_WAITING_TID) { + return _tid; + } + return wait_for_tid(); +} + +pthread_t Thread::pthread_id() const { + return _thread; +} + +const std::string& Thread::name() const { + return _name; +} + +const std::string& Thread::category() const { + return _category; +} + +std::string Thread::to_string() const { + return strings::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category); +} + +Thread* Thread::current_thread() { + return _tls; +} + +int64_t Thread::unique_thread_id() { + return static_cast(pthread_self()); +} + +int64_t Thread::current_thread_id() { + return syscall(SYS_gettid); +} + +int64_t Thread::wait_for_tid() const { + int loop_count = 0; + while (true) { + int64_t t = Acquire_Load(&_tid); + if (t != PARENT_WAITING_TID) return t; + boost::detail::yield(loop_count++); + } +} + +Status Thread::start_thread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr* holder) { + GoogleOnceInit(&once, &init_threadmgr); + + // Temporary reference for the duration of this function. + scoped_refptr t(new Thread(category, name, functor)); + + // Optional, and only set if the thread was successfully created. + // + // We have to set this before we even start the thread because it's + // allowed for the thread functor to access 'holder'. + if (holder) { + *holder = t; + } + + t->_tid = PARENT_WAITING_TID; + + // Add a reference count to the thread since SuperviseThread() needs to + // access the thread object, and we have no guarantee that our caller + // won't drop the reference as soon as we return. This is dereferenced + // in FinishThread(). + t->AddRef(); + + auto cleanup = MakeScopedCleanup([&]() { + // If we failed to create the thread, we need to undo all of our prep work. + t->_tid = INVALID_TID; + t->Release(); + }); + + int ret = pthread_create(&t->_thread, NULL, &Thread::supervise_thread, t.get()); + if (ret) { + return Status::RuntimeError("Could not create thread", ret, strerror(ret)); + } + + // The thread has been created and is now joinable. + // + // Why set this in the parent and not the child? Because only the parent + // (or someone communicating with the parent) can join, so joinable must + // be set before the parent returns. + t->_joinable = true; + cleanup.cancel(); + + VLOG(3) << "Started thread " << t->tid()<< " - " << category << ":" << name; + return Status::OK(); +} + +void* Thread::supervise_thread(void* arg) { + Thread* t = static_cast(arg); + int64_t system_tid = Thread::current_thread_id(); + PCHECK(system_tid != -1); + + // Take an additional reference to the thread manager, which we'll need below. + ANNOTATE_IGNORE_SYNC_BEGIN(); + std::shared_ptr thread_mgr_ref = thread_manager; + ANNOTATE_IGNORE_SYNC_END(); + + // Set up the TLS. + // + // We could store a scoped_refptr in the TLS itself, but as its + // lifecycle is poorly defined, we'll use a bare pointer. We + // already incremented the reference count in StartThread. + Thread::_tls = t; + + // Publish our tid to '_tid', which unblocks any callers waiting in + // WaitForTid(). + Release_Store(&t->_tid, system_tid); + + std::string name = strings::Substitute("$0-$1", t->name(), system_tid); + thread_manager->set_thread_name(name, t->_tid); + thread_manager->add_thread(pthread_self(), name, t->category(), t->_tid); + + // FinishThread() is guaranteed to run (even if functor_ throws an + // exception) because pthread_cleanup_push() creates a scoped object + // whose destructor invokes the provided callback. + pthread_cleanup_push(&Thread::finish_thread, t); + t->_functor(); + pthread_cleanup_pop(true); + + return NULL; +} + +void Thread::finish_thread(void* arg) { + Thread* t = static_cast(arg); + + // We're here either because of the explicit pthread_cleanup_pop() in + // SuperviseThread() or through pthread_exit(). In either case, + // thread_manager is guaranteed to be live because thread_mgr_ref in + // SuperviseThread() is still live. + thread_manager->remove_thread(pthread_self(), t->category()); + + // Signal any Joiner that we're done. + t->_done.count_down(); + + VLOG(2) << "Ended thread " << t->_tid << " - " << t->category() << ":" << t->name(); + t->Release(); + // NOTE: the above 'Release' call could be the last reference to 'this', + // so 'this' could be destructed at this point. Do not add any code + // following here! +} + +void Thread::init_threadmgr() { + thread_manager.reset(new ThreadMgr()); +} + +ThreadJoiner::ThreadJoiner(Thread* thr) + : _thread(CHECK_NOTNULL(thr)), + _warn_after_ms(kDefaultWarnAfterMs), + _warn_every_ms(kDefaultWarnEveryMs), + _give_up_after_ms(kDefaultGiveUpAfterMs) {} + +ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) { + _warn_after_ms = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) { + _warn_every_ms = ms; + return *this; +} + +ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) { + _give_up_after_ms = ms; + return *this; +} + +Status ThreadJoiner::join() { + if (Thread::current_thread() && + Thread::current_thread()->tid() == _thread->tid()) { + return Status::InvalidArgument("Can't join on own thread", -1, _thread->_name); + } + + // Early exit: double join is a no-op. + if (!_thread->_joinable) { + return Status::OK(); + } + + int waited_ms = 0; + bool keep_trying = true; + while (keep_trying) { + if (waited_ms >= _warn_after_ms) { + LOG(WARNING) << strings::Substitute("Waited for $0ms trying to join with $1 (tid $2)", + waited_ms, _thread->_name, _thread->_tid); + } + + int remaining_before_giveup = std::numeric_limits::max(); + if (_give_up_after_ms != -1) { + remaining_before_giveup = _give_up_after_ms - waited_ms; + } + + int remaining_before_next_warn = _warn_every_ms; + if (waited_ms < _warn_after_ms) { + remaining_before_next_warn = _warn_after_ms - waited_ms; + } + + if (remaining_before_giveup < remaining_before_next_warn) { + keep_trying = false; + } + + int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn); + + if (_thread->_done.wait_for(MonoDelta::FromMilliseconds(wait_for))) { + // Unconditionally join before returning, to guarantee that any TLS + // has been destroyed (pthread_key_create() destructors only run + // after a pthread's user method has returned). + int ret = pthread_join(_thread->_thread, NULL); + CHECK_EQ(ret, 0); + _thread->_joinable = false; + return Status::OK(); + } + waited_ms += wait_for; + } + return Status::Aborted(strings::Substitute("Timed out after $0ms joining on $1", + waited_ms, _thread->_name)); +} + +} // namespace doris diff --git a/be/src/util/thread.h b/be/src/util/thread.h new file mode 100644 index 00000000000000..d9a6cd788b6d36 --- /dev/null +++ b/be/src/util/thread.h @@ -0,0 +1,270 @@ + +#ifndef DORIS_BE_SRC_UTIL_THREAD_H +#define DORIS_BE_SRC_UTIL_THREAD_H + +#include +#include +#include + +#include "common/status.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" + +namespace doris { + +class Thread : public RefCountedThreadSafe { +public: + enum CreateFlags { + NO_FLAGS = 0, + NO_STACK_WATCHDOG = 1 + }; + + template + static Status create_with_flags(const std::string& category, const std::string& name, + const F& f, uint64_t flags, + scoped_refptr* holder) { + return start_thread(category, name, f, flags, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + scoped_refptr* holder) { + return start_thread(category, name, f, NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, + scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5), NO_FLAGS, holder); + } + + template + static Status create(const std::string& category, const std::string& name, const F& f, + const A1& a1, const A2& a2, const A3& a3, const A4& a4, const A5& a5, + const A6& a6, scoped_refptr* holder) { + return start_thread(category, name, std::bind(f, a1, a2, a3, a4, a5, a6), NO_FLAGS, holder); + } + + ~Thread(); + + // Blocks until this thread finishes execution. Once this method returns, the thread + // will be unregistered with the ThreadMgr and will not appear in the debug UI. + void join(); + + // The thread ID assigned to this thread by the operating system. If the thread + // has not yet started running, returns INVALID_TID. + // + // NOTE: this may block for a short amount of time if the thread has just been + // started. + int64_t tid() const; + + // Returns the thread's pthread ID. + pthread_t pthread_id() const; + + const std::string& name() const; + const std::string& category() const; + std::string to_string() const; + + // The current thread of execution, or NULL if the current thread isn't a kudu::Thread. + // This call is signal-safe. + static Thread* current_thread(); + + // Returns a unique, stable identifier for this thread. Note that this is a static + // method and thus can be used on any thread, including the main thread of the + // process. + // + // In general, this should be used when a value is required that is unique to + // a thread and must work on any thread including the main process thread. + // + // NOTE: this is _not_ the TID, but rather a unique value assigned by the + // thread implementation. So, this value should not be presented to the user + // in log messages, etc. + static int64_t unique_thread_id(); + + // Returns the system thread ID (tid on Linux) for the current thread. Note + // that this is a static method and thus can be used from any thread, + // including the main thread of the process. This is in contrast to + // Thread::tid(), which only works on kudu::Threads. + // + // Thread::tid() will return the same value, but the value is cached in the + // Thread object, so will be faster to call. + // + // Thread::unique_thread_id() (or Thread::tid()) should be preferred for + // performance sensistive code, however it is only guaranteed to return a + // unique and stable thread ID, not necessarily the system thread ID. + static int64_t current_thread_id(); + +private: + friend class ThreadJoiner; + + enum { + INVALID_TID = -1, + PARENT_WAITING_TID = -2, + }; + + // User function to be executed by this thread. + typedef std::function ThreadFunctor; + Thread(const std::string& category, const std::string& name, ThreadFunctor functor) + : _thread(0), + _tid(INVALID_TID), + _functor(std::move(functor)), + _category(std::move(category)), + _name(std::move(name)), + _done(1), + _joinable(false) + {} + + + // Library-specific thread ID. + pthread_t _thread; + + // OS-specific thread ID. Once the constructor finishes start_thread(), + // guaranteed to be set either to a non-negative integer, or to INVALID_TID. + // + // The tid_ member goes through the following states: + // 1. INVALID_TID: the thread has not been started, or has already exited. + // 2. PARENT_WAITING_TID: the parent has started the thread, but the + // thread has not yet begun running. Therefore the TID is not yet known + // but it will be set once the thread starts. + // 3. : the thread is running. + int64_t _tid; + + const ThreadFunctor _functor; + + const std::string _category; + const std::string _name; + + // Joiners wait on this latch to be notified if the thread is done. + // + // Note that Joiners must additionally pthread_join(), otherwise certain + // resources that callers expect to be destroyed (like TLS) may still be + // alive when a Joiner finishes. + CountDownLatch _done; + + bool _joinable; + + // Thread local pointer to the current thread of execution. Will be NULL if the current + // thread is not a Thread. + static __thread Thread* _tls; + + // Wait for the running thread to publish its tid. + int64_t wait_for_tid() const; + + // Starts the thread running supervise_thread(), and returns once that thread has + // initialised and its TID has been read. Waits for notification from the started + // thread that initialisation is complete before returning. On success, stores a + // reference to the thread in holder. + static Status start_thread(const std::string& category, const std::string& name, + const ThreadFunctor& functor, uint64_t flags, + scoped_refptr* holder); + + // Wrapper for the user-supplied function. Invoked from the new thread, + // with the Thread as its only argument. Executes _functor, but before + // doing so registers with the global ThreadMgr and reads the thread's + // system ID. After _functor terminates, unregisters with the ThreadMgr. + // Always returns NULL. + // + // supervise_thread() notifies start_thread() when thread initialisation is + // completed via the _tid, which is set to the new thread's system ID. + // By that point in time supervise_thread() has also taken a reference to + // the Thread object, allowing it to safely refer to it even after the + // caller drops its reference. + // + // Additionally, start_thread() notifies supervise_thread() when the actual + // Thread object has been assigned (supervise_thread() is spinning during + // this time). Without this, the new thread may reference the actual + // Thread object before it has been assigned by start_thread(). See + // KUDU-11 for more details. + static void* supervise_thread(void* arg); + + // Invoked when the user-supplied function finishes or in the case of an + // abrupt exit (i.e. pthread_exit()). Cleans up after supervise_thread(). + static void finish_thread(void* arg); + + static void init_threadmgr(); +}; + +// Utility to join on a thread, printing warning messages if it +// takes too long. For example: +// +// ThreadJoiner(&my_thread, "processing thread") +// .warn_after_ms(1000) +// .warn_every_ms(5000) +// .Join(); +// +// TODO: would be nice to offer a way to use ptrace() or signals to +// dump the stack trace of the thread we're trying to join on if it +// gets stuck. But, after looking for 20 minutes or so, it seems +// pretty complicated to get right. +class ThreadJoiner { +public: + explicit ThreadJoiner(Thread* thread); + + // Start emitting warnings after this many milliseconds. + // + // Default: 1000 ms. + ThreadJoiner& warn_after_ms(int ms); + + // After the warnings after started, emit another warning at the + // given interval. + // + // Default: 1000 ms. + ThreadJoiner& warn_every_ms(int ms); + + // If the thread has not stopped after this number of milliseconds, give up + // joining on it and return Status::Aborted. + // + // -1 (the default) means to wait forever trying to join. + ThreadJoiner& give_up_after_ms(int ms); + + // Join the thread, subject to the above parameters. If the thread joining + // fails for any reason, returns RuntimeError. If it times out, returns + // Aborted. + Status join(); + +private: + enum { + kDefaultWarnAfterMs = 1000, + kDefaultWarnEveryMs = 1000, + kDefaultGiveUpAfterMs = -1 // forever + }; + + Thread* _thread; + + int _warn_after_ms; + int _warn_every_ms; + int _give_up_after_ms; + + DISALLOW_COPY_AND_ASSIGN(ThreadJoiner); +}; + + +} //namespace doris + +#endif //DORIS_BE_SRC_UTIL_THREAD_H diff --git a/be/src/util/threadpool.cpp b/be/src/util/threadpool.cpp new file mode 100644 index 00000000000000..cada3c2e7a6099 --- /dev/null +++ b/be/src/util/threadpool.cpp @@ -0,0 +1,620 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/threadpool.h" + +#include +#include +#include + +#include "common/logging.h" +#include "gutil/macros.h" +#include "gutil/map-util.h" +#include "gutil/strings/substitute.h" +#include "gutil/sysinfo.h" +#include "util/scoped_cleanup.h" +#include "util/thread.h" + +namespace doris { + +using std::string; +using strings::Substitute; + +class FunctionRunnable : public Runnable { +public: + explicit FunctionRunnable(boost::function func) + : _func(std::move(func)) {} + + void run() OVERRIDE { + _func(); + } + +private: + boost::function _func; +}; + +ThreadPoolBuilder::ThreadPoolBuilder(string name) + : _name(std::move(name)), + _min_threads(0), + _max_threads(base::NumCPUs()), + _max_queue_size(std::numeric_limits::max()), + _idle_timeout(MonoDelta::FromMilliseconds(500)) {} + +ThreadPoolBuilder& ThreadPoolBuilder::set_min_threads(int min_threads) { + CHECK_GE(min_threads, 0); + _min_threads = min_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_threads(int max_threads) { + CHECK_GT(max_threads, 0); + _max_threads = max_threads; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_max_queue_size(int max_queue_size) { + _max_queue_size = max_queue_size; + return *this; +} + +ThreadPoolBuilder& ThreadPoolBuilder::set_idle_timeout(const MonoDelta& idle_timeout) { + _idle_timeout = idle_timeout; + return *this; +} + +Status ThreadPoolBuilder::build(std::unique_ptr* pool) const { + pool->reset(new ThreadPool(*this)); + RETURN_IF_ERROR((*pool)->init()); + return Status::OK(); +} + +ThreadPoolToken::ThreadPoolToken(ThreadPool* pool, + ThreadPool::ExecutionMode mode) + : _mode(mode), + _pool(pool), + _state(State::IDLE), + _not_running_cond(&pool->_lock), + _active_threads(0) {} + +ThreadPoolToken::~ThreadPoolToken() { + shutdown(); + _pool->release_token(this); +} + +Status ThreadPoolToken::submit(std::shared_ptr r) { + return _pool->do_submit(std::move(r), this); +} + +Status ThreadPoolToken::submit_func(boost::function f) { + return submit(std::make_shared(std::move(f))); +} + +void ThreadPoolToken::shutdown() { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + + // Clear the queue under the lock, but defer the releasing of the tasks + // outside the lock, in case there are concurrent threads wanting to access + // the ThreadPool. The task's destructors may acquire locks, etc, so this + // also prevents lock inversions. + std::deque to_release = std::move(_entries); + _pool->_total_queued_tasks -= to_release.size(); + + switch (state()) { + case State::IDLE: + // There were no tasks outstanding; we can quiesce the token immediately. + transition(State::QUIESCED); + break; + case State::RUNNING: + // There were outstanding tasks. If any are still running, switch to + // QUIESCING and wait for them to finish (the worker thread executing + // the token's last task will switch the token to QUIESCED). Otherwise, + // we can quiesce the token immediately. + + // Note: this is an O(n) operation, but it's expected to be infrequent. + // Plus doing it this way (rather than switching to QUIESCING and waiting + // for a worker thread to process the queue entry) helps retain state + // transition symmetry with ThreadPool::shutdown. + for (auto it = _pool->_queue.begin(); it != _pool->_queue.end();) { + if (*it == this) { + it = _pool->_queue.erase(it); + } else { + it++; + } + } + + if (_active_threads == 0) { + transition(State::QUIESCED); + break; + } + transition(State::QUIESCING); + FALLTHROUGH_INTENDED; + case State::QUIESCING: + // The token is already quiescing. Just wait for a worker thread to + // switch it to QUIESCED. + while (state() != State::QUIESCED) { + _not_running_cond.wait(); + } + break; + default: + break; + } +} + +void ThreadPoolToken::wait() { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + while (is_active()) { + _not_running_cond.wait(); + } +} + +bool ThreadPoolToken::wait_until(const MonoTime& until) { + MutexLock unique_lock(&(_pool->_lock)); + _pool->check_not_pool_thread_unlocked(); + while (is_active()) { + if (!_not_running_cond.wait_until(until)) { + return false; + } + } + return true; +} + +bool ThreadPoolToken::wait_for(const MonoDelta& delta) { + return wait_until(MonoTime::Now() + delta); +} + +void ThreadPoolToken::transition(State new_state) { +#ifndef NDEBUG + CHECK_NE(_state, new_state); + + switch (_state) { + case State::IDLE: + CHECK(new_state == State::RUNNING || + new_state == State::QUIESCED); + if (new_state == State::RUNNING) { + CHECK(!_entries.empty()); + } else { + CHECK(_entries.empty()); + CHECK_EQ(_active_threads, 0); + } + break; + case State::RUNNING: + CHECK(new_state == State::IDLE || + new_state == State::QUIESCING || + new_state == State::QUIESCED); + CHECK(_entries.empty()); + if (new_state == State::QUIESCING) { + CHECK_GT(_active_threads, 0); + } + break; + case State::QUIESCING: + CHECK(new_state == State::QUIESCED); + CHECK_EQ(_active_threads, 0); + break; + case State::QUIESCED: + CHECK(false); // QUIESCED is a terminal state + break; + default: + LOG(FATAL) << "Unknown token state: " << _state; + } +#endif + + // Take actions based on the state we're entering. + switch (new_state) { + case State::IDLE: + case State::QUIESCED: + _not_running_cond.notify_all(); + break; + default: + break; + } + + _state = new_state; +} + +const char* ThreadPoolToken::state_to_string(State s) { + switch (s) { + case State::IDLE: return "IDLE"; break; + case State::RUNNING: return "RUNNING"; break; + case State::QUIESCING: return "QUIESCING"; break; + case State::QUIESCED: return "QUIESCED"; break; + } + return ""; +} + +ThreadPool::ThreadPool(const ThreadPoolBuilder& builder) + : _name(builder._name), + _min_threads(builder._min_threads), + _max_threads(builder._max_threads), + _max_queue_size(builder._max_queue_size), + _idle_timeout(builder._idle_timeout), + _pool_status(Status::Uninitialized("The pool was not initialized.")), + _idle_cond(&_lock), + _no_threads_cond(&_lock), + _num_threads(0), + _num_threads_pending_start(0), + _active_threads(0), + _total_queued_tasks(0), + _tokenless(new_token(ExecutionMode::CONCURRENT)) +{} + +ThreadPool::~ThreadPool() { + // There should only be one live token: the one used in tokenless submission. + CHECK_EQ(1, _tokens.size()) << Substitute( + "Threadpool $0 destroyed with $1 allocated tokens", + _name, _tokens.size()); + shutdown(); +} + +Status ThreadPool::init() { + if (!_pool_status.is_uninitialized()) { + return Status::NotSupported("The thread pool is already initialized"); + } + _pool_status = Status::OK(); + _num_threads_pending_start = _min_threads; + for (int i = 0; i < _min_threads; i++) { + Status status = create_thread(); + if (!status.ok()) { + shutdown(); + return status; + } + } + return Status::OK(); +} + +void ThreadPool::shutdown() { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + + // Note: this is the same error seen at submission if the pool is at + // capacity, so clients can't tell them apart. This isn't really a practical + // concern though because shutting down a pool typically requires clients to + // be quiesced first, so there's no danger of a client getting confused. + _pool_status = Status::ServiceUnavailable("The pool has been shut down."); + + // Clear the various queues under the lock, but defer the releasing + // of the tasks outside the lock, in case there are concurrent threads + // wanting to access the ThreadPool. The task's destructors may acquire + // locks, etc, so this also prevents lock inversions. + _queue.clear(); + std::deque> to_release; + for (auto* t : _tokens) { + if (!t->_entries.empty()) { + to_release.emplace_back(std::move(t->_entries)); + } + switch (t->state()) { + case ThreadPoolToken::State::IDLE: + // The token is idle; we can quiesce it immediately. + t->transition(ThreadPoolToken::State::QUIESCED); + break; + case ThreadPoolToken::State::RUNNING: + // The token has tasks associated with it. If they're merely queued + // (i.e. there are no active threads), the tasks will have been removed + // above and we can quiesce immediately. Otherwise, we need to wait for + // the threads to finish. + t->transition(t->_active_threads > 0 ? + ThreadPoolToken::State::QUIESCING : + ThreadPoolToken::State::QUIESCED); + break; + default: + break; + } + } + + // The queues are empty. Wake any sleeping worker threads and wait for all + // of them to exit. Some worker threads will exit immediately upon waking, + // while others will exit after they finish executing an outstanding task. + _total_queued_tasks = 0; + while (!_idle_threads.empty()) { + _idle_threads.front().not_empty.notify_one(); + _idle_threads.pop_front(); + } + while (_num_threads + _num_threads_pending_start > 0) { + _no_threads_cond.wait(); + } + + // All the threads have exited. Check the state of each token. + for (auto* t : _tokens) { + DCHECK(t->state() == ThreadPoolToken::State::IDLE || + t->state() == ThreadPoolToken::State::QUIESCED); + } +} + +std::unique_ptr ThreadPool::new_token(ExecutionMode mode) { + MutexLock unique_lock(&_lock); + std::unique_ptr t(new ThreadPoolToken(this,mode)); + InsertOrDie(&_tokens, t.get()); + return t; +} + +void ThreadPool::release_token(ThreadPoolToken* t) { + MutexLock unique_lock(&_lock); + CHECK(!t->is_active()) << Substitute("Token with state $0 may not be released", + ThreadPoolToken::state_to_string(t->state())); + CHECK_EQ(1, _tokens.erase(t)); +} + +Status ThreadPool::submit(std::shared_ptr r) { + return do_submit(std::move(r), _tokenless.get()); +} + +Status ThreadPool::submit_func(boost::function f) { + return submit(std::make_shared(std::move(f))); +} + +Status ThreadPool::do_submit(std::shared_ptr r, ThreadPoolToken* token) { + DCHECK(token); + MonoTime submit_time = MonoTime::Now(); + + MutexLock unique_lock(&_lock); + if (PREDICT_FALSE(!_pool_status.ok())) { + return _pool_status; + } + + if (PREDICT_FALSE(!token->may_submit_new_tasks())) { + return Status::ServiceUnavailable("Thread pool token was shut down"); + } + + // Size limit check. + int64_t capacity_remaining = static_cast(_max_threads) + - _active_threads + + static_cast(_max_queue_size) + - _total_queued_tasks; + if (capacity_remaining < 1) { + return Status::ServiceUnavailable( + Substitute("Thread pool is at capacity ($0/$1 tasks running, $2/$3 tasks queued)", + _num_threads + _num_threads_pending_start, _max_threads, + _total_queued_tasks, _max_queue_size)); + } + + // Should we create another thread? + + // We assume that each current inactive thread will grab one item from the + // queue. If it seems like we'll need another thread, we create one. + // + // Rather than creating the thread here, while holding the lock, we defer + // it to down below. This is because thread creation can be rather slow + // (hundreds of milliseconds in some cases) and we'd like to allow the + // existing threads to continue to process tasks while we do so. + // + // In theory, a currently active thread could finish immediately after this + // calculation but before our new worker starts running. This would mean we + // created a thread we didn't really need. However, this race is unavoidable + // and harmless. + // + // Of course, we never create more than _max_threads threads no matter what. + int threads_from_this_submit = + token->is_active() && token->mode() == ExecutionMode::SERIAL ? 0 : 1; + int inactive_threads = _num_threads + _num_threads_pending_start - _active_threads; + int additional_threads = static_cast(_queue.size()) + + threads_from_this_submit + - inactive_threads; + bool need_a_thread = false; + if (additional_threads > 0 && _num_threads + _num_threads_pending_start < _max_threads) { + need_a_thread = true; + _num_threads_pending_start++; + } + + Task task; + task.runnable = std::move(r); + task.submit_time = submit_time; + + // Add the task to the token's queue. + ThreadPoolToken::State state = token->state(); + DCHECK(state == ThreadPoolToken::State::IDLE || + state == ThreadPoolToken::State::RUNNING); + token->_entries.emplace_back(std::move(task)); + if (state == ThreadPoolToken::State::IDLE || + token->mode() == ExecutionMode::CONCURRENT) { + _queue.emplace_back(token); + if (state == ThreadPoolToken::State::IDLE) { + token->transition(ThreadPoolToken::State::RUNNING); + } + } + _total_queued_tasks++; + + // Wake up an idle thread for this task. Choosing the thread at the front of + // the list ensures LIFO semantics as idling threads are also added to the front. + // + // If there are no idle threads, the new task remains on the queue and is + // processed by an active thread (or a thread we're about to create) at some + // point in the future. + if (!_idle_threads.empty()) { + _idle_threads.front().not_empty.notify_one(); + _idle_threads.pop_front(); + } + unique_lock.unlock(); + + if (need_a_thread) { + Status status = create_thread(); + if (!status.ok()) { + unique_lock.lock(); + _num_threads_pending_start--; + if (_num_threads + _num_threads_pending_start == 0) { + // If we have no threads, we can't do any work. + return status; + } + // If we failed to create a thread, but there are still some other + // worker threads, log a warning message and continue. + LOG(ERROR) << "Thread pool failed to create thread: " + << status.to_string(); + } + } + + return Status::OK(); +} + +void ThreadPool::wait() { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + while (_total_queued_tasks > 0 || _active_threads > 0) { + _idle_cond.wait(); + } +} + +bool ThreadPool::wait_until(const MonoTime& until) { + MutexLock unique_lock(&_lock); + check_not_pool_thread_unlocked(); + while (_total_queued_tasks > 0 || _active_threads > 0) { + if (!_idle_cond.wait_until(until)) { + return false; + } + } + return true; +} + +bool ThreadPool::wait_for(const MonoDelta& delta) { + return wait_until(MonoTime::Now() + delta); +} + +void ThreadPool::dispatch_thread() { + MutexLock unique_lock(&_lock); + InsertOrDie(&_threads, Thread::current_thread()); + DCHECK_GT(_num_threads_pending_start, 0); + _num_threads++; + _num_threads_pending_start--; + // If we are one of the first '_min_threads' to start, we must be + // a "permanent" thread. + bool permanent = _num_threads <= _min_threads; + + // Owned by this worker thread and added/removed from _idle_threads as needed. + IdleThread me(&_lock); + + while (true) { + // Note: Status::Aborted() is used to indicate normal shutdown. + if (!_pool_status.ok()) { + VLOG(2) << "DispatchThread exiting: " << _pool_status.to_string(); + break; + } + + if (_queue.empty()) { + // There's no work to do, let's go idle. + // + // Note: if FIFO behavior is desired, it's as simple as changing this to push_back(). + _idle_threads.push_front(me); + SCOPED_CLEANUP({ + // For some wake ups (i.e. shutdown or do_submit) this thread is + // guaranteed to be unlinked after being awakened. In others (i.e. + // spurious wake-up or Wait timeout), it'll still be linked. + if (me.is_linked()) { + _idle_threads.erase(_idle_threads.iterator_to(me)); + } + }); + if (permanent) { + me.not_empty.wait(); + } else { + if (!me.not_empty.wait_for(_idle_timeout)) { + // After much investigation, it appears that pthread condition variables have + // a weird behavior in which they can return ETIMEDOUT from timed_wait even if + // another thread did in fact signal. Apparently after a timeout there is some + // brief period during which another thread may actually grab the internal mutex + // protecting the state, signal, and release again before we get the mutex. So, + // we'll recheck the empty queue case regardless. + if (_queue.empty()) { + VLOG(3) << "Releasing worker thread from pool " << _name << " after " + << _idle_timeout.ToMilliseconds() << "ms of idle time."; + break; + } + } + } + continue; + } + + // Get the next token and task to execute. + ThreadPoolToken* token = _queue.front(); + _queue.pop_front(); + DCHECK_EQ(ThreadPoolToken::State::RUNNING, token->state()); + DCHECK(!token->_entries.empty()); + Task task = std::move(token->_entries.front()); + token->_entries.pop_front(); + token->_active_threads++; + --_total_queued_tasks; + ++_active_threads; + + unique_lock.unlock(); + + // Execute the task + task.runnable->run(); + + // Destruct the task while we do not hold the lock. + // + // The task's destructor may be expensive if it has a lot of bound + // objects, and we don't want to block submission of the threadpool. + // In the worst case, the destructor might even try to do something + // with this threadpool, and produce a deadlock. + task.runnable.reset(); + unique_lock.lock(); + + // Possible states: + // 1. The token was shut down while we ran its task. Transition to QUIESCED. + // 2. The token has no more queued tasks. Transition back to IDLE. + // 3. The token has more tasks. Requeue it and transition back to RUNNABLE. + ThreadPoolToken::State state = token->state(); + DCHECK(state == ThreadPoolToken::State::RUNNING || + state == ThreadPoolToken::State::QUIESCING); + if (--token->_active_threads == 0) { + if (state == ThreadPoolToken::State::QUIESCING) { + DCHECK(token->_entries.empty()); + token->transition(ThreadPoolToken::State::QUIESCED); + } else if (token->_entries.empty()) { + token->transition(ThreadPoolToken::State::IDLE); + } else if (token->mode() == ExecutionMode::SERIAL) { + _queue.emplace_back(token); + } + } + if (--_active_threads == 0) { + _idle_cond.notify_all(); + } + } + + // It's important that we hold the lock between exiting the loop and dropping + // _num_threads. Otherwise it's possible someone else could come along here + // and add a new task just as the last running thread is about to exit. + CHECK(unique_lock.own_lock()); + + CHECK_EQ(_threads.erase(Thread::current_thread()), 1); + _num_threads--; + if (_num_threads + _num_threads_pending_start == 0) { + _no_threads_cond.notify_all(); + + // Sanity check: if we're the last thread exiting, the queue ought to be + // empty. Otherwise it will never get processed. + CHECK(_queue.empty()); + DCHECK_EQ(0, _total_queued_tasks); + } +} + +Status ThreadPool::create_thread() { + return Thread::create("thread pool", Substitute("$0 [worker]", _name), + &ThreadPool::dispatch_thread, this, nullptr); +} + +void ThreadPool::check_not_pool_thread_unlocked() { + Thread* current = Thread::current_thread(); + if (ContainsKey(_threads, current)) { + LOG(FATAL) << Substitute("Thread belonging to thread pool '$0' with " + "name '$1' called pool function that would result in deadlock", + _name, current->name()); + } +} + +std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s) { + return o << ThreadPoolToken::state_to_string(s); +} + +} // namespace doris diff --git a/be/src/util/threadpool.h b/be/src/util/threadpool.h new file mode 100644 index 00000000000000..b13f396ca854fe --- /dev/null +++ b/be/src/util/threadpool.h @@ -0,0 +1,451 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef DORIS_BE_SRC_UTIL_THREAD_POOL_H +#define DORIS_BE_SRC_UTIL_THREAD_POOL_H + +#include +#include +#include +#include +#include + +#include "boost/function.hpp" +#include +#include + +#include "common/status.h" +#include "gutil/ref_counted.h" +#include "util/condition_variable.h" +#include "util/mutex.h" +#include "util/monotime.h" + +namespace doris { + +class Thread; +class ThreadPool; +class ThreadPoolToken; + +class Runnable { +public: + virtual void run() = 0; + virtual ~Runnable() {} +}; + +// ThreadPool takes a lot of arguments. We provide sane defaults with a builder. +// +// name: Used for debugging output and default names of the worker threads. +// Since thread names are limited to 16 characters on Linux, it's good to +// choose a short name here. +// Required. +// +// trace_metric_prefix: used to prefix the names of TraceMetric counters. +// When a task on a thread pool has an associated trace, the thread pool +// implementation will increment TraceMetric counters to indicate the +// amount of time spent waiting in the queue as well as the amount of wall +// and CPU time spent executing. By default, these counters are prefixed +// with the name of the thread pool. For example, if the pool is named +// 'apply', then counters such as 'apply.queue_time_us' will be +// incremented. +// +// The TraceMetrics implementation relies on the number of distinct counter +// names being small. Thus, if the thread pool name itself is dynamically +// generated, the default behavior described above would result in an +// unbounded number of distinct counter names. The 'trace_metric_prefix' +// setting can be used to override the prefix used in generating the trace +// metric names. +// +// For example, the Raft thread pools are named "-raft" which +// has unbounded cardinality (a server may have thousands of different +// tablet IDs over its lifetime). In that case, setting the prefix to +// "raft" will avoid any issues. +// +// min_threads: Minimum number of threads we'll have at any time. +// Default: 0. +// +// max_threads: Maximum number of threads we'll have at any time. +// Default: Number of CPUs detected on the system. +// +// max_queue_size: Maximum number of items to enqueue before returning a +// Status::ServiceUnavailable message from Submit(). +// Default: INT_MAX. +// +// idle_timeout: How long we'll keep around an idle thread before timing it out. +// We always keep at least min_threads. +// Default: 500 milliseconds. +// +// metrics: Histograms, counters, etc. to update on various threadpool events. +// Default: not set. +// +class ThreadPoolBuilder { +public: + explicit ThreadPoolBuilder(std::string name); + + // Note: We violate the style guide by returning mutable references here + // in order to provide traditional Builder pattern conveniences. + ThreadPoolBuilder& set_min_threads(int min_threads); + ThreadPoolBuilder& set_max_threads(int max_threads); + ThreadPoolBuilder& set_max_queue_size(int max_queue_size); + ThreadPoolBuilder& set_idle_timeout(const MonoDelta& idle_timeout); + + // Instantiate a new ThreadPool with the existing builder arguments. + Status build(std::unique_ptr* pool) const; + +private: + friend class ThreadPool; + const std::string _name; + int _min_threads; + int _max_threads; + int _max_queue_size; + MonoDelta _idle_timeout; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolBuilder); +}; + +// Thread pool with a variable number of threads. +// +// Tasks submitted directly to the thread pool enter a FIFO queue and are +// dispatched to a worker thread when one becomes free. Tasks may also be +// submitted via ThreadPoolTokens. The token Wait() and Shutdown() functions +// can then be used to block on logical groups of tasks. +// +// A token operates in one of two ExecutionModes, determined at token +// construction time: +// 1. SERIAL: submitted tasks are run one at a time. +// 2. CONCURRENT: submitted tasks may be run in parallel. This isn't unlike +// tasks submitted without a token, but the logical grouping that tokens +// impart can be useful when a pool is shared by many contexts (e.g. to +// safely shut down one context, to derive context-specific metrics, etc.). +// +// Tasks submitted without a token or via ExecutionMode::CONCURRENT tokens are +// processed in FIFO order. On the other hand, ExecutionMode::SERIAL tokens are +// processed in a round-robin fashion, one task at a time. This prevents them +// from starving one another. However, tokenless (and CONCURRENT token-based) +// tasks can starve SERIAL token-based tasks. +// +// Usage Example: +// static void Func(int n) { ... } +// class Task : public Runnable { ... } +// +// std::unique_ptr thread_pool; +// CHECK_OK( +// ThreadPoolBuilder("my_pool") +// .set_min_threads(0) +// .set_max_threads(5) +// .set_max_queue_size(10) +// .set_idle_timeout(MonoDelta::FromMilliseconds(2000)) +// .Build(&thread_pool)); +// thread_pool->Submit(shared_ptr(new Task())); +// thread_pool->SubmitFunc(boost::bind(&Func, 10)); +class ThreadPool { +public: + ~ThreadPool(); + + // Wait for the running tasks to complete and then shutdown the threads. + // All the other pending tasks in the queue will be removed. + // NOTE: That the user may implement an external abort logic for the + // runnables, that must be called before Shutdown(), if the system + // should know about the non-execution of these tasks, or the runnable + // require an explicit "abort" notification to exit from the run loop. + void shutdown(); + + // Submits a Runnable class. + Status submit(std::shared_ptr r); + + // Submits a function bound using boost::bind(&FuncName, args...). + Status submit_func(boost::function f); + + // Waits until all the tasks are completed. + void wait(); + + // Waits for the pool to reach the idle state, or until 'until' time is reached. + // Returns true if the pool reached the idle state, false otherwise. + bool wait_until(const MonoTime& until); + + // Waits for the pool to reach the idle state, or until 'delta' time elapses. + // Returns true if the pool reached the idle state, false otherwise. + bool wait_for(const MonoDelta& delta); + + // Allocates a new token for use in token-based task submission. All tokens + // must be destroyed before their ThreadPool is destroyed. + // + // There is no limit on the number of tokens that may be allocated. + enum class ExecutionMode { + // Tasks submitted via this token will be executed serially. + SERIAL, + + // Tasks submitted via this token may be executed concurrently. + CONCURRENT, + }; + std::unique_ptr new_token(ExecutionMode mode); + + // Return the number of threads currently running (or in the process of starting up) + // for this thread pool. + int num_threads() const { + MutexLock l(&_lock); + return _num_threads + _num_threads_pending_start; + } + +private: + friend class ThreadPoolBuilder; + friend class ThreadPoolToken; + + // Client-provided task to be executed by this pool. + struct Task { + std::shared_ptr runnable; + + // Time at which the entry was submitted to the pool. + MonoTime submit_time; + }; + + // Creates a new thread pool using a builder. + explicit ThreadPool(const ThreadPoolBuilder& builder); + + // Initializes the thread pool by starting the minimum number of threads. + Status init(); + + // Dispatcher responsible for dequeueing and executing the tasks + void dispatch_thread(); + + // Create new thread. + // + // REQUIRES: caller has incremented '_num_threads_pending_start' ahead of this call. + // NOTE: For performance reasons, _lock should not be held. + Status create_thread(); + + // Aborts if the current thread is a member of this thread pool. + void check_not_pool_thread_unlocked(); + + // Submits a task to be run via token. + Status do_submit(std::shared_ptr r, ThreadPoolToken* token); + + // Releases token 't' and invalidates it. + void release_token(ThreadPoolToken* t); + + const std::string _name; + const int _min_threads; + const int _max_threads; + const int _max_queue_size; + const MonoDelta _idle_timeout; + + // Overall status of the pool. Set to an error when the pool is shut down. + // + // Protected by '_lock'. + Status _pool_status; + + // Synchronizes many of the members of the pool and all of its + // condition variables. + mutable Mutex _lock; + + // Condition variable for "pool is idling". Waiters wake up when + // _active_threads reaches zero. + ConditionVariable _idle_cond; + + // Condition variable for "pool has no threads". Waiters wake up when + // _num_threads and num_pending_threads_ are both 0. + ConditionVariable _no_threads_cond; + + // Number of threads currently running. + // + // Protected by _lock. + int _num_threads; + + // Number of threads which are in the process of starting. + // When these threads start, they will decrement this counter and + // accordingly increment '_num_threads'. + // + // Protected by _lock. + int _num_threads_pending_start; + + // Number of threads currently running and executing client tasks. + // + // Protected by _lock. + int _active_threads; + + // Total number of client tasks queued, either directly (_queue) or + // indirectly (_tokens). + // + // Protected by _lock. + int _total_queued_tasks; + + // All allocated tokens. + // + // Protected by _lock. + std::unordered_set _tokens; + + // FIFO of tokens from which tasks should be executed. Does not own the + // tokens; they are owned by clients and are removed from the FIFO on shutdown. + // + // Protected by _lock. + std::deque _queue; + + // Pointers to all running threads. Raw pointers are safe because a Thread + // may only go out of scope after being removed from _threads. + // + // Protected by _lock. + std::unordered_set _threads; + + // List of all threads currently waiting for work. + // + // A thread is added to the front of the list when it goes idle and is + // removed from the front and signaled when new work arrives. This produces a + // LIFO usage pattern that is more efficient than idling on a single + // ConditionVariable (which yields FIFO semantics). + // + // Protected by _lock. + struct IdleThread : public boost::intrusive::list_base_hook<> { + explicit IdleThread(Mutex* m) + : not_empty(m) {} + + // Condition variable for "queue is not empty". Waiters wake up when a new + // task is queued. + ConditionVariable not_empty; + + DISALLOW_COPY_AND_ASSIGN(IdleThread); + }; + boost::intrusive::list _idle_threads; // NOLINT(build/include_what_you_use) + + // ExecutionMode::CONCURRENT token used by the pool for tokenless submission. + std::unique_ptr _tokenless; + + DISALLOW_COPY_AND_ASSIGN(ThreadPool); +}; + +// Entry point for token-based task submission and blocking for a particular +// thread pool. Tokens can only be created via ThreadPool::new_token(). +// +// All functions are thread-safe. Mutable members are protected via the +// ThreadPool's lock. +class ThreadPoolToken { +public: + // Destroys the token. + // + // May be called on a token with outstanding tasks, as Shutdown() will be + // called first to take care of them. + ~ThreadPoolToken(); + + // Submits a Runnable class. + Status submit(std::shared_ptr r); + + // Submits a function bound using boost::bind(&FuncName, args...). + Status submit_func(boost::function f); + + // Marks the token as unusable for future submissions. Any queued tasks not + // yet running are destroyed. If tasks are in flight, Shutdown() will wait + // on their completion before returning. + void shutdown(); + + // Waits until all the tasks submitted via this token are completed. + void wait(); + + // Waits for all submissions using this token are complete, or until 'until' + // time is reached. + // + // Returns true if all submissions are complete, false otherwise. + bool wait_until(const MonoTime& until); + + // Waits for all submissions using this token are complete, or until 'delta' + // time elapses. + // + // Returns true if all submissions are complete, false otherwise. + bool wait_for(const MonoDelta& delta); + +private: + // All possible token states. Legal state transitions: + // IDLE -> RUNNING: task is submitted via token + // IDLE -> QUIESCED: token or pool is shut down + // RUNNING -> IDLE: worker thread finishes executing a task and + // there are no more tasks queued to the token + // RUNNING -> QUIESCING: token or pool is shut down while worker thread + // is executing a task + // RUNNING -> QUIESCED: token or pool is shut down + // QUIESCING -> QUIESCED: worker thread finishes executing a task + // belonging to a shut down token or pool + enum class State { + // Token has no queued tasks. + IDLE, + + // A worker thread is running one of the token's previously queued tasks. + RUNNING, + + // No new tasks may be submitted to the token. A worker thread is still + // running a previously queued task. + QUIESCING, + + // No new tasks may be submitted to the token. There are no active tasks + // either. At this state, the token may only be destroyed. + QUIESCED, + }; + + // Writes a textual representation of the token state in 's' to 'o'. + friend std::ostream& operator<<(std::ostream& o, ThreadPoolToken::State s); + + friend class ThreadPool; + + // Returns a textual representation of 's' suitable for debugging. + static const char* state_to_string(State s); + + // Constructs a new token. + // + // The token may not outlive its thread pool ('pool'). + ThreadPoolToken(ThreadPool* pool, ThreadPool::ExecutionMode mode); + + // Changes this token's state to 'new_state' taking actions as needed. + void transition(State new_state); + + // Returns true if this token has a task queued and ready to run, or if a + // task belonging to this token is already running. + bool is_active() const { + return _state == State::RUNNING || + _state == State::QUIESCING; + } + + // Returns true if new tasks may be submitted to this token. + bool may_submit_new_tasks() const { + return _state != State::QUIESCING && + _state != State::QUIESCED; + } + + State state() const { return _state; } + ThreadPool::ExecutionMode mode() const { return _mode; } + + // Token's configured execution mode. + const ThreadPool::ExecutionMode _mode; + + // Pointer to the token's thread pool. + ThreadPool* _pool; + + // Token state machine. + State _state; + + // Queued client tasks. + std::deque _entries; + + // Condition variable for "token is idle". Waiters wake up when the token + // transitions to IDLE or QUIESCED. + ConditionVariable _not_running_cond; + + // Number of worker threads currently executing tasks belonging to this + // token. + int _active_threads; + + DISALLOW_COPY_AND_ASSIGN(ThreadPoolToken); +}; + +} // namespace doris + +#endif //DORIS_BE_SRC_UTIL_THREAD_POOL_H diff --git a/be/test/util/CMakeLists.txt b/be/test/util/CMakeLists.txt index 85d34f4fb70da3..6aba7715236657 100644 --- a/be/test/util/CMakeLists.txt +++ b/be/test/util/CMakeLists.txt @@ -59,5 +59,8 @@ ADD_BE_TEST(path_util_test) ADD_BE_TEST(file_cache_test) ADD_BE_TEST(file_manager_test) ADD_BE_TEST(parse_util_test) +ADD_BE_TEST(countdown_latch_test) ADD_BE_TEST(monotime_test) ADD_BE_TEST(scoped_cleanup_test) +ADD_BE_TEST(thread_test) +ADD_BE_TEST(threadpool_test) diff --git a/be/test/util/countdown_latch_test.cpp b/be/test/util/countdown_latch_test.cpp index 0bac03fa4365c2..e0174e61b8e6b6 100644 --- a/be/test/util/countdown_latch_test.cpp +++ b/be/test/util/countdown_latch_test.cpp @@ -18,11 +18,9 @@ #include #include -#include "gutil/std::unique_ptr.h" #include "gutil/ref_counted.h" #include "util/countdown_latch.h" #include "util/monotime.h" -#include "util/test_macros.h" #include "util/thread.h" #include "util/threadpool.h" @@ -41,19 +39,19 @@ static void decrement_latch(CountDownLatch* latch, int amount) { TEST(TestCountDownLatch, TestLatch) { std::unique_ptr pool; - ASSERT_OK(ThreadPoolBuilder("cdl-test").set_max_threads(1).build(&pool)); + ASSERT_TRUE(ThreadPoolBuilder("cdl-test").set_max_threads(1).build(&pool).ok()); CountDownLatch latch(1000); // Decrement the count by 1 in another thread, this should not fire the // latch. - ASSERT_OK(pool->submit_func(std::bind(decrement_latch, &latch, 1))); + ASSERT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1)).ok()); ASSERT_FALSE(latch.wait_for(MonoDelta::FromMilliseconds(200))); ASSERT_EQ(999, latch.count()); // Now decrement by 1000 this should decrement to 0 and fire the latch // (even though 1000 is one more than the current count). - ASSERT_OK(pool->submit_func(std::bind(decrement_latch, &latch, 1000))); + ASSERT_TRUE(pool->submit_func(std::bind(decrement_latch, &latch, 1000)).ok()); latch.wait(); ASSERT_EQ(0, latch.count()); } @@ -63,11 +61,11 @@ TEST(TestCountDownLatch, TestLatch) { TEST(TestCountDownLatch, TestResetToZero) { CountDownLatch cdl(100); scoped_refptr t; - ASSERT_OK(Thread::create("test", "cdl-test", &CountDownLatch::wait, &cdl, &t)); + ASSERT_TRUE(Thread::create("test", "cdl-test", &CountDownLatch::wait, &cdl, &t).ok()); // Sleep for a bit until it's likely the other thread is waiting on the latch. SleepFor(MonoDelta::FromMilliseconds(10)); - cdl.Reset(0); + cdl.reset(0); t->join(); } diff --git a/be/test/util/thread_test.cpp b/be/test/util/thread_test.cpp new file mode 100644 index 00000000000000..3adf513ff94816 --- /dev/null +++ b/be/test/util/thread_test.cpp @@ -0,0 +1,126 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "util/thread.h" + +#include +#include + +#include +#include +#include + +#include + +#include "common/status.h" +#include "common/logging.h" +#include "gutil/basictypes.h" +#include "gutil/ref_counted.h" +#include "util/countdown_latch.h" +#include "util/runtime_profile.h" + +using std::string; + +namespace doris { + +class ThreadTest : public ::testing::Test { +public: + virtual void SetUp() {} + virtual void TearDown() {} +}; + +// Join with a thread and emit warnings while waiting to join. +// This has to be manually verified. +TEST_F(ThreadTest, TestJoinAndWarn) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 1000*1000, &holder); + ASSERT_TRUE(status.ok()); + status = ThreadJoiner(holder.get()) + .warn_after_ms(10) + .warn_every_ms(100) + .join(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(ThreadTest, TestFailedJoin) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 1000*1000, &holder); + ASSERT_TRUE(status.ok()); + status = ThreadJoiner(holder.get()) + .give_up_after_ms(50) + .join(); + ASSERT_TRUE(status.is_aborted()); +} + +static void TryJoinOnSelf() { + Status s = ThreadJoiner(Thread::current_thread()).join(); + // Use CHECK instead of ASSERT because gtest isn't thread-safe. + CHECK(s.is_invalid_argument()); +} + +// Try to join on the thread that is currently running. +TEST_F(ThreadTest, TestJoinOnSelf) { + scoped_refptr holder; + ASSERT_TRUE(Thread::create("test", "test", TryJoinOnSelf, &holder).ok()); + holder->join(); + // Actual assertion is done by the thread spawned above. +} + +TEST_F(ThreadTest, TestDoubleJoinIsNoOp) { + scoped_refptr holder; + Status status = Thread::create("test", "sleeper thread", usleep, 0, &holder); + ASSERT_TRUE(status.ok()); + ThreadJoiner joiner(holder.get()); + status = joiner.join(); + ASSERT_TRUE(status.ok()); + status = joiner.join(); + ASSERT_TRUE(status.ok()); +} + +TEST_F(ThreadTest, ThreadStartBenchmark) { + std::vector> threads(1000); + { + int64_t thread_creation_ns = 0; + SCOPED_RAW_TIMER(&thread_creation_ns); + for (auto& t : threads) { + Status status = Thread::create("test", "TestCallOnExit", usleep, 0, &t); + ASSERT_TRUE(status.ok()); + } + std::cout << "create 1000 threads use:" + << thread_creation_ns << "ns" << std::endl; + } + { + int64_t thread_publish_tid_ns = 0; + SCOPED_RAW_TIMER(&thread_publish_tid_ns); + for (auto& t : threads) { + t->tid(); + } + std::cout << "1000 threads publish TIDS use:" + << thread_publish_tid_ns << "ns" << std::endl; + } + + for (auto& t : threads) { + t->join(); + } +} + +} // namespace doris + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/be/test/util/threadpool_test.cpp b/be/test/util/threadpool_test.cpp new file mode 100644 index 00000000000000..b752de3cd2a6c6 --- /dev/null +++ b/be/test/util/threadpool_test.cpp @@ -0,0 +1,796 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include "common/logging.h" +#include "common/status.h" +#include "gutil/atomicops.h" +#include "gutil/port.h" +#include "gutil/ref_counted.h" +#include "gutil/strings/substitute.h" +#include "gutil/sysinfo.h" +#include "gutil/walltime.h" +#include "util/barrier.h" +#include "util/countdown_latch.h" +#include "util/spinlock.h" +#include "util/metrics.h" +#include "util/monotime.h" +#include "util/random.h" +#include "util/scoped_cleanup.h" +#include "util/threadpool.h" + +using std::atomic; +using std::shared_ptr; +using std::string; +using std::thread; +using std::unique_ptr; +using std::vector; + +using strings::Substitute; + +DECLARE_int32(thread_inject_start_latency_ms); + +namespace doris { + +static const char* kDefaultPoolName = "test"; + +class ThreadPoolTest : public ::testing::Test { +public: + virtual void SetUp() override { + ASSERT_TRUE(ThreadPoolBuilder(kDefaultPoolName).build(&_pool).ok()); + } + + Status rebuild_pool_with_builder(const ThreadPoolBuilder& builder) { + return builder.build(&_pool); + } + + Status rebuild_pool_with_min_max(int min_threads, int max_threads) { + return ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(min_threads) + .set_max_threads(max_threads) + .build(&_pool); + } + +protected: + unique_ptr _pool; +}; + +TEST_F(ThreadPoolTest, TestNoTaskOpenClose) { + ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok()); + _pool->shutdown(); +} + +static void simple_task_method(int n, std::atomic* counter) { + while (n--) { + (*counter)++; + boost::detail::yield(n); + } +} + +class SimpleTask : public Runnable { +public: + SimpleTask(int n, std::atomic* counter) + : _n(n), _counter(counter) {} + + void run() override { + simple_task_method(_n, _counter); + } + +private: + int _n; + std::atomic* _counter; +}; + +TEST_F(ThreadPoolTest, TestSimpleTasks) { + ASSERT_TRUE(rebuild_pool_with_min_max(4, 4).ok()); + + std::atomic counter(0); + std::shared_ptr task(new SimpleTask(15, &counter)); + + ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 10, &counter)).ok()); + ASSERT_TRUE(_pool->submit(task).ok()); + ASSERT_TRUE(_pool->submit_func(boost::bind(&simple_task_method, 20, &counter)).ok()); + ASSERT_TRUE(_pool->submit(task).ok()); + _pool->wait(); + ASSERT_EQ(10 + 15 + 20 + 15, counter.load()); + _pool->shutdown(); +} + +class SlowTask : public Runnable { +public: + explicit SlowTask(CountDownLatch* latch) + : _latch(latch) {} + + void run() override { + _latch->wait(); + } + + static shared_ptr new_slow_task(CountDownLatch* latch) { + return std::make_shared(latch); + } + +private: + CountDownLatch* _latch; +}; + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMinimum) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(3) + .set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok()); + + // There are no threads to start with. + ASSERT_TRUE(_pool->num_threads() == 0); + // We get up to 3 threads when submitting work. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.count_down(); + }); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(2, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + // The 4th piece of work gets queued. + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + // Finish all work + latch.count_down(); + _pool->wait(); + ASSERT_EQ(0, _pool->_active_threads); + _pool->shutdown(); + ASSERT_EQ(0, _pool->num_threads()); +} + +TEST_F(ThreadPoolTest, TestThreadPoolWithNoMaxThreads) { + // By default a threadpool's max_threads is set to the number of CPUs, so + // this test submits more tasks than that to ensure that the number of CPUs + // isn't some kind of upper bound. + const int kNumCPUs = base::NumCPUs(); + + // Build a threadpool with no limit on the maximum number of threads. + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(std::numeric_limits::max())).ok()); + CountDownLatch latch(1); + auto cleanup_latch = MakeScopedCleanup([&]() { + latch.count_down(); + }); + + // submit tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs * 2; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 2), _pool->num_threads()); + + // submit tasks on two tokens. Only two threads should be created. + unique_ptr t1 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + unique_ptr t2 = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + for (int i = 0; i < kNumCPUs * 2; i++) { + ThreadPoolToken* t = (i % 2 == 0) ? t1.get() : t2.get(); + ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 2) + 2, _pool->num_threads()); + + // submit more tokenless tasks. Each should create a new thread. + for (int i = 0; i < kNumCPUs; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ((kNumCPUs * 3) + 2, _pool->num_threads()); + + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +// Regression test for a bug where a task is submitted exactly +// as a thread is about to exit. Previously this could hang forever. +TEST_F(ThreadPoolTest, TestRace) { + alarm(60); + auto cleanup = MakeScopedCleanup([]() { + alarm(0); // Disable alarm on test exit. + }); + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(0) + .set_max_threads(1) + .set_idle_timeout(MonoDelta::FromMicroseconds(1))).ok()); + + for (int i = 0; i < 500; i++) { + CountDownLatch l(1); + ASSERT_TRUE(_pool->submit_func(boost::bind(&CountDownLatch::count_down, &l)).ok()); + l.wait(); + // Sleeping a different amount in each iteration makes it more likely to hit + // the bug. + SleepFor(MonoDelta::FromMicroseconds(i)); + } +} + +TEST_F(ThreadPoolTest, TestVariableSizeThreadPool) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(4) + .set_idle_timeout(MonoDelta::FromMilliseconds(1))).ok()); + + // There is 1 thread to start with. + ASSERT_EQ(1, _pool->num_threads()); + // We get up to 4 threads when submitting work. + CountDownLatch latch(1); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(1, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(2, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(3, _pool->num_threads()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(4, _pool->num_threads()); + // The 5th piece of work gets queued. + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_EQ(4, _pool->num_threads()); + // Finish all work + latch.count_down(); + _pool->wait(); + ASSERT_EQ(0, _pool->_active_threads); + _pool->shutdown(); + ASSERT_EQ(0, _pool->num_threads()); +} + +TEST_F(ThreadPoolTest, TestMaxQueueSize) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1)).ok()); + + CountDownLatch latch(1); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + Status s = _pool->submit(SlowTask::new_slow_task(&latch)); + CHECK(s.is_service_unavailable()) << "Expected failure due to queue blowout:" + << s.to_string(); + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +// Test that when we specify a zero-sized queue, the maximum number of threads +// running is used for enforcement. +TEST_F(ThreadPoolTest, TestZeroQueueSize) { + const int kMaxThreads = 4; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_queue_size(0) + .set_max_threads(kMaxThreads)).ok()); + + CountDownLatch latch(1); + for (int i = 0; i < kMaxThreads; i++) { + ASSERT_TRUE(_pool->submit(SlowTask::new_slow_task(&latch)).ok()); + } + Status s = _pool->submit(SlowTask::new_slow_task(&latch)); + ASSERT_TRUE(s.is_service_unavailable()) << s.to_string(); + latch.count_down(); + _pool->wait(); + _pool->shutdown(); +} + +/* +// Test that a thread pool will crash if asked to run its own blocking +// functions in a pool thread. +// +// In a multi-threaded application, TSAN is unsafe to use following a fork(). +// After a fork(), TSAN will: +// 1. Disable verification, expecting an exec() soon anyway, and +// 2. Die on future thread creation. +// For some reason, this test triggers behavior #2. We could disable it with +// the TSAN option die_after_fork=0, but this can (supposedly) lead to +// deadlocks, so we'll disable the entire test instead. +#ifndef THREAD_SANITIZER +TEST_F(ThreadPoolTest, TestDeadlocks) { + const char* death_msg = "called pool function that would result in deadlock"; + ASSERT_DEATH({ + ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); + ASSERT_TRUE(_pool->submit_func( + Bind(&ThreadPool::shutdown, Unretained(_pool.get()))).ok()); + _pool->wait(); + }, death_msg); + + ASSERT_DEATH({ + ASSERT_TRUE(rebuild_pool_with_min_max(1, 1).ok()); + ASSERT_TRUE(_pool->submit_func( + Bind(&ThreadPool::ok(), Unretained(_pool.get()))).ok()); + _pool->wait(); + }, death_msg); +} +#endif +*/ + +class SlowDestructorRunnable : public Runnable { +public: + void run() override {} + + virtual ~SlowDestructorRunnable() { + SleepFor(MonoDelta::FromMilliseconds(100)); + } +}; + +// Test that if a tasks's destructor is slow, it doesn't cause serialization of the tasks +// in the queue. +TEST_F(ThreadPoolTest, TestSlowDestructor) { + ASSERT_TRUE(rebuild_pool_with_min_max(1, 20).ok()); + MonoTime start = MonoTime::Now(); + for (int i = 0; i < 100; i++) { + shared_ptr task(new SlowDestructorRunnable()); + ASSERT_TRUE(_pool->submit(std::move(task)).ok()); + } + _pool->wait(); + ASSERT_LT((MonoTime::Now() - start).ToSeconds(), 5); +} + +// For test cases that should run with both kinds of tokens. +class ThreadPoolTestTokenTypes : public ThreadPoolTest, + public testing::WithParamInterface {}; + +INSTANTIATE_TEST_CASE_P(Tokens, ThreadPoolTestTokenTypes, + ::testing::Values(ThreadPool::ExecutionMode::SERIAL, + ThreadPool::ExecutionMode::CONCURRENT)); + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitAndWait) { + unique_ptr t = _pool->new_token(GetParam()); + int i = 0; + Status status = t->submit_func([&]() { + SleepFor(MonoDelta::FromMilliseconds(1)); + i++; + }); + ASSERT_TRUE(status.ok()); + t->wait(); + ASSERT_EQ(1, i); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsProcessedSerially) { + unique_ptr t = _pool->new_token(ThreadPool::ExecutionMode::SERIAL); + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + string result; + for (char c = 'a'; c < 'f'; c++) { + // Sleep a little first so that there's a higher chance of out-of-order + // appends if the submissions did execute in parallel. + int sleep_ms = r.Next() % 5; + Status status= t->submit_func([&result, c, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + result += c; + }); + ASSERT_TRUE(status.ok()); + } + t->wait(); + ASSERT_EQ("abcde", result); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmitsProcessedConcurrently) { + const int kNumTokens = 5; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumTokens)).ok()); + vector> tokens; + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + std::shared_ptr b = std::make_shared(kNumTokens + 1); + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(_pool->new_token(GetParam())); + ASSERT_TRUE(tokens.back()->submit_func([b]() { + b->wait(); + }).ok()); + } + + // This will deadlock if the above tasks weren't all running concurrently. + b->wait(); +} + +TEST_F(ThreadPoolTest, TestTokenSubmitsNonSequential) { + const int kNumSubmissions = 5; + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumSubmissions)).ok()); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + shared_ptr b = std::make_shared(kNumSubmissions + 1); + unique_ptr t = _pool->new_token(ThreadPool::ExecutionMode::CONCURRENT); + for (int i = 0; i < kNumSubmissions; i++) { + ASSERT_TRUE(t->submit_func([b]() { + b->wait(); + }).ok()); + } + + // This will deadlock if the above tasks weren't all running concurrently. + b->wait(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenShutdown) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(4)).ok()); + + unique_ptr t1(_pool->new_token(GetParam())); + unique_ptr t2(_pool->new_token(GetParam())); + CountDownLatch l1(1); + CountDownLatch l2(1); + + // A violation to the tested invariant would yield a deadlock, so let's set + // up an alarm to bail us out. + alarm(60); + SCOPED_CLEANUP({ + alarm(0); // Disable alarm on test exit. + }); + + for (int i = 0; i < 3; i++) { + ASSERT_TRUE(t1->submit_func([&]() { + l1.wait(); + }).ok()); + } + for (int i = 0; i < 3; i++) { + ASSERT_TRUE(t2->submit_func([&]() { + l2.wait(); + }).ok()); + } + + // Unblock all of t1's tasks, but not t2's tasks. + l1.count_down(); + + // If this also waited for t2's tasks, it would deadlock. + t1->shutdown(); + + // We can no longer submit to t1 but we can still submit to t2. + ASSERT_TRUE(t1->submit_func([](){}).is_service_unavailable()); + ASSERT_TRUE(t2->submit_func([](){}).ok()); + + // Unblock t2's tasks. + l2.count_down(); + t2->shutdown(); +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenWaitForAll) { + const int kNumTokens = 3; + const int kNumSubmissions = 20; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + vector> tokens; + for (int i = 0; i < kNumTokens; i++) { + tokens.emplace_back(_pool->new_token(GetParam())); + } + + atomic v(0); + for (int i = 0; i < kNumSubmissions; i++) { + // Sleep a little first to raise the likelihood of the test thread + // reaching wait() before the submissions finish. + int sleep_ms = r.Next() % 5; + + auto task = [&v, sleep_ms]() { + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + v++; + }; + + // Half of the submissions will be token-less, and half will use a token. + if (i % 2 == 0) { + ASSERT_TRUE(_pool->submit_func(task).ok()); + } else { + int token_idx = r.Next() % tokens.size(); + ASSERT_TRUE(tokens[token_idx]->submit_func(task).ok()); + } + } + _pool->wait(); + ASSERT_EQ(kNumSubmissions, v); +} + +TEST_F(ThreadPoolTest, TestFuzz) { + const int kNumOperations = 1000; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random r(seed); + vector> tokens; + + for (int i = 0; i < kNumOperations; i++) { + // Operation distribution: + // + // - submit without a token: 40% + // - submit with a randomly selected token: 35% + // - Allocate a new token: 10% + // - Wait on a randomly selected token: 7% + // - shutdown a randomly selected token: 4% + // - Deallocate a randomly selected token: 2% + // - Wait for all submissions: 2% + int op = r.Next() % 100; + if (op < 40) { + // submit without a token. + int sleep_ms = r.Next() % 5; + ASSERT_TRUE(_pool->submit_func([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }).ok()); + } else if (op < 75) { + // submit with a randomly selected token. + if (tokens.empty()) { + continue; + } + int sleep_ms = r.Next() % 5; + int token_idx = r.Next() % tokens.size(); + Status s = tokens[token_idx]->submit_func([sleep_ms]() { + // Sleep a little first to increase task overlap. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + ASSERT_TRUE(s.ok() || s.is_service_unavailable()); + } else if (op < 85) { + // Allocate a token with a randomly selected policy. + ThreadPool::ExecutionMode mode = r.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens.emplace_back(_pool->new_token(mode)); + } else if (op < 92) { + // Wait on a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->wait(); + } else if (op < 96) { + // shutdown a randomly selected token. + if (tokens.empty()) { + continue; + } + int token_idx = r.Next() % tokens.size(); + tokens[token_idx]->shutdown(); + } else if (op < 98) { + // Deallocate a randomly selected token. + if (tokens.empty()) { + continue; + } + auto it = tokens.begin(); + int token_idx = r.Next() % tokens.size(); + std::advance(it, token_idx); + tokens.erase(it); + } else { + // Wait on everything. + ASSERT_LT(op, 100); + ASSERT_GE(op, 98); + _pool->wait(); + } + } + + // Some test runs will shut down the pool before the tokens, and some won't. + // Either way should be safe. + if (r.Next() % 2 == 0) { + _pool->shutdown(); + } +} + +TEST_P(ThreadPoolTestTokenTypes, TestTokenSubmissionsAdhereToMaxQueueSize) { + ASSERT_TRUE(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_min_threads(1) + .set_max_threads(1) + .set_max_queue_size(1)).ok()); + + CountDownLatch latch(1); + unique_ptr t = _pool->new_token(GetParam()); + SCOPED_CLEANUP({ + latch.count_down(); + }); + // We will be able to submit two tasks: one for max_threads == 1 and one for + // max_queue_size == 1. + ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + ASSERT_TRUE(t->submit(SlowTask::new_slow_task(&latch)).ok()); + Status s = t->submit(SlowTask::new_slow_task(&latch)); + ASSERT_TRUE(s.is_service_unavailable()); +} + +TEST_F(ThreadPoolTest, TestTokenConcurrency) { + const int kNumTokens = 20; + const int kTestRuntimeSecs = 1; + const int kCycleThreads = 2; + const int kShutdownThreads = 2; + const int kWaitThreads = 2; + const int kSubmitThreads = 8; + + vector> tokens; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random rng(seed); + + // Protects 'tokens' and 'rng'. + SpinLock lock; + + // Fetch a token from 'tokens' at random. + auto GetRandomToken = [&]() -> shared_ptr { + std::lock_guard l(lock); + int idx = rng.Uniform(kNumTokens); + return tokens[idx]; + }; + + // Preallocate all of the tokens. + for (int i = 0; i < kNumTokens; i++) { + ThreadPool::ExecutionMode mode; + { + std::lock_guard l(lock); + mode = rng.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + } + tokens.emplace_back(_pool->new_token(mode).release()); + } + + atomic total_num_tokens_cycled(0); + atomic total_num_tokens_shutdown(0); + atomic total_num_tokens_waited(0); + atomic total_num_tokens_submitted(0); + + CountDownLatch latch(1); + vector threads; + + for (int i = 0; i < kCycleThreads; i++) { + // Pick a token at random and replace it. + // + // The replaced token is only destroyed when the last ref is dropped, + // possibly by another thread. + threads.emplace_back([&]() { + int num_tokens_cycled = 0; + while (latch.count()) { + { + std::lock_guard l(lock); + int idx = rng.Uniform(kNumTokens); + ThreadPool::ExecutionMode mode = rng.Next() % 2 ? + ThreadPool::ExecutionMode::SERIAL : + ThreadPool::ExecutionMode::CONCURRENT; + tokens[idx] = shared_ptr(_pool->new_token(mode).release()); + } + num_tokens_cycled++; + + // Sleep a bit, otherwise this thread outpaces the other threads and + // nothing interesting happens to most tokens. + SleepFor(MonoDelta::FromMicroseconds(10)); + } + total_num_tokens_cycled += num_tokens_cycled; + }); + } + + for (int i = 0; i < kShutdownThreads; i++) { + // Pick a token at random and shut it down. Submitting a task to a shut + // down token will return a ServiceUnavailable error. + threads.emplace_back([&]() { + int num_tokens_shutdown = 0; + while (latch.count()) { + GetRandomToken()->shutdown(); + num_tokens_shutdown++; + } + total_num_tokens_shutdown += num_tokens_shutdown; + }); + } + + for (int i = 0; i < kWaitThreads; i++) { + // Pick a token at random and wait for any outstanding tasks. + threads.emplace_back([&]() { + int num_tokens_waited = 0; + while (latch.count()) { + GetRandomToken()->wait(); + num_tokens_waited++; + } + total_num_tokens_waited += num_tokens_waited; + }); + } + + for (int i = 0; i < kSubmitThreads; i++) { + // Pick a token at random and submit a task to it. + threads.emplace_back([&]() { + int num_tokens_submitted = 0; + int32_t seed = static_cast(GetCurrentTimeMicros()); + srand(seed); + Random rng(seed); + while (latch.count()) { + int sleep_ms = rng.Next() % 5; + Status s = GetRandomToken()->submit_func([sleep_ms]() { + // Sleep a little first so that tasks are running during other events. + SleepFor(MonoDelta::FromMilliseconds(sleep_ms)); + }); + CHECK(s.ok() || s.is_service_unavailable()); + num_tokens_submitted++; + } + total_num_tokens_submitted += num_tokens_submitted; + }); + } + + SleepFor(MonoDelta::FromSeconds(kTestRuntimeSecs)); + latch.count_down(); + for (auto& t : threads) { + t.join(); + } + + LOG(INFO) << Substitute("Tokens cycled ($0 threads): $1", + kCycleThreads, total_num_tokens_cycled.load()); + LOG(INFO) << Substitute("Tokens shutdown ($0 threads): $1", + kShutdownThreads, total_num_tokens_shutdown.load()); + LOG(INFO) << Substitute("Tokens waited ($0 threads): $1", + kWaitThreads, total_num_tokens_waited.load()); + LOG(INFO) << Substitute("Tokens submitted ($0 threads): $1", + kSubmitThreads, total_num_tokens_submitted.load()); +} + +/* +TEST_F(ThreadPoolTest, TestLIFOThreadWakeUps) { + const int kNumThreads = 10; + + // Test with a pool that allows for kNumThreads concurrent threads. + ASSERT_OK(rebuild_pool_with_builder(ThreadPoolBuilder(kDefaultPoolName) + .set_max_threads(kNumThreads)).ok()); + + // Submit kNumThreads slow tasks and unblock them, in order to produce + // kNumThreads worker threads. + CountDownLatch latch(1); + SCOPED_CLEANUP({ + latch.CountDown(); + }); + for (int i = 0; i < kNumThreads; i++) { + ASSERT_OK(pool_->submit(SlowTask::new_slow_task(&latch)).ok()); + } + ASSERT_EQ(kNumThreads, _pool->num_threads()); + latch.count_down(); + pool_->wait(); + + // The kNumThreads threads are idle and waiting for the idle timeout. + + // Submit a slow trickle of lightning fast tasks. + // + // If the threads are woken up in FIFO order, this trickle is enough to + // prevent all of them from idling and the AssertEventually will time out. + // + // If LIFO order is used, the same thread will be reused for each task and + // the other threads will eventually time out. + AssertEventually([&]() { + ASSERT_OK(_pool->submit_func([](){}).ok()); + SleepFor(MonoDelta::FromMilliseconds(10)); + ASSERT_EQ(1, _pool->num_threads()); + }, MonoDelta::FromSeconds(10), AssertBackoff::NONE); + NO_PENDING_FATALS(); +} +*/ + +} // namespace doris + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/run-ut.sh b/run-ut.sh index 26f4719e8fed58..5d67c5b760dddb 100755 --- a/run-ut.sh +++ b/run-ut.sh @@ -170,8 +170,11 @@ ${DORIS_TEST_BINARY_DIR}/util/path_util_test ${DORIS_TEST_BINARY_DIR}/util/file_cache_test ${DORIS_TEST_BINARY_DIR}/util/file_manager_test ${DORIS_TEST_BINARY_DIR}/util/parse_util_test +${DORIS_TEST_BINARY_DIR}/util/countdown_latch_test ${DORIS_TEST_BINARY_DIR}/util/monotime_test ${DORIS_TEST_BINARY_DIR}/util/scoped_cleanup_test +${DORIS_TEST_BINARY_DIR}/util/thread_test +${DORIS_TEST_BINARY_DIR}/util/threadpool_test # Running common Unittest ${DORIS_TEST_BINARY_DIR}/common/resource_tls_test