From 8ba0d03b4439fc0dd3298462caba47bb0a22578b Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Fri, 9 Aug 2024 08:27:16 -0400 Subject: [PATCH] Bugfix: Clean up ThreadContext pointers when Connection is destroyed Currently ThreadContext Connection* pointers are not cleaned up when a connection is destroyed. This is only a problem if a Connection instance is destroyed and new Connection is allocated at the same address, because the code assumes pointers uniquely identify connections. This causes a bug in a bitcoin IPC test which creates multiple connections in a loop, described in https://github.com/bitcoin/bitcoin/pull/30509#issuecomment-2276739868, where connections are created and destroyed in a loop, and depending on how the heap allocator behaves, a new Connection could have the same address as a previously destroyed connection, and the code tries to use a thread reference associated with the previous connection when making a new call, and there is a segfault because the thread no longer exists. Fix this problem by adding Connection cleanup callbacks to remove Connection* pointers from the ThreadContext struct if the connection is destroyed before the thread is. --- include/mp/proxy-io.h | 25 +++++++++++-- include/mp/proxy-types.h | 78 +++++++++++++++++++--------------------- src/mp/proxy.cpp | 34 ++++++++++++++++++ 3 files changed, 94 insertions(+), 43 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 12f11613..f22ad3d9 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -14,6 +14,7 @@ #include #include +#include #include #include #include @@ -60,6 +61,18 @@ struct ProxyClient : public ProxyClientBase using ProxyClientBase::ProxyClientBase; // https://stackoverflow.com/questions/22357887/comparing-two-mapiterators-why-does-it-need-the-copy-constructor-of-stdpair ProxyClient(const ProxyClient&) = delete; + ~ProxyClient(); + + void setCleanup(std::function cleanup); + + //! 
Cleanup function to run when the connection is closed. If the Connection + //! gets destroyed before this ProxyClient object, this cleanup + //! callback lets it destroy this object and remove its entry in the + //! thread's request_threads or callback_threads map (after resetting + //! m_cleanup so the destructor does not try to access it). But if this + //! object gets destroyed before the Connection, there's no need to run the + //! cleanup function and the destructor will unregister it. + std::optional m_cleanup; }; template <> @@ -503,6 +516,14 @@ void ProxyServerBase::invokeDestroy() m_context.cleanup.clear(); } +using ConnThreads = std::map>; +using ConnThread = ConnThreads::iterator; + +// Retrieve ProxyClient object associated with this connection from a +// map, or create a new one and insert it into the map. Return map iterator and +// inserted bool. +std::tuple SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function make_thread); + struct ThreadContext { //! Identifying string for debug. @@ -517,7 +538,7 @@ struct ThreadContext //! `callbackThread` argument it passes in the request, used by the server //! in case it needs to make callbacks into the client that need to execute //! while the client is waiting. This will be set to a local thread object. - std::map> callback_threads; + ConnThreads callback_threads; //! When client is making a request to a server, this is the `thread` //! argument it passes in the request, used to control which thread on @@ -526,7 +547,7 @@ struct ThreadContext //! by makeThread. If a client call is being made from a thread currently //! handling a server request, this will be set to the `callbackThread` //! request thread argument passed in that request. - std::map> request_threads; + ConnThreads request_threads; //! Whether this thread is a capnp event loop thread. Not really used except //! 
to assert false if there's an attempt to execute a blocking operation diff --git a/include/mp/proxy-types.h b/include/mp/proxy-types.h index f120268e..c951d07f 100644 --- a/include/mp/proxy-types.h +++ b/include/mp/proxy-types.h @@ -62,36 +62,33 @@ void CustomBuildField(TypeList<>, { auto& connection = invoke_context.connection; auto& thread_context = invoke_context.thread_context; - auto& request_threads = thread_context.request_threads; - auto& callback_threads = thread_context.callback_threads; - - auto callback_thread = callback_threads.find(&connection); - if (callback_thread == callback_threads.end()) { - callback_thread = - callback_threads - .emplace(std::piecewise_construct, std::forward_as_tuple(&connection), - std::forward_as_tuple( - connection.m_threads.add(kj::heap>(thread_context, std::thread{})), - &connection, /* destroy_connection= */ false)) - .first; - } - auto request_thread = request_threads.find(&connection); - if (request_thread == request_threads.end()) { - // This code will only run if IPC client call is being made for the - // first time on a new thread. After the first call, subsequent calls + // Create local Thread::Server object corresponding to the current thread + // and pass a Thread::Client reference to it in the Context.callbackThread + // field so the function being called can make callbacks to this thread. + // Also store the Thread::Client reference in the callback_threads map so + // future calls over this connection can reuse it. + auto [callback_thread, _]{SetThread( + thread_context.callback_threads, thread_context.waiter->m_mutex, &connection, + [&] { return connection.m_threads.add(kj::heap>(thread_context, std::thread{})); })}; + + // Call remote ThreadMap.makeThread function so server will create a + // dedicated worker thread to run function calls from this thread. Store the + // Thread::Client reference it returns in the request_threads map. 
+ auto make_request_thread{[&]{ + // This code will only run if an IPC client call is being made for the + // first time on this thread. After the first call, subsequent calls // will use the existing request thread. This code will also never run at // all if the current thread is a request thread created for a different // IPC client, because in that case PassField code (below) will have set // request_thread to point to the calling thread. auto request = connection.m_thread_map.makeThreadRequest(); request.setName(thread_context.thread_name); - request_thread = - request_threads - .emplace(std::piecewise_construct, std::forward_as_tuple(&connection), - std::forward_as_tuple(request.send().getResult(), &connection, /* destroy_connection= */ false)) - .first; // Nonblocking due to capnp request pipelining. - } + return request.send().getResult(); // Nonblocking due to capnp request pipelining. + }}; + auto [request_thread, _1]{SetThread( + thread_context.request_threads, thread_context.waiter->m_mutex, + &connection, make_request_thread)}; auto context = output.init(); context.setThread(request_thread->second.m_client); @@ -143,24 +140,23 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // call. In this case, the callbackThread value should point // to the same thread already in the map, so there is no // need to update the map. - auto& request_threads = g_thread_context.request_threads; - auto request_thread = request_threads.find(server.m_context.connection); - if (request_thread == request_threads.end()) { - request_thread = - g_thread_context.request_threads - .emplace(std::piecewise_construct, std::forward_as_tuple(server.m_context.connection), - std::forward_as_tuple(context_arg.getCallbackThread(), server.m_context.connection, - /* destroy_connection= */ false)) - .first; - } else { - // The requests_threads map already has an entry for - // this connection, so this must be a recursive call. 
- // Avoid modifying the map in this case by resetting the - // request_thread iterator, so the KJ_DEFER statement - // below doesn't do anything. - request_thread = request_threads.end(); - } - KJ_DEFER(if (request_thread != request_threads.end()) request_threads.erase(request_thread)); + auto& thread_context = g_thread_context; + auto& request_threads = thread_context.request_threads; + auto [request_thread, inserted]{SetThread( + request_threads, thread_context.waiter->m_mutex, + server.m_context.connection, + [&] { return context_arg.getCallbackThread(); })}; + + // If an entry was inserted into the request_threads map, + // remove it after calling fn.invoke. If an entry was not + // inserted, one already existed, meaning this must be a + // recursive call (IPC call calling back to the caller which + // makes another IPC call), so avoid modifying the map. + auto erase_thread{inserted ? request_thread : request_threads.end()}; + KJ_DEFER(if (erase_thread != request_threads.end()) { + std::unique_lock lock(thread_context.waiter->m_mutex); + request_threads.erase(erase_thread); + }); fn.invoke(server_context, args...); } KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index 406e50cf..d0dad5fd 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -262,6 +262,40 @@ void EventLoop::startAsyncThread(std::unique_lock& lock) } } +std::tuple SetThread(ConnThreads& threads, std::mutex& mutex, Connection* connection, std::function make_thread) +{ + std::unique_lock lock(mutex); + auto thread = threads.find(connection); + if (thread != threads.end()) return {thread, false}; + thread = threads.emplace( + std::piecewise_construct, std::forward_as_tuple(connection), + std::forward_as_tuple(make_thread(), connection, /* destroy_connection= */ false)).first; + thread->second.setCleanup([&threads, &mutex, thread] { + // Connection is being destroyed before thread client is, so reset + // thread client m_cleanup member 
so thread client destructor does not + try to unregister this callback after connection is destroyed. + thread->second.m_cleanup.reset(); + // Remove connection pointer about to be destroyed from the map + std::unique_lock lock(mutex); + threads.erase(thread); + }); + return {thread, true}; +} + +ProxyClient::~ProxyClient() +{ + if (m_cleanup) { + m_context.connection->removeSyncCleanup(*m_cleanup); + } +} + +void ProxyClient::setCleanup(std::function cleanup) +{ + assert(cleanup); + assert(!m_cleanup); + m_cleanup = m_context.connection->addSyncCleanup(cleanup); +} + ProxyServer::ProxyServer(ThreadContext& thread_context, std::thread&& thread) : m_thread_context(thread_context), m_thread(std::move(thread)) {