From 44d9a33e828ad521ec634b18a9441bda58342a1c Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Sun, 28 Nov 2021 20:56:53 +0800 Subject: [PATCH 1/2] thread local context --- be/src/runtime/CMakeLists.txt | 2 + be/src/runtime/thread_context.cpp | 32 +++++++ be/src/runtime/thread_context.h | 150 ++++++++++++++++++++++++++++++ be/src/runtime/threadlocal.cc | 84 +++++++++++++++++ be/src/runtime/threadlocal.h | 123 ++++++++++++++++++++++++ 5 files changed, 391 insertions(+) create mode 100644 be/src/runtime/thread_context.cpp create mode 100644 be/src/runtime/thread_context.h create mode 100644 be/src/runtime/threadlocal.cc create mode 100644 be/src/runtime/threadlocal.h diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt index 142934d5cbf602..f9bb017bd38ef6 100644 --- a/be/src/runtime/CMakeLists.txt +++ b/be/src/runtime/CMakeLists.txt @@ -46,7 +46,9 @@ set(RUNTIME_FILES runtime_state.cpp runtime_filter_mgr.cpp string_value.cpp + thread_context.cpp thread_resource_mgr.cpp + threadlocal.cc decimalv2_value.cpp large_int_value.cpp collection_value.cpp diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp new file mode 100644 index 00000000000000..871cd1ebf0c34d --- /dev/null +++ b/be/src/runtime/thread_context.cpp @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "runtime/thread_context.h" + +namespace doris { + +DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx); + +ThreadContextPtr::ThreadContextPtr() { + INIT_STATIC_THREAD_LOCAL(ThreadContext, thread_local_ctx); +} + +ThreadContext* ThreadContextPtr::get() { + return thread_local_ctx; +} + +} // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h new file mode 100644 index 00000000000000..8b969a4e2388b8 --- /dev/null +++ b/be/src/runtime/thread_context.h @@ -0,0 +1,150 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include + +#include "common/logging.h" +#include "gen_cpp/Types_types.h" +#include "runtime/threadlocal.h" + +#define SCOPED_ATTACH_TASK_THREAD_4ARG(query_type, task_id, fragment_instance_id) \ + auto VARNAME_LINENUM(attach_task_thread) = \ + AttachTaskThread(query_type, task_id, fragment_instance_id) + +namespace doris { + +class TUniqueId; + +// The thread context saves some info about a working thread. +// 2 requried info: +// 1. thread_id: Current thread id, Auto generated. +// 2. type: The type is a enum value indicating which type of task current thread is running. +// For example: QUERY, LOAD, COMPACTION, ... +// 3. task id: A unique id to identify this task. maybe query id, load job id, etc. +// +// There may be other optional info to be added later. +class ThreadContext { +public: + enum TaskType { + UNKNOWN = 0, + QUERY = 1, + LOAD = 2, + COMPACTION = 3 + // to be added ... + }; + +public: + ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) {} + + void attach(const TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id) { + DCHECK(_type == TaskType::UNKNOWN && _task_id == ""); + _type = type; + _task_id = task_id; + _fragment_instance_id = fragment_instance_id; + } + + void detach() { + _type = TaskType::UNKNOWN; + _task_id = ""; + _fragment_instance_id = TUniqueId(); + } + + const std::string type() const; + const std::string& task_id() const { return _task_id; } + const std::thread::id& thread_id() const { return _thread_id; } + const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; } + +private: + std::thread::id _thread_id; + TaskType _type; + std::string _task_id; + TUniqueId _fragment_instance_id; +}; + +// Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error, +// see https://github.com/apache/incubator-doris/pull/7911 +// +// If we want to avoid this error, +// 1. For non-trivial variables in thread_local, such as std::string, you need to store them as pointers to +// ensure that thread_local is trivial, these non-trivial pointers will uniformly call destructors elsewhere. +// 2. The default destructor of the thread_local variable cannot be overridden. +// +// This is difficult to implement. Because the destructor is not overwritten, it means that the outside cannot +// be notified when the thread terminates, and the non-trivial pointers in thread_local cannot be released in time. +// The func provided by pthread and std::thread doesn't help either. +// +// So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by +// Thread-scopedthread local + Class-scoped thread local. +// +// This may look very track, but it's the best way I can find. +// +// refer to: +// https://gcc.gnu.org/onlinedocs/gcc-3.3.1/gcc/Thread-Local.html +// https://stackoverflow.com/questions/12049684/ +// https://sourceware.org/glibc/wiki/Destructor%20support%20for%20thread_local%20variables +// https://www.jianshu.com/p/756240e837dd +// https://man7.org/linux/man-pages/man3/pthread_tryjoin_np.3.html +class ThreadContextPtr { +public: + ThreadContextPtr(); + + ThreadContext* get(); + +private: + DECLARE_STATIC_THREAD_LOCAL(ThreadContext, thread_local_ctx); +}; + +inline thread_local ThreadContextPtr thread_local_ctx; + +inline const std::string task_type_string(ThreadContext::TaskType type) { + switch (type) { + case ThreadContext::TaskType::QUERY: + return "QUERY"; + case ThreadContext::TaskType::LOAD: + return "LOAD"; + case ThreadContext::TaskType::COMPACTION: + return "COMPACTION"; + default: + return "UNKNOWN"; + } +} + +inline const std::string ThreadContext::type() const { + return task_type_string(_type); +} + +class AttachTaskThread { +public: + explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id) { + DCHECK(task_id != "" && fragment_instance_id != TUniqueId()); + init(type, task_id, fragment_instance_id); + } + + void init(const ThreadContext::TaskType& type, const std::string& task_id, + const TUniqueId& fragment_instance_id) { + thread_local_ctx.get()->attach(type, task_id, fragment_instance_id); + } + + ~AttachTaskThread() { thread_local_ctx.get()->detach(); } +}; + +} // namespace doris diff --git a/be/src/runtime/threadlocal.cc b/be/src/runtime/threadlocal.cc new file mode 100644 index 00000000000000..ac2bf2e62a9094 --- /dev/null +++ b/be/src/runtime/threadlocal.cc @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#include "runtime/threadlocal.h" + +#include + +#include +#include +#include + +#include "common/logging.h" +#include "gutil/once.h" +#include "util/errno.h" + +namespace doris { + +// One key used by the entire process to attach destructors on thread exit. +static pthread_key_t destructors_key; + +// The above key must only be initialized once per process. +static GoogleOnceType once = GOOGLE_ONCE_INIT; + +namespace { + +// List of destructors for all thread locals instantiated on a given thread. +struct PerThreadDestructorList { + void (*destructor)(void*); + void* arg; + PerThreadDestructorList* next; +}; + +} // anonymous namespace + +// Call all the destructors associated with all THREAD_LOCAL instances in this +// thread. +static void invoke_destructors(void* t) { + PerThreadDestructorList* d = reinterpret_cast(t); + while (d != nullptr) { + d->destructor(d->arg); + PerThreadDestructorList* next = d->next; + delete d; + d = next; + } +} + +// This key must be initialized only once. +static void create_key() { + int ret = pthread_key_create(&destructors_key, &invoke_destructors); + // Linux supports up to 1024 keys, we will use only one for all thread locals. + CHECK_EQ(0, ret) << "pthread_key_create() failed, cannot add destructor to thread: " + << "error " << ret << ": " << errno_to_string(ret); +} + +// Adds a destructor to the list. +void add_destructor(void (*destructor)(void*), void* arg) { + GoogleOnceInit(&once, &create_key); + + // Returns NULL if nothing is set yet. + std::unique_ptr p(new PerThreadDestructorList()); + p->destructor = destructor; + p->arg = arg; + p->next = reinterpret_cast(pthread_getspecific(destructors_key)); + int ret = pthread_setspecific(destructors_key, p.release()); + // The only time this check should fail is if we are out of memory, or if + // somehow key creation failed, which should be caught by the above CHECK. + CHECK_EQ(0, ret) << "pthread_setspecific() failed, cannot update destructor list: " + << "error " << ret << ": " << errno_to_string(ret); +} + +} // namespace doris diff --git a/be/src/runtime/threadlocal.h b/be/src/runtime/threadlocal.h new file mode 100644 index 00000000000000..aa7507ddff8a89 --- /dev/null +++ b/be/src/runtime/threadlocal.h @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Block-scoped static thread local implementation. +// +// Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro +// defines a thread-local pointer to the specified type, which is lazily +// instantiated by any thread entering the block for the first time. The +// constructor for the type T is invoked at macro execution time, as expected, +// and its destructor is invoked when the corresponding thread's Runnable +// returns, or when the thread exits. +// +// Inspired by Poco , +// Andrew Tomazos , and +// the C++11 thread_local API. +// +// Example usage: +// +// // Invokes a 3-arg constructor on SomeClass: +// BLOCK_STATIC_THREAD_LOCAL(SomeClass, instance, arg1, arg2, arg3); +// instance->DoSomething(); +// + +#pragma once + +#include "gutil/port.h" + +#define BLOCK_STATIC_THREAD_LOCAL(T, t, ...) \ + static __thread T* t; \ + do { \ + if (PREDICT_FALSE(t == NULL)) { \ + t = new T(__VA_ARGS__); \ + add_destructor(destroy, t); \ + } \ + } while (false) + +// Class-scoped static thread local implementation. +// +// Very similar in implementation to the above block-scoped version, but +// requires a bit more syntax and vigilance to use properly. +// +// DECLARE_STATIC_THREAD_LOCAL(Type, instance_var_) must be placed in the +// class header, as usual for variable declarations. +// +// Because these variables are static, they must also be defined in the impl +// file with DEFINE_STATIC_THREAD_LOCAL(Type, Classname, instance_var_), +// which is very much like defining any static member, i.e. int Foo::member_. +// +// Finally, each thread must initialize the instance before using it by calling +// INIT_STATIC_THREAD_LOCAL(Type, instance_var_, ...). This is a cheap +// call, and may be invoked at the top of any method which may reference a +// thread-local variable. +// +// Due to all of these requirements, you should probably declare TLS members +// as private. +// +// Example usage: +// +// // foo.h +// #include "kudu/utils/file.h" +// class Foo { +// public: +// void DoSomething(std::string s); +// private: +// DECLARE_STATIC_THREAD_LOCAL(utils::File, file_); +// }; +// +// // foo.cc +// #include "kudu/foo.h" +// DEFINE_STATIC_THREAD_LOCAL(utils::File, Foo, file_); +// void Foo::WriteToFile(std::string s) { +// // Call constructor if necessary. +// INIT_STATIC_THREAD_LOCAL(utils::File, file_, "/tmp/file_location.txt"); +// file_->Write(s); +// } + +// Goes in the class declaration (usually in a header file). +// dtor must be destructed _after_ t, so it gets defined first. +// Uses a mangled variable name for dtor since it must also be a member of the +// class. +#define DECLARE_STATIC_THREAD_LOCAL(T, t) static __thread T* t + +// You must also define the instance in the .cc file. +#define DEFINE_STATIC_THREAD_LOCAL(T, Class, t) __thread T* Class::t + +// Must be invoked at least once by each thread that will access t. +#define INIT_STATIC_THREAD_LOCAL(T, t, ...) \ + do { \ + if (PREDICT_FALSE(t == NULL)) { \ + t = new T(__VA_ARGS__); \ + add_destructor(destroy, t); \ + } \ + } while (false) + +// Internal implementation below. + +namespace doris { + +// Add a destructor to the list. +void add_destructor(void (*destructor)(void*), void* arg); + +// Destroy the passed object of type T. +template +static void destroy(void* t) { + // With tcmalloc, this should be pretty cheap (same thread as new). + delete reinterpret_cast(t); +} + +} // namespace doris From 841f226f23e92af5220a8d1ffc871234194c0bbd Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 9 Mar 2022 11:46:03 +0800 Subject: [PATCH 2/2] fix comment --- be/src/runtime/thread_context.h | 17 +++++------------ be/src/runtime/threadlocal.h | 4 ++++ 2 files changed, 9 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 8b969a4e2388b8..7c2a97faac173b 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -24,9 +24,8 @@ #include "gen_cpp/Types_types.h" #include "runtime/threadlocal.h" -#define SCOPED_ATTACH_TASK_THREAD_4ARG(query_type, task_id, fragment_instance_id) \ - auto VARNAME_LINENUM(attach_task_thread) = \ - AttachTaskThread(query_type, task_id, fragment_instance_id) +#define SCOPED_ATTACH_TASK_THREAD(type, ...) \ + auto VARNAME_LINENUM(attach_task_thread) = AttachTaskThread(type, ## __VA_ARGS__) namespace doris { @@ -94,7 +93,7 @@ class ThreadContext { // So, kudu Class-scoped static thread local implementation was introduced. Solve the above problem by // Thread-scopedthread local + Class-scoped thread local. // -// This may look very track, but it's the best way I can find. +// This may look very trick, but it's the best way I can find. // // refer to: // https://gcc.gnu.org/onlinedocs/gcc-3.3.1/gcc/Thread-Local.html @@ -133,14 +132,8 @@ inline const std::string ThreadContext::type() const { class AttachTaskThread { public: - explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id, - const TUniqueId& fragment_instance_id) { - DCHECK(task_id != "" && fragment_instance_id != TUniqueId()); - init(type, task_id, fragment_instance_id); - } - - void init(const ThreadContext::TaskType& type, const std::string& task_id, - const TUniqueId& fragment_instance_id) { + explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id = "", + const TUniqueId& fragment_instance_id = TUniqueId()) { thread_local_ctx.get()->attach(type, task_id, fragment_instance_id); } diff --git a/be/src/runtime/threadlocal.h b/be/src/runtime/threadlocal.h index aa7507ddff8a89..a2028154570ecc 100644 --- a/be/src/runtime/threadlocal.h +++ b/be/src/runtime/threadlocal.h @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +// Reference from kudu, Solve the problem of gcc11 compiling +// non-trivial thread_local variables on lower versions of GLIBC. +// see https://github.com/apache/incubator-doris/pull/7911 +// // Block-scoped static thread local implementation. // // Usage is similar to a C++11 thread_local. The BLOCK_STATIC_THREAD_LOCAL macro