From b69c1a1905a50d2699878e911573d340e8d712d8 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 16:02:45 +0000 Subject: [PATCH 01/15] fix: avoid dangling pipes during failed `WakeupPipe` initialization --- src/util/wpipe.cpp | 28 ++++++++++++++++++++-------- src/util/wpipe.h | 4 ++++ 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp index e3e3979256c1..5a4ca3761f30 100644 --- a/src/util/wpipe.cpp +++ b/src/util/wpipe.cpp @@ -21,15 +21,17 @@ WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) } for (size_t idx = 0; idx < m_pipe.size(); idx++) { int flags = fcntl(m_pipe[idx], F_GETFL, 0); - if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) { + if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == SOCKET_ERROR) { LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed with error %s\n", idx, NetworkErrorString(WSAGetLastError())); + Close(); return; } } if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) { LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed for m_pipe[0] = %d\n", m_pipe[0]); + Close(); return; } m_valid = true; @@ -46,18 +48,28 @@ WakeupPipe::~WakeupPipe() LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n", m_pipe[0]); } - for (size_t idx = 0; idx < m_pipe.size(); idx++) { - if (close(m_pipe[idx]) != 0) { - LogPrintf("Destroying WakeupPipe instance, close() failed for m_pipe[%d] = %d with error %s\n", - idx, m_pipe[idx], NetworkErrorString(WSAGetLastError())); - } - } + Close(); #else assert(false); #endif /* USE_WAKEUP_PIPE */ } } +void WakeupPipe::Close() +{ +#ifdef USE_WAKEUP_PIPE + for (size_t idx{0}; idx < m_pipe.size(); idx++) { + if (m_pipe[idx] != -1 && close(m_pipe[idx]) != 0) { + LogPrintf("close() failed for m_pipe[%d] = %d with error %s\n", idx, m_pipe[idx], + NetworkErrorString(WSAGetLastError())); + } + m_pipe[idx] = -1; + } +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} + void WakeupPipe::Drain() const { #ifdef USE_WAKEUP_PIPE @@ -80,7 +92,7 @@ void WakeupPipe::Write() std::array buf{}; int ret = write(m_pipe[1], buf.data(), buf.size()); - if (ret == -1) { + if (ret == SOCKET_ERROR) { LogPrintf("write() to m_pipe[1] = %d failed with error %s\n", m_pipe[1], NetworkErrorString(WSAGetLastError())); } if (ret != EXPECTED_PIPE_WRITTEN_BYTES) { diff --git a/src/util/wpipe.h b/src/util/wpipe.h index 30912149b48d..e936899ea71c 100644 --- a/src/util/wpipe.h +++ b/src/util/wpipe.h @@ -21,6 +21,10 @@ class EdgeTriggeredEvents; */ class WakeupPipe { +private: + /* Iterate through m_pipe and ::close() them */ + void Close(); + public: explicit WakeupPipe(EdgeTriggeredEvents* edge_trig_events); ~WakeupPipe(); From 7cffc0b21b0c8811defc60b7f45df193b741279b Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 27 Jun 2025 08:50:18 +0000 Subject: [PATCH 02/15] fix: drain before winding down `WakeupPipe` object to avoid `SIGPIPE` --- src/util/wpipe.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp index 5a4ca3761f30..b04c323af077 100644 --- a/src/util/wpipe.cpp +++ b/src/util/wpipe.cpp @@ -44,6 +44,7 @@ WakeupPipe::~WakeupPipe() { if (m_valid) { #ifdef USE_WAKEUP_PIPE + Drain(); if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n", m_pipe[0]); From ca1ec0b92ae4109df4b128c23d7b32ecd51c69ab Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Fri, 27 Jun 2025 10:39:07 +0000 Subject: [PATCH 03/15] net: split out `poll` and `select` variants from `Sock::Wait()` --- src/util/sock.cpp | 16 ++++++++++++++-- src/util/sock.h | 4 ++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/src/util/sock.cpp b/src/util/sock.cpp index d379467b0eb0..715cb38d2a9c 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -143,6 +143,15 @@ bool Sock::IsSelectable() const bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const { #ifdef USE_POLL + return WaitPoll(timeout, requested, occurred); +#else + return WaitSelect(timeout, requested, occurred); +#endif /* USE_POLL */ +} + +#ifdef USE_POLL +bool Sock::WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +{ pollfd fd; fd.fd = m_socket; fd.events = 0; @@ -171,7 +180,11 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur } return true; -#else +} +#endif /* USE_POLL */ + +bool Sock::WaitSelect(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +{ if (!IsSelectable()) { return false; } @@ -213,7 +226,6 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occur } return true; -#endif /* USE_POLL */ } void Sock::SendComplete(const std::string& data, diff --git a/src/util/sock.h b/src/util/sock.h index fbd1909d60f9..e99f04a95edc 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -221,6 +221,10 @@ class Sock [[nodiscard]] virtual bool Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; +#ifdef USE_POLL + bool WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; +#endif /* USE_POLL */ + bool WaitSelect(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; /* Higher level, convenience, methods. These may throw. */ From 41eaed26ee8aa7c5528e4986bef5a8d13e8b8e0a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 09:52:31 +0000 Subject: [PATCH 04/15] net: allow selection of `Wait()` API by specifying `SocketEventsMode` No behavior has changed as we don't support any additional APIs and SEM_LT_DEFAULT preserves old behavior but upcoming commits will utilize this to able to effectuate the preferences set by `-socketevents`. --- src/i2p.cpp | 2 +- src/netbase.cpp | 2 +- src/test/fuzz/util.cpp | 2 +- src/test/fuzz/util.h | 2 +- src/test/util/net.h | 1 + src/util/sock.cpp | 30 +++++++++++++++++++++++++++++- src/util/sock.h | 8 ++++++++ 7 files changed, 42 insertions(+), 5 deletions(-) diff --git a/src/i2p.cpp b/src/i2p.cpp index 76a0b6196a80..fa02e48a6eff 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -160,7 +160,7 @@ bool Session::Accept(Connection& conn) while (!*m_interrupt) { Sock::Event occurred; - if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) { + if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SEM_LT_DEFAULT, &occurred)) { errmsg = "wait on socket failed"; break; } diff --git a/src/netbase.cpp b/src/netbase.cpp index 319bc7c9d88d..ea8877cb4386 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -572,7 +572,7 @@ static bool ConnectToSocket(const Sock& sock, struct sockaddr* sockaddr, socklen // synchronously to check for successful connection with a timeout. const Sock::Event requested = Sock::RECV | Sock::SEND; Sock::Event occurred; - if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, &occurred)) { + if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SEM_LT_DEFAULT, &occurred)) { LogPrintf("wait for connect to %s failed: %s\n", dest_str, NetworkErrorString(WSAGetLastError())); diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index 2f44a69561da..a58e15ec29cb 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -270,7 +270,7 @@ bool FuzzedSock::IsSelectable() const return m_selectable; } -bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const { constexpr std::array wait_errnos{ EBADF, diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index 5273ea4e3743..33b3e9809984 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -89,7 +89,7 @@ class FuzzedSock : public Sock bool IsSelectable() const override; - bool Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const override; + bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override; bool IsConnected(std::string& errmsg) const override; }; diff --git a/src/test/util/net.h b/src/test/util/net.h index dd29d8675cc0..670f65914c31 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -202,6 +202,7 @@ class StaticContentsSock : public Sock bool Wait(std::chrono::milliseconds timeout, Event requested, + SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override { if (occurred != nullptr) { diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 715cb38d2a9c..3827ac7d8817 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -140,8 +140,36 @@ bool Sock::IsSelectable() const #endif } -bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const { + std::string debug_str; + + switch (event_mode) + { + case SocketEventsMode::Poll: +#ifdef USE_POLL + return WaitPoll(timeout, requested, occurred); +#else + debug_str += "Sock::Wait -- Support for poll not compiled in, falling back on "; + break; +#endif /* USE_POLL */ + case SocketEventsMode::Select: + return WaitSelect(timeout, requested, occurred); + case SocketEventsMode::EPoll: + debug_str += "Sock::Wait -- Unimplemented for epoll, falling back on "; + break; + case SocketEventsMode::KQueue: + debug_str += "Sock::Wait -- Unimplemented for kqueue, falling back on "; + break; + default: + assert(false); + } +#ifdef USE_POLL + debug_str += "poll"; +#else + debug_str += "select"; +#endif /* USE_POLL*/ + LogPrint(BCLog::NET, "%s\n", debug_str); #ifdef USE_POLL return WaitPoll(timeout, requested, occurred); #else diff --git a/src/util/sock.h b/src/util/sock.h index e99f04a95edc..f1e29adb9c4a 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -28,6 +28,12 @@ enum class SocketEventsMode : int8_t { Unknown = -1 }; +#ifdef USE_POLL +#define SEM_LT_DEFAULT SocketEventsMode::Poll +#else +#define SEM_LT_DEFAULT SocketEventsMode::Select +#endif /* USE_POLL */ + /* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ constexpr std::string_view SEMToString(const SocketEventsMode val) { switch (val) { @@ -212,6 +218,7 @@ class Sock * Wait for readiness for input (recv) or output (send). * @param[in] timeout Wait this much for at least one of the requested events to occur. * @param[in] requested Wait for those events, bitwise-or of `RECV` and `SEND`. + * @param[in] event_mode Wait using the API specified. * @param[out] occurred If not nullptr and the function returns `true`, then this * indicates which of the requested events occurred (`ERR` will be added, even if * not requested, if an exceptional event occurs on the socket). @@ -220,6 +227,7 @@ class Sock */ [[nodiscard]] virtual bool Wait(std::chrono::milliseconds timeout, Event requested, + SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const; #ifdef USE_POLL bool WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; From a33f88f3663e20337f3d25cf6f27c0b06d766b2a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 10:06:18 +0000 Subject: [PATCH 05/15] net: reintroduce `IsSelectableSocket()` and make it SEM-aware `IsSelectableSocket()` was subsumed into `Sock` for mockability's sake but creates problems when our event-socket map uses the raw `SOCKET` and not `Sock`, so we need to bring it back. On top of that, `IsSelectableSocket()`'s behavior was defined at compile-time when it should've been sensitive to the runtime capabilities we give it. Currently, this does not cause a change in behavior but makes way for future changes. --- src/test/fuzz/util.cpp | 2 +- src/test/fuzz/util.h | 2 +- src/test/util/net.h | 2 +- src/util/sock.cpp | 19 ++++++++++++------- src/util/sock.h | 2 +- 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index a58e15ec29cb..58ecd1fead58 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -265,7 +265,7 @@ bool FuzzedSock::SetNonBlocking() const return true; } -bool FuzzedSock::IsSelectable() const +bool FuzzedSock::IsSelectable(bool is_select) const { return m_selectable; } diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index 33b3e9809984..c9a53eb7a401 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -87,7 +87,7 @@ class FuzzedSock : public Sock bool SetNonBlocking() const override; - bool IsSelectable() const override; + bool IsSelectable(bool is_select) const override; bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override; diff --git a/src/test/util/net.h b/src/test/util/net.h index 670f65914c31..47add71da89d 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -198,7 +198,7 @@ class StaticContentsSock : public Sock bool SetNonBlocking() const override { return true; } - bool IsSelectable() const override { return true; } + bool IsSelectable(bool is_select) const override { return true; } bool Wait(std::chrono::milliseconds timeout, Event requested, diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 3827ac7d8817..5e521de1f687 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -24,6 +24,15 @@ static inline bool IOErrorIsPermanent(int err) return err != WSAEAGAIN && err != WSAEINTR && err != WSAEWOULDBLOCK && err != WSAEINPROGRESS; } +static inline bool IsSelectableSocket(const SOCKET& s, bool is_select) +{ +#if defined(WIN32) + return true; +#else + return is_select ? (s < FD_SETSIZE) : true; +#endif +} + Sock::Sock() : m_socket(INVALID_SOCKET) {} Sock::Sock(SOCKET s) : m_socket(s) {} @@ -131,13 +140,9 @@ bool Sock::SetNonBlocking() const return true; } -bool Sock::IsSelectable() const +bool Sock::IsSelectable(bool is_select) const { -#if defined(USE_POLL) || defined(WIN32) - return true; -#else - return m_socket < FD_SETSIZE; -#endif + return IsSelectableSocket(m_socket, is_select); } bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const @@ -213,7 +218,7 @@ bool Sock::WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* o bool Sock::WaitSelect(std::chrono::milliseconds timeout, Event requested, Event* occurred) const { - if (!IsSelectable()) { + if (!IsSelectableSocket(m_socket, /*is_select=*/true)) { return false; } diff --git a/src/util/sock.h b/src/util/sock.h index f1e29adb9c4a..05b1dbc24617 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -194,7 +194,7 @@ class Sock * Check if the underlying socket can be used for `select(2)` (or the `Wait()` method). * @return true if selectable */ - [[nodiscard]] virtual bool IsSelectable() const; + [[nodiscard]] virtual bool IsSelectable(bool is_select = (SEM_LT_DEFAULT == SocketEventsMode::Select)) const; using Event = uint8_t; From 4a7114fbf6838292b451fb30910ddca34354944f Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 29 Jun 2025 13:17:36 +0000 Subject: [PATCH 06/15] refactor: clean up `CConnman::SocketWait{Epoll,Kqueue}()` logic --- src/net.cpp | 61 ++++++++++++++++++++++++++--------------------- src/util/sock.h | 2 ++ src/util/time.cpp | 13 ++++++++++ src/util/time.h | 2 ++ 4 files changed, 51 insertions(+), 27 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index 97c457979afb..c2ea90a323a6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2384,32 +2384,31 @@ void CConnman::SocketEventsKqueue(std::set& recv_set, std::set& error_set, bool only_poll) { - const size_t maxEvents = 64; - struct kevent events[maxEvents]; + std::array events{}; + struct timespec timeout = MillisToTimespec(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - struct timespec timeout; - timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; - timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; - - int n{-1}; - ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events, maxEvents, &timeout);}); - if (n == -1) { + int ret{-1}; + ToggleWakeupPipe([&](){ + ret = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events.data(), events.size(), &timeout); + }); + if (ret == SOCKET_ERROR) { LogPrintf("kevent wait error\n"); - } else if (n > 0) { - for (int i = 0; i < n; i++) { - auto& event = events[i]; - if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) { - error_set.insert((SOCKET)event.ident); - continue; - } + return; + } - if (event.filter == EVFILT_READ) { - recv_set.insert((SOCKET)event.ident); - } + for (int i = 0; i < ret; i++) { + auto& event = events[i]; + if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) { + error_set.insert((SOCKET)event.ident); + continue; + } - if (event.filter == EVFILT_WRITE) { - send_set.insert((SOCKET)event.ident); - } + if (event.filter == EVFILT_READ) { + recv_set.insert((SOCKET)event.ident); + } + + if (event.filter == EVFILT_WRITE) { + send_set.insert((SOCKET)event.ident); } } } @@ -2421,12 +2420,20 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, std::set& error_set, bool only_poll) { - const size_t maxEvents = 64; - epoll_event events[maxEvents]; + std::array events{}; + + int ret{-1}; + ToggleWakeupPipe([&](){ + ret = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events.data(), events.size(), + only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + }); + + if (ret == SOCKET_ERROR) { + LogPrintf("epoll_wait error\n"); + return; + } - int n{-1}; - ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); - for (int i = 0; i < n; i++) { + for (int i = 0; i < ret; i++) { auto& e = events[i]; if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { error_set.insert((SOCKET)e.data.fd); diff --git a/src/util/sock.h b/src/util/sock.h index 05b1dbc24617..237abcad6e09 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -19,6 +19,8 @@ */ static constexpr auto MAX_WAIT_FOR_IO = 1s; +static constexpr size_t MAX_EVENTS = 64; + enum class SocketEventsMode : int8_t { Select = 0, Poll = 1, diff --git a/src/util/time.cpp b/src/util/time.cpp index acfe845d25d0..e04f43ae6151 100644 --- a/src/util/time.cpp +++ b/src/util/time.cpp @@ -108,6 +108,14 @@ int64_t ParseISO8601DateTime(const std::string& str) return (ptime - epoch).total_seconds(); } +struct timespec MillisToTimespec(int64_t nTimeout) +{ + struct timespec timeout; + timeout.tv_sec = nTimeout / 1000; + timeout.tv_nsec = (nTimeout % 1000) * 1000 * 1000; + return timeout; +} + struct timeval MillisToTimeval(int64_t nTimeout) { struct timeval timeout; @@ -116,6 +124,11 @@ struct timeval MillisToTimeval(int64_t nTimeout) return timeout; } +struct timespec MillisToTimespec(std::chrono::milliseconds ms) +{ + return MillisToTimespec(count_milliseconds(ms)); +} + struct timeval MillisToTimeval(std::chrono::milliseconds ms) { return MillisToTimeval(count_milliseconds(ms)); diff --git a/src/util/time.h b/src/util/time.h index 7334293c60c7..d1fca85e85a6 100644 --- a/src/util/time.h +++ b/src/util/time.h @@ -116,10 +116,12 @@ int64_t ParseISO8601DateTime(const std::string& str); * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(int64_t nTimeout); +struct timespec MillisToTimespec(int64_t nTimeout); /** * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(std::chrono::milliseconds ms); +struct timespec MillisToTimespec(std::chrono::milliseconds ms); #endif // BITCOIN_UTIL_TIME_H From 0a8b8a68d4d40c6f538fc0d126be4010e188c7f2 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 29 Jun 2025 13:16:56 +0000 Subject: [PATCH 07/15] merge bitcoin#24356: replace CConnman::SocketEvents() with mockable Sock::WaitMany() Co-authored-by: UdjinM6 continuation of 8b23bfba from dash#6630 --- src/net.cpp | 292 +++++++++-------------------------------- src/net.h | 49 +------ src/test/fuzz/util.cpp | 9 ++ src/test/fuzz/util.h | 2 + src/test/util/net.h | 9 ++ src/util/sock.cpp | 134 +++++++++++-------- src/util/sock.h | 57 +++++++- 7 files changed, 223 insertions(+), 329 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index c2ea90a323a6..11f5d84739b2 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -56,10 +56,6 @@ #include #endif -#ifdef USE_POLL -#include -#endif - #ifdef USE_EPOLL #include #endif @@ -2338,13 +2334,12 @@ bool CConnman::InactivityCheck(const CNode& node) const return false; } -bool CConnman::GenerateSelectSet(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set) +Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) { + Sock::EventsPerSock events_per_sock; + for (const ListenSocket& hListenSocket : vhListenSocket) { - recv_set.insert(hListenSocket.sock->Get()); + events_per_sock.emplace(hListenSocket.sock->Get(), Sock::Events{Sock::RECV}); } for (CNode* pnode : nodes) { @@ -2357,13 +2352,15 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, continue; } - error_set.insert(pnode->m_sock->Get()); + Sock::Event requested{0}; if (select_send) { - send_set.insert(pnode->m_sock->Get()); + requested |= Sock::SEND; } if (select_recv) { - recv_set.insert(pnode->m_sock->Get()); + requested |= Sock::RECV; } + + events_per_sock.emplace(pnode->m_sock->Get(), Sock::Events{requested}); } if (m_wakeup_pipe) { @@ -2372,10 +2369,10 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually // run on Linux and friends. - recv_set.insert(m_wakeup_pipe->m_pipe[0]); + events_per_sock.emplace(m_wakeup_pipe->m_pipe[0], Sock::Events{Sock::RECV}); } - return !recv_set.empty() || !send_set.empty() || !error_set.empty(); + return events_per_sock; } #ifdef USE_KQUEUE @@ -2451,173 +2448,11 @@ void CConnman::SocketEventsEpoll(std::set& recv_set, } #endif -#ifdef USE_POLL -void CConnman::SocketEventsPoll(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { - if (!only_poll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); - return; - } - - std::unordered_map pollfds; - for (SOCKET socket_id : recv_select_set) { - pollfds[socket_id].fd = socket_id; - pollfds[socket_id].events |= POLLIN; - } - - for (SOCKET socket_id : send_select_set) { - pollfds[socket_id].fd = socket_id; - pollfds[socket_id].events |= POLLOUT; - } - - for (SOCKET socket_id : error_select_set) { - pollfds[socket_id].fd = socket_id; - // These flags are ignored, but we set them for clarity - pollfds[socket_id].events |= POLLERR|POLLHUP; - } - - std::vector vpollfds; - vpollfds.reserve(pollfds.size()); - for (auto it : pollfds) { - vpollfds.push_back(std::move(it.second)); - } - - int r{-1}; - ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); - if (r < 0) { - return; - } - - if (interruptNet) return; - - for (struct pollfd pollfd_entry : vpollfds) { - if (pollfd_entry.revents & POLLIN) recv_set.insert(pollfd_entry.fd); - if (pollfd_entry.revents & POLLOUT) send_set.insert(pollfd_entry.fd); - if (pollfd_entry.revents & (POLLERR|POLLHUP)) error_set.insert(pollfd_entry.fd); - } -} -#endif - -void CConnman::SocketEventsSelect(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { - interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); - return; - } - - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - - for (SOCKET hSocket : recv_select_set) { - FD_SET(hSocket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hSocket); - } - - for (SOCKET hSocket : send_select_set) { - FD_SET(hSocket, &fdsetSend); - hSocketMax = std::max(hSocketMax, hSocket); - } - - for (SOCKET hSocket : error_select_set) { - FD_SET(hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, hSocket); - } - - int nSelect{-1}; - ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);}); - if (interruptNet) - return; - - if (nSelect == SOCKET_ERROR) - { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) - FD_SET(i, &fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS))) - return; - } - - for (SOCKET hSocket : recv_select_set) { - if (FD_ISSET(hSocket, &fdsetRecv)) { - recv_set.insert(hSocket); - } - } - - for (SOCKET hSocket : send_select_set) { - if (FD_ISSET(hSocket, &fdsetSend)) { - send_set.insert(hSocket); - } - } - - for (SOCKET hSocket : error_select_set) { - if (FD_ISSET(hSocket, &fdsetError)) { - error_set.insert(hSocket); - } - } -} - -void CConnman::SocketEvents(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - switch (socketEventsMode) { -#ifdef USE_KQUEUE - case SocketEventsMode::KQueue: - SocketEventsKqueue(recv_set, send_set, error_set, only_poll); - break; -#endif -#ifdef USE_EPOLL - case SocketEventsMode::EPoll: - SocketEventsEpoll(recv_set, send_set, error_set, only_poll); - break; -#endif -#ifdef USE_POLL - case SocketEventsMode::Poll: - SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); - break; -#endif - case SocketEventsMode::Select: - SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); - break; - default: - assert(false); - } -} - void CConnman::SocketHandler(CMasternodeSync& mn_sync) { AssertLockNotHeld(m_total_bytes_sent_mutex); - std::set recv_set; - std::set send_set; - std::set error_set; + Sock::EventsPerSock events_per_sock; bool only_poll = [this]() { // Check if we have work to do and thus should avoid waiting for events @@ -2637,72 +2472,64 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) { const NodesSnapshot snap{*this, /* cond = */ CConnman::AllNodes, /* shuffle = */ false}; + const auto timeout = std::chrono::milliseconds(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + const bool is_lt = socketEventsMode == SocketEventsMode::Poll || socketEventsMode == SocketEventsMode::Select; + // Check for the readiness of the already connected sockets and the // listening sockets in one call ("readiness" as in poll(2) or // select(2)). If none are ready, wait for a short while and return // empty sets. - SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); + events_per_sock = GenerateWaitSockets(snap.Nodes()); + if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, socketEventsMode)) { + if (is_lt) { + interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + } + } - // Drain the wakeup pipe - if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) { - m_wakeup_pipe->Drain(); - } + // Drain the wakeup pipe + if (m_wakeup_pipe && events_per_sock.find(m_wakeup_pipe->m_pipe[0]) != events_per_sock.end()) { + m_wakeup_pipe->Drain(); + } // Service (send/receive) each of the already connected nodes. - SocketHandlerConnected(recv_set, send_set, error_set); + SocketHandlerConnected(events_per_sock); } // Accept new connections from listening sockets. - SocketHandlerListening(recv_set, mn_sync); + SocketHandlerListening(events_per_sock, mn_sync); } -void CConnman::SocketHandlerConnected(const std::set& recv_set, - const std::set& send_set, - const std::set& error_set) +void CConnman::SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) { AssertLockNotHeld(m_total_bytes_sent_mutex); if (interruptNet) return; - std::set vErrorNodes; - std::set vReceivableNodes; - std::set vSendableNodes; + std::set node_err_set; + std::set node_recv_set; + std::set node_send_set; { LOCK(cs_mapSocketToNode); - for (auto hSocket : error_set) { - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; + for (const auto& [sock, events] : events_per_sock) { + auto it = mapSocketToNode.find(sock); + if (it == mapSocketToNode.end()) continue; + if (events.occurred & Sock::ERR) { + it->second->AddRef(); + node_err_set.emplace(it->second); } - it->second->AddRef(); - vErrorNodes.emplace(it->second); - } - for (auto hSocket : recv_set) { - if (error_set.count(hSocket)) { - // no need to handle it twice - continue; + if (events.occurred & Sock::RECV) { + if (events.occurred & Sock::ERR) continue; + LOCK(cs_sendable_receivable_nodes); + auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fHasRecvData = true; } - - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; + if (events.occurred & Sock::SEND) { + LOCK(cs_sendable_receivable_nodes); + auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fCanSendData = true; } - - LOCK(cs_sendable_receivable_nodes); - auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); - assert(jt.first->second == it->second); - it->second->fHasRecvData = true; - } - for (auto hSocket : send_set) { - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; - } - - LOCK(cs_sendable_receivable_nodes); - auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); - assert(jt.first->second == it->second); - it->second->fCanSendData = true; } } @@ -2719,17 +2546,17 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect && (!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) { pnode->AddRef(); - vReceivableNodes.emplace(pnode); + node_recv_set.emplace(pnode); } // Collect nodes that have data to send and have a socket with non-empty write buffers if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) { pnode->AddRef(); - vSendableNodes.emplace(pnode); + node_send_set.emplace(pnode); } }); - for (CNode* pnode : vSendableNodes) { + for (CNode* pnode : node_send_set) { if (interruptNet) { break; } @@ -2746,13 +2573,13 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // sending actually succeeded to make sure progress is always made; otherwise a // deadlock would be possible when both sides have data to send, but neither is // receiving. - if (data_left && vReceivableNodes.erase(pnode)) { + if (data_left && node_recv_set.erase(pnode)) { pnode->Release(); } } } - for (CNode* pnode : vErrorNodes) + for (CNode* pnode : node_err_set) { if (interruptNet) { break; @@ -2761,7 +2588,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (CNode* pnode : vReceivableNodes) + for (CNode* pnode : node_recv_set) { if (interruptNet) { break; @@ -2773,13 +2600,13 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (auto& node : vErrorNodes) { + for (auto& node : node_err_set) { node->Release(); } - for (auto& node : vReceivableNodes) { + for (auto& node : node_recv_set) { node->Release(); } - for (auto& node : vSendableNodes) { + for (auto& node : node_send_set) { node->Release(); } @@ -2811,13 +2638,14 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, } } -void CConnman::SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) +void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock, CMasternodeSync& mn_sync) { for (const ListenSocket& listen_socket : vhListenSocket) { if (interruptNet) { return; } - if (recv_set.count(listen_socket.sock->Get()) > 0) { + const auto it = events_per_sock.find(listen_socket.sock->Get()); + if (it != events_per_sock.end() && (it->second.occurred & Sock::RECV)) { AcceptConnection(listen_socket, mn_sync); } } diff --git a/src/net.h b/src/net.h index e353f8a09542..81f609897cbc 100644 --- a/src/net.h +++ b/src/net.h @@ -1619,30 +1619,9 @@ friend class CNode; /** * Generate a collection of sockets to check for IO readiness. * @param[in] nodes Select from these nodes' sockets. - * @param[out] recv_set Sockets to check for read readiness. - * @param[out] send_set Sockets to check for write readiness. - * @param[out] error_set Sockets to check for errors. - * @return true if at least one socket is to be checked (the returned set is not empty) + * @return sockets to check for readiness */ - bool GenerateSelectSet(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set); - - /** - * Check which sockets are ready for IO. - * @param[in] nodes Select from these nodes' sockets (in supported event methods). - * @param[in] only_poll Permit zero timeout polling - * @param[out] recv_set Sockets which are ready for read. - * @param[out] send_set Sockets which are ready for write. - * @param[out] error_set Sockets which have errors. - * This calls `GenerateSelectSet()` to gather a list of sockets to check. - */ - void SocketEvents(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); + Sock::EventsPerSock GenerateWaitSockets(Span nodes); #ifdef USE_KQUEUE void SocketEventsKqueue(std::set& recv_set, @@ -1656,18 +1635,6 @@ friend class CNode; std::set& error_set, bool only_poll); #endif -#ifdef USE_POLL - void SocketEventsPoll(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif - void SocketEventsSelect(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); /** * Check connected and listening sockets for IO readiness and process them accordingly. @@ -1676,20 +1643,16 @@ friend class CNode; /** * Do the read/write for connected sockets that are ready for IO. - * @param[in] recv_set Sockets that are ready for read. - * @param[in] send_set Sockets that are ready for send. - * @param[in] error_set Sockets that have an exceptional condition (error). + * @param[in] events_per_sock Sockets that are ready for IO. */ - void SocketHandlerConnected(const std::set& recv_set, - const std::set& send_set, - const std::set& error_set) + void SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); /** * Accept incoming connections, one from each read-ready listening socket. - * @param[in] recv_set Sockets that are ready for read. + * @param[in] events_per_sock Sockets that are ready for IO. */ - void SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) + void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock, CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadSocketHandler(CMasternodeSync& mn_sync) diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index 58ecd1fead58..c285994f5b44 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -287,6 +287,15 @@ bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Socket return true; } +bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const +{ + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = m_fuzzed_data_provider.ConsumeBool() ? events.requested : 0; + } + return true; +} + bool FuzzedSock::IsConnected(std::string& errmsg) const { if (m_fuzzed_data_provider.ConsumeBool()) { diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index c9a53eb7a401..2a2125ec6a0c 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -91,6 +91,8 @@ class FuzzedSock : public Sock bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override; + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override; + bool IsConnected(std::string& errmsg) const override; }; diff --git a/src/test/util/net.h b/src/test/util/net.h index 47add71da89d..c7b786b5645f 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -211,6 +211,15 @@ class StaticContentsSock : public Sock return true; } + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override + { + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = events.requested; + } + return true; + } + private: const std::string m_contents; mutable size_t m_consumed; diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 5e521de1f687..a8932a51adab 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -146,6 +146,26 @@ bool Sock::IsSelectable(bool is_select) const } bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const +{ + EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})}; + + if (!WaitMany(timeout, events_per_sock)) { + return false; + } + + if (occurred != nullptr) { + *occurred = events_per_sock.begin()->second.occurred; + } + + return true; +} + +bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const +{ + return WaitManyInternal(timeout, events_per_sock, event_mode); +} + +bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) { std::string debug_str; @@ -153,13 +173,13 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents { case SocketEventsMode::Poll: #ifdef USE_POLL - return WaitPoll(timeout, requested, occurred); + return WaitManyPoll(timeout, events_per_sock); #else debug_str += "Sock::Wait -- Support for poll not compiled in, falling back on "; break; #endif /* USE_POLL */ case SocketEventsMode::Select: - return WaitSelect(timeout, requested, occurred); + return WaitManySelect(timeout, events_per_sock); case SocketEventsMode::EPoll: debug_str += "Sock::Wait -- Unimplemented for epoll, falling back on "; break; @@ -176,85 +196,95 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents #endif /* USE_POLL*/ LogPrint(BCLog::NET, "%s\n", debug_str); #ifdef USE_POLL - return WaitPoll(timeout, requested, occurred); + return WaitManyPoll(timeout, events_per_sock); #else - return WaitSelect(timeout, requested, occurred); + return WaitManySelect(timeout, events_per_sock); #endif /* USE_POLL */ } #ifdef USE_POLL -bool Sock::WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) { - pollfd fd; - fd.fd = m_socket; - fd.events = 0; - if (requested & RECV) { - fd.events |= POLLIN; - } - if (requested & SEND) { - fd.events |= POLLOUT; + std::vector pfds; + for (const auto& [socket, events] : events_per_sock) { + pfds.emplace_back(); + auto& pfd = pfds.back(); + pfd.fd = socket; + if (events.requested & RECV) { + pfd.events |= POLLIN; + } + if (events.requested & SEND) { + pfd.events |= POLLOUT; + } } - if (poll(&fd, 1, count_milliseconds(timeout)) == SOCKET_ERROR) { + if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) { return false; } - if (occurred != nullptr) { - *occurred = 0; - if (fd.revents & POLLIN) { - *occurred |= RECV; + assert(pfds.size() == events_per_sock.size()); + size_t i{0}; + for (auto& [socket, events] : events_per_sock) { + assert(socket == static_cast(pfds[i].fd)); + events.occurred = 0; + if (pfds[i].revents & POLLIN) { + events.occurred |= RECV; } - if (fd.revents & POLLOUT) { - *occurred |= SEND; + if (pfds[i].revents & POLLOUT) { + events.occurred |= SEND; } - if (fd.revents & (POLLERR | POLLHUP)) { - *occurred |= ERR; + if (pfds[i].revents & (POLLERR | POLLHUP)) { + events.occurred |= ERR; } + ++i; } return true; } #endif /* USE_POLL */ -bool Sock::WaitSelect(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) { - if (!IsSelectableSocket(m_socket, /*is_select=*/true)) { - return false; - } - - fd_set fdset_recv; - fd_set fdset_send; - fd_set fdset_err; - FD_ZERO(&fdset_recv); - FD_ZERO(&fdset_send); - FD_ZERO(&fdset_err); - - if (requested & RECV) { - FD_SET(m_socket, &fdset_recv); - } - - if (requested & SEND) { - FD_SET(m_socket, &fdset_send); + fd_set recv; + fd_set send; + fd_set err; + FD_ZERO(&recv); + FD_ZERO(&send); + FD_ZERO(&err); + SOCKET socket_max{0}; + + for (const auto& [sock, events] : events_per_sock) { + if (!IsSelectableSocket(sock, /*is_select=*/true)) { + return false; + } + const auto& s = sock; + if (events.requested & RECV) { + FD_SET(s, &recv); + } + if (events.requested & SEND) { + FD_SET(s, &send); + } + FD_SET(s, &err); + socket_max = std::max(socket_max, s); } - FD_SET(m_socket, &fdset_err); - - timeval timeout_struct = MillisToTimeval(timeout); + timeval tv = MillisToTimeval(timeout); - if (select(m_socket + 1, &fdset_recv, &fdset_send, &fdset_err, &timeout_struct) == SOCKET_ERROR) { + if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) { return false; } - if (occurred != nullptr) { - *occurred = 0; - if (FD_ISSET(m_socket, &fdset_recv)) { - *occurred |= RECV; + for (auto& [sock, events] : events_per_sock) { + const auto& s = sock; + events.occurred = 0; + if (FD_ISSET(s, &recv)) { + events.occurred |= RECV; } - if (FD_ISSET(m_socket, &fdset_send)) { - *occurred |= SEND; + if (FD_ISSET(s, &send)) { + events.occurred |= SEND; } - if (FD_ISSET(m_socket, &fdset_err)) { - *occurred |= ERR; + if (FD_ISSET(s, &err)) { + events.occurred |= ERR; } } diff --git a/src/util/sock.h b/src/util/sock.h index 237abcad6e09..1bbf4e2e959c 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -12,6 +12,7 @@ #include #include #include +#include /** * Maximum time to wait for I/O readiness. @@ -231,10 +232,62 @@ class Sock Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const; + /** + * Auxiliary requested/occurred events to wait for in `WaitMany()`. + */ + struct Events { + explicit Events(Event req) : requested{req}, occurred{0} {} + Event requested; + Event occurred; + }; + + /** + * On which socket to wait for what events in `WaitMany()`. + * + * Bitcoin: + * The `shared_ptr` is copied into the map to ensure that the `Sock` object + * is not destroyed (its destructor would close the underlying socket). + * If this happens shortly before or after we call `poll(2)` and a new + * socket gets created under the same file descriptor number then the report + * from `WaitMany()` will be bogus. + * + * Dash: + * The raw `SOCKET` file descriptor is copied into the map (generally taken from + * Sock::Get()) to allow sockets managed by external logic (e.g. WakeupPipes) to + * be used without wrapping it into a Sock object and risk handing control over. + */ + using EventsPerSock = std::unordered_map; + + /** + * Same as `Wait()`, but wait on many sockets within the same timeout. + * @param[in] timeout Wait this long for at least one of the requested events to occur. + * @param[in,out] events_per_sock Wait for the requested events on these sockets and set + * `occurred` for the events that actually occurred. + * @return true on success (or timeout, if all `what[].occurred` are returned as 0), + * false otherwise + */ + [[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsMode event_mode = SEM_LT_DEFAULT) const; + + /** + * As an EventsPerSock map no longer contains a Sock object (it now contains the raw SOCKET file + * descriptor), we lose access to all the logic implemented in Sock. Except that as WaitMany + * doesn't interact with the raw socket stored within Sock, it can be safely declared as static + * and we can pass all other parameters as arguments as it should ordinarily remain the same for + * entire runtime duration of the program. We keep around the virtual WaitMany to allow mockability + * in tests, so keep in mind that using WaitManyInternal *bypasses* any override for WaitMany. + * + * This doesn't apply to Sock::Wait(), as it populates an EventsPerSock map with its own raw + * socket before passing it to WaitMany. + */ + static bool WaitManyInternal(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsMode event_mode = SEM_LT_DEFAULT); #ifdef USE_POLL - bool WaitPoll(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; + static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock); #endif /* USE_POLL */ - bool WaitSelect(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const; + static bool WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock); /* Higher level, convenience, methods. These may throw. */ From 0d92d404f8f11619e11f2215be651e0f4e6798c5 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 29 Jun 2025 13:25:16 +0000 Subject: [PATCH 08/15] net: implement `WaitMany` variants for {`epoll`, `kqueue`} Co-authored-by: UdjinM6 --- src/i2p.cpp | 2 +- src/net.cpp | 83 +------------------------------ src/net.h | 21 +++----- src/netbase.cpp | 2 +- src/test/fuzz/util.cpp | 4 +- src/test/fuzz/util.h | 4 +- src/test/util/net.h | 6 ++- src/util/sock.cpp | 108 ++++++++++++++++++++++++++++++++++++++--- src/util/sock.h | 38 ++++++++++++--- 9 files changed, 149 insertions(+), 119 deletions(-) diff --git a/src/i2p.cpp b/src/i2p.cpp index fa02e48a6eff..24d20d7fc7f7 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -160,7 +160,7 @@ bool Session::Accept(Connection& conn) while (!*m_interrupt) { Sock::Event occurred; - if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SEM_LT_DEFAULT, &occurred)) { + if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SocketEventsParams(), &occurred)) { errmsg = "wait on socket failed"; break; } diff --git a/src/net.cpp b/src/net.cpp index 11f5d84739b2..b2459cc2a811 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -56,14 +56,6 @@ #include #endif -#ifdef USE_EPOLL -#include -#endif - -#ifdef USE_KQUEUE -#include -#endif - #include #include #include @@ -2375,79 +2367,6 @@ Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) return events_per_sock; } -#ifdef USE_KQUEUE -void CConnman::SocketEventsKqueue(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::array events{}; - struct timespec timeout = MillisToTimespec(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - - int ret{-1}; - ToggleWakeupPipe([&](){ - ret = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events.data(), events.size(), &timeout); - }); - if (ret == SOCKET_ERROR) { - LogPrintf("kevent wait error\n"); - return; - } - - for (int i = 0; i < ret; i++) { - auto& event = events[i]; - if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) { - error_set.insert((SOCKET)event.ident); - continue; - } - - if (event.filter == EVFILT_READ) { - recv_set.insert((SOCKET)event.ident); - } - - if (event.filter == EVFILT_WRITE) { - send_set.insert((SOCKET)event.ident); - } - } -} -#endif - -#ifdef USE_EPOLL -void CConnman::SocketEventsEpoll(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::array events{}; - - int ret{-1}; - ToggleWakeupPipe([&](){ - ret = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events.data(), events.size(), - only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); - }); - - if (ret == SOCKET_ERROR) { - LogPrintf("epoll_wait error\n"); - return; - } - - for (int i = 0; i < ret; i++) { - auto& e = events[i]; - if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { - error_set.insert((SOCKET)e.data.fd); - continue; - } - - if (e.events & EPOLLIN) { - recv_set.insert((SOCKET)e.data.fd); - } - - if (e.events & EPOLLOUT) { - send_set.insert((SOCKET)e.data.fd); - } - } -} -#endif - void CConnman::SocketHandler(CMasternodeSync& mn_sync) { AssertLockNotHeld(m_total_bytes_sent_mutex); @@ -2480,7 +2399,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) // select(2)). If none are ready, wait for a short while and return // empty sets. events_per_sock = GenerateWaitSockets(snap.Nodes()); - if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, socketEventsMode)) { + if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor()})) { if (is_lt) { interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); } diff --git a/src/net.h b/src/net.h index 81f609897cbc..5ce882c5610b 100644 --- a/src/net.h +++ b/src/net.h @@ -1623,19 +1623,6 @@ friend class CNode; */ Sock::EventsPerSock GenerateWaitSockets(Span nodes); -#ifdef USE_KQUEUE - void SocketEventsKqueue(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif -#ifdef USE_EPOLL - void SocketEventsEpoll(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif - /** * Check connected and listening sockets for IO readiness and process them accordingly. */ @@ -1867,6 +1854,14 @@ friend class CNode; std::unique_ptr m_edge_trig_events{nullptr}; std::unique_ptr m_wakeup_pipe{nullptr}; + SOCKET GetModeFileDescriptor() + { + if (m_edge_trig_events) { + return static_cast(m_edge_trig_events->GetFileDescriptor()); + } + return INVALID_SOCKET; + } + template void ToggleWakeupPipe(Callable&& func) { diff --git a/src/netbase.cpp b/src/netbase.cpp index ea8877cb4386..7ef1dde60f18 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -572,7 +572,7 @@ static bool ConnectToSocket(const Sock& sock, struct sockaddr* sockaddr, socklen // synchronously to check for successful connection with a timeout. const Sock::Event requested = Sock::RECV | Sock::SEND; Sock::Event occurred; - if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SEM_LT_DEFAULT, &occurred)) { + if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SocketEventsParams(), &occurred)) { LogPrintf("wait for connect to %s failed: %s\n", dest_str, NetworkErrorString(WSAGetLastError())); diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index c285994f5b44..d772d0954f94 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -270,7 +270,7 @@ bool FuzzedSock::IsSelectable(bool is_select) const return m_selectable; } -bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const +bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const { constexpr std::array wait_errnos{ EBADF, @@ -287,7 +287,7 @@ bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Socket return true; } -bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const +bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const { for (auto& [sock, events] : events_per_sock) { (void)sock; diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index 2a2125ec6a0c..d2b493478ae9 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -89,9 +89,9 @@ class FuzzedSock : public Sock bool IsSelectable(bool is_select) const override; - bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode = SEM_LT_DEFAULT, Event* occurred = nullptr) const override; + bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params = SocketEventsParams(), Event* occurred = nullptr) const override; - bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override; + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params = SocketEventsParams()) const override; bool IsConnected(std::string& errmsg) const override; }; diff --git a/src/test/util/net.h b/src/test/util/net.h index c7b786b5645f..679c6cb28b28 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -202,7 +202,7 @@ class StaticContentsSock : public Sock bool Wait(std::chrono::milliseconds timeout, Event requested, - SocketEventsMode event_mode = SEM_LT_DEFAULT, + SocketEventsParams event_params = SocketEventsParams(), Event* occurred = nullptr) const override { if (occurred != nullptr) { @@ -211,7 +211,9 @@ class StaticContentsSock : public Sock return true; } - bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode = SEM_LT_DEFAULT) const override + bool WaitMany(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams event_params = SocketEventsParams()) const override { for (auto& [sock, events] : events_per_sock) { (void)sock; diff --git a/src/util/sock.cpp b/src/util/sock.cpp index a8932a51adab..0c5f6d5b457c 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -15,6 +15,14 @@ #include #include +#ifdef USE_EPOLL +#include +#endif + +#ifdef USE_KQUEUE +#include +#endif + #ifdef USE_POLL #include #endif @@ -145,11 +153,16 @@ bool Sock::IsSelectable(bool is_select) const return IsSelectableSocket(m_socket, is_select); } -bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsMode event_mode, Event* occurred) const +bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const { EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})}; - if (!WaitMany(timeout, events_per_sock)) { + if (auto [sem, _] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) { + // We need to ensure we are only using a level-triggered mode because we are expecting + // a direct correlation between the events reported and the one socket we are querying + event_params = SocketEventsParams(); + } + if (!WaitMany(timeout, events_per_sock, event_params)) { return false; } @@ -160,16 +173,16 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents return true; } -bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) const +bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const { - return WaitManyInternal(timeout, events_per_sock, event_mode); + return WaitManyInternal(timeout, events_per_sock, event_params); } -bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsMode event_mode) +bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) { std::string debug_str; - switch (event_mode) + switch (event_params.m_event_mode) { case SocketEventsMode::Poll: #ifdef USE_POLL @@ -181,11 +194,21 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev case SocketEventsMode::Select: return WaitManySelect(timeout, events_per_sock); case SocketEventsMode::EPoll: - debug_str += "Sock::Wait -- Unimplemented for epoll, falling back on "; +#ifdef USE_EPOLL + assert(event_params.m_event_fd != INVALID_SOCKET); + return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd); +#else + debug_str += "Sock::Wait -- Support for epoll not compiled in, falling back on "; break; +#endif /* USE_EPOLL */ case SocketEventsMode::KQueue: - debug_str += "Sock::Wait -- Unimplemented for kqueue, falling back on "; +#ifdef USE_KQUEUE + assert(event_params.m_event_fd != INVALID_SOCKET); + return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd); +#else + debug_str += "Sock::Wait -- Support for kqueue not compiled in, falling back on "; break; +#endif /* USE_KQUEUE */ default: assert(false); } @@ -202,6 +225,75 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev #endif /* USE_POLL */ } +#ifdef USE_EPOLL +bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd) +{ + std::array events{}; + + int ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout)); + if (ret == SOCKET_ERROR) { + return false; + } + + // Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the + // entire map before populating it with our events data. + events_per_sock.clear(); + + for (int idx = 0; idx < ret; idx++) { + auto& ev = events[idx]; + Event occurred = 0; + if (ev.events & (EPOLLERR | EPOLLHUP)) { + occurred |= ERR; + } else { + if (ev.events & EPOLLIN) { + occurred |= RECV; + } + if (ev.events & EPOLLOUT) { + occurred |= SEND; + } + } + events_per_sock.emplace(static_cast(ev.data.fd), Sock::Events{/*req=*/RECV | SEND, occurred}); + } + + return true; +} +#endif /* USE_EPOLL */ + +#ifdef USE_KQUEUE +bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd) +{ + std::array events{}; + struct timespec ts = MillisToTimespec(timeout); + + int ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts); + if (ret == SOCKET_ERROR) { + return false; + } + + // Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the + // entire map before populating it with our events data. + events_per_sock.clear(); + + for (int idx = 0; idx < ret; idx++) { + auto& ev = events[idx]; + Event occurred = 0; + if (ev.flags & (EV_ERROR | EV_EOF)) { + occurred |= ERR; + } else { + if (ev.filter == EVFILT_READ) { + occurred |= RECV; + } + if (ev.filter == EVFILT_WRITE) { + occurred |= SEND; + } + } + events_per_sock.emplace(static_cast(ev.ident), Sock::Events{/*req=*/RECV | SEND, occurred}); + } + + return true; +} +#endif /* USE_KQUEUE */ + #ifdef USE_POLL bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) { diff --git a/src/util/sock.h b/src/util/sock.h index 1bbf4e2e959c..d6f2ee4b2e9b 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -31,11 +31,27 @@ enum class SocketEventsMode : int8_t { Unknown = -1 }; +struct SocketEventsParams +{ + SocketEventsParams() = default; + SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd) : + m_event_mode{event_mode}, + m_event_fd{event_fd} + {} + ~SocketEventsParams() = default; + +public: + /* Choice of API to use in Sock::Wait{,Many}() */ + SocketEventsMode m_event_mode{ #ifdef USE_POLL -#define SEM_LT_DEFAULT SocketEventsMode::Poll + SocketEventsMode::Poll #else -#define SEM_LT_DEFAULT SocketEventsMode::Select + SocketEventsMode::Select #endif /* USE_POLL */ + }; + /* File descriptor for event triggered SEMs (and INVALID_SOCKET for the rest) */ + SOCKET m_event_fd{INVALID_SOCKET}; +}; /* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ constexpr std::string_view SEMToString(const SocketEventsMode val) { @@ -197,7 +213,7 @@ class Sock * Check if the underlying socket can be used for `select(2)` (or the `Wait()` method). * @return true if selectable */ - [[nodiscard]] virtual bool IsSelectable(bool is_select = (SEM_LT_DEFAULT == SocketEventsMode::Select)) const; + [[nodiscard]] virtual bool IsSelectable(bool is_select = (SocketEventsParams().m_event_mode == SocketEventsMode::Select)) const; using Event = uint8_t; @@ -221,7 +237,7 @@ class Sock * Wait for readiness for input (recv) or output (send). * @param[in] timeout Wait this much for at least one of the requested events to occur. * @param[in] requested Wait for those events, bitwise-or of `RECV` and `SEND`. - * @param[in] event_mode Wait using the API specified. + * @param[in] event_params Wait using the API specified. * @param[out] occurred If not nullptr and the function returns `true`, then this * indicates which of the requested events occurred (`ERR` will be added, even if * not requested, if an exceptional event occurs on the socket). @@ -230,13 +246,13 @@ class Sock */ [[nodiscard]] virtual bool Wait(std::chrono::milliseconds timeout, Event requested, - SocketEventsMode event_mode = SEM_LT_DEFAULT, + SocketEventsParams event_params = SocketEventsParams(), Event* occurred = nullptr) const; /** * Auxiliary requested/occurred events to wait for in `WaitMany()`. */ struct Events { - explicit Events(Event req) : requested{req}, occurred{0} {} + explicit Events(Event req, Event ocr = 0) : requested{req}, occurred{ocr} {} Event requested; Event occurred; }; @@ -268,7 +284,7 @@ class Sock */ [[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, - SocketEventsMode event_mode = SEM_LT_DEFAULT) const; + SocketEventsParams event_params = SocketEventsParams()) const; /** * As an EventsPerSock map no longer contains a Sock object (it now contains the raw SOCKET file @@ -283,7 +299,13 @@ class Sock */ static bool WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, - SocketEventsMode event_mode = SEM_LT_DEFAULT); + SocketEventsParams event_params = SocketEventsParams()); +#ifdef USE_EPOLL + static bool WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd); +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + static bool WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd); +#endif /* USE_KQUEUE */ #ifdef USE_POLL static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock); #endif /* USE_POLL */ From 5ae6f2a48378264bc05ac96a4b6c7a431a4477ff Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 12:14:02 +0000 Subject: [PATCH 09/15] fix: merge `kqueue` events manually as they are not bitwise OR'ed --- src/util/sock.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 0c5f6d5b457c..f8cbdd125ea9 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -287,7 +287,11 @@ bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& even occurred |= SEND; } } - events_per_sock.emplace(static_cast(ev.ident), Sock::Events{/*req=*/RECV | SEND, occurred}); + if (auto it = events_per_sock.find(static_cast(ev.ident)); it != events_per_sock.end()) { + it->second.occurred |= occurred; + } else { + events_per_sock.emplace(static_cast(ev.ident), Sock::Events{/*req=*/RECV | SEND, occurred}); + } } return true; From f01a8714ed8684495b33331e79695b61a18b16a0 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sun, 29 Jun 2025 11:08:01 +0000 Subject: [PATCH 10/15] net: add early bail out condition for empty `events_per_sock` for LTMs We handle timeouts in `CConnman::SocketHandler()`, so there is no adverse effects for bailing out early from `poll` or `select` when there's nothing to monitor to begin with. Co-authored-by: UdjinM6 --- src/util/sock.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/util/sock.cpp b/src/util/sock.cpp index f8cbdd125ea9..fb526c803bda 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -301,6 +301,8 @@ bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& even #ifdef USE_POLL bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) { + if (events_per_sock.empty()) return true; + std::vector pfds; for (const auto& [socket, events] : events_per_sock) { pfds.emplace_back(); @@ -341,6 +343,8 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) { + if (events_per_sock.empty()) return true; + fd_set recv; fd_set send; fd_set err; From e4cc5acec7dd87c4bb5634f71f6076b19ccf75c0 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 13:46:17 +0000 Subject: [PATCH 11/15] net: implement `ToggleWakeupPipe` in all WaitMany variants --- src/net.cpp | 2 +- src/net.h | 5 ++--- src/util/sock.cpp | 54 ++++++++++++++++++++++++++++++++++------------- src/util/sock.h | 28 ++++++++++++++++++------ 4 files changed, 64 insertions(+), 25 deletions(-) diff --git a/src/net.cpp b/src/net.cpp index b2459cc2a811..619a5a797f66 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2399,7 +2399,7 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) // select(2)). If none are ready, wait for a short while and return // empty sets. events_per_sock = GenerateWaitSockets(snap.Nodes()); - if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor()})) { + if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor(), ToggleWakeupPipe})) { if (is_lt) { interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); } diff --git a/src/net.h b/src/net.h index 5ce882c5610b..e273f933b567 100644 --- a/src/net.h +++ b/src/net.h @@ -1862,15 +1862,14 @@ friend class CNode; return INVALID_SOCKET; } - template - void ToggleWakeupPipe(Callable&& func) + SocketEventsParams::wrap_fn ToggleWakeupPipe = [&](std::function&& func) { if (m_wakeup_pipe) { m_wakeup_pipe->Toggle(func); } else { func(); } - } + }; Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); diff --git a/src/util/sock.cpp b/src/util/sock.cpp index fb526c803bda..02612bc01f54 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -157,7 +157,7 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents { EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})}; - if (auto [sem, _] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) { + if (auto [sem, _, __] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) { // We need to ensure we are only using a level-triggered mode because we are expecting // a direct correlation between the events reported and the one socket we are querying event_params = SocketEventsParams(); @@ -186,17 +186,17 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev { case SocketEventsMode::Poll: #ifdef USE_POLL - return WaitManyPoll(timeout, events_per_sock); + return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func); #else debug_str += "Sock::Wait -- Support for poll not compiled in, falling back on "; break; #endif /* USE_POLL */ case SocketEventsMode::Select: - return WaitManySelect(timeout, events_per_sock); + return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func); case SocketEventsMode::EPoll: #ifdef USE_EPOLL assert(event_params.m_event_fd != INVALID_SOCKET); - return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd); + return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); #else debug_str += "Sock::Wait -- Support for epoll not compiled in, falling back on "; break; @@ -204,7 +204,7 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev case SocketEventsMode::KQueue: #ifdef USE_KQUEUE assert(event_params.m_event_fd != INVALID_SOCKET); - return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd); + return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); #else debug_str += "Sock::Wait -- Support for kqueue not compiled in, falling back on "; break; @@ -219,18 +219,24 @@ bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& ev #endif /* USE_POLL*/ LogPrint(BCLog::NET, "%s\n", debug_str); #ifdef USE_POLL - return WaitManyPoll(timeout, events_per_sock); + return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func); #else - return WaitManySelect(timeout, events_per_sock); + return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func); #endif /* USE_POLL */ } #ifdef USE_EPOLL -bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd) +bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET epoll_fd, + SocketEventsParams::wrap_fn wrap_func) { std::array events{}; - int ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout)); + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout)); + }); if (ret == SOCKET_ERROR) { return false; } @@ -260,12 +266,18 @@ bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& event #endif /* USE_EPOLL */ #ifdef USE_KQUEUE -bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd) +bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET kqueue_fd, + SocketEventsParams::wrap_fn wrap_func) { std::array events{}; struct timespec ts = MillisToTimespec(timeout); - int ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts); + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts); + }); if (ret == SOCKET_ERROR) { return false; } @@ -299,7 +311,9 @@ bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& even #endif /* USE_KQUEUE */ #ifdef USE_POLL -bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) +bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func) { if (events_per_sock.empty()) return true; @@ -316,7 +330,11 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events } } - if (poll(pfds.data(), pfds.size(), count_milliseconds(timeout)) == SOCKET_ERROR) { + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = poll(pfds.data(), pfds.size(), count_milliseconds(timeout)); + }); + if (ret == SOCKET_ERROR) { return false; } @@ -341,7 +359,9 @@ bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events } #endif /* USE_POLL */ -bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock) +bool Sock::WaitManySelect(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func) { if (events_per_sock.empty()) return true; @@ -370,7 +390,11 @@ bool Sock::WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& even timeval tv = MillisToTimeval(timeout); - if (select(socket_max + 1, &recv, &send, &err, &tv) == SOCKET_ERROR) { + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = select(socket_max + 1, &recv, &send, &err, &tv); + }); + if (ret == SOCKET_ERROR) { return false; } diff --git a/src/util/sock.h b/src/util/sock.h index d6f2ee4b2e9b..92090506262b 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -10,6 +10,7 @@ #include #include +#include #include #include #include @@ -33,10 +34,13 @@ enum class SocketEventsMode : int8_t { struct SocketEventsParams { + using wrap_fn = std::function&&)>; + SocketEventsParams() = default; - SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd) : + SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd, wrap_fn wrap_func) : m_event_mode{event_mode}, - m_event_fd{event_fd} + m_event_fd{event_fd}, + m_wrap_func{wrap_func} {} ~SocketEventsParams() = default; @@ -51,6 +55,8 @@ struct SocketEventsParams }; /* File descriptor for event triggered SEMs (and INVALID_SOCKET for the rest) */ SOCKET m_event_fd{INVALID_SOCKET}; + /* Function that wraps itself around WakeMany()'s API call */ + wrap_fn m_wrap_func{[](std::function&& func){func();}}; }; /* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ @@ -301,15 +307,25 @@ class Sock EventsPerSock& events_per_sock, SocketEventsParams event_params = SocketEventsParams()); #ifdef USE_EPOLL - static bool WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET epoll_fd); + static bool WaitManyEPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET epoll_fd, + SocketEventsParams::wrap_fn wrap_func); #endif /* USE_EPOLL */ #ifdef USE_KQUEUE - static bool WaitManyKQueue(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SOCKET kqueue_fd); + static bool WaitManyKQueue(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET kqueue_fd, + SocketEventsParams::wrap_fn wrap_func); #endif /* USE_KQUEUE */ #ifdef USE_POLL - static bool WaitManyPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock); + static bool WaitManyPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func); #endif /* USE_POLL */ - static bool WaitManySelect(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock); + static bool WaitManySelect(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func); /* Higher level, convenience, methods. These may throw. */ From 08a42c1bfa9a68af21ba1a5dbf37d772a4106939 Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Thu, 9 May 2024 16:18:39 +0000 Subject: [PATCH 12/15] refactor: move `DEFAULT_SOCKETEVENTS` to `util/sock.h` --- src/net.h | 10 ---------- src/util/sock.h | 10 ++++++++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/net.h b/src/net.h index e273f933b567..95cf7b6d4213 100644 --- a/src/net.h +++ b/src/net.h @@ -118,16 +118,6 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000; static constexpr bool DEFAULT_V2_TRANSPORT{true}; -#if defined USE_KQUEUE -#define DEFAULT_SOCKETEVENTS "kqueue" -#elif defined USE_EPOLL -#define DEFAULT_SOCKETEVENTS "epoll" -#elif defined USE_POLL -#define DEFAULT_SOCKETEVENTS "poll" -#else -#define DEFAULT_SOCKETEVENTS "select" -#endif - typedef int64_t NodeId; struct AddedNodeParams { diff --git a/src/util/sock.h b/src/util/sock.h index 92090506262b..9a0b0606c26b 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -15,6 +15,16 @@ #include #include +#if defined(USE_EPOLL) +#define DEFAULT_SOCKETEVENTS "epoll" +#elif defined(USE_KQUEUE) +#define DEFAULT_SOCKETEVENTS "kqueue" +#elif defined(USE_POLL) +#define DEFAULT_SOCKETEVENTS "poll" +#else +#define DEFAULT_SOCKETEVENTS "select" +#endif + /** * Maximum time to wait for I/O readiness. * It will take up until this time to break off in case of an interruption. From aca5ec9baaa01754b39f9debbcbab600c94a76da Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 28 Jun 2025 13:56:42 +0000 Subject: [PATCH 13/15] chore: remove scaffolding (SEM must be correct, no graceful fallback) Graceful fallback was necessary when filling in the gaps functionality wise but now that everything is in place, we should revert back to the old behavior of crashing when supplied an invalid or inapplicable SocketEventsMode preference as this means we didn't filter out an invalid input from the user-end or didn't set our defaults correctly, which is a logic error. --- src/util/sock.cpp | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 02612bc01f54..06d11cfdebf7 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -180,49 +180,27 @@ bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) { - std::string debug_str; - switch (event_params.m_event_mode) { - case SocketEventsMode::Poll: #ifdef USE_POLL + case SocketEventsMode::Poll: return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func); -#else - debug_str += "Sock::Wait -- Support for poll not compiled in, falling back on "; - break; #endif /* USE_POLL */ case SocketEventsMode::Select: return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func); - case SocketEventsMode::EPoll: #ifdef USE_EPOLL + case SocketEventsMode::EPoll: assert(event_params.m_event_fd != INVALID_SOCKET); return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); -#else - debug_str += "Sock::Wait -- Support for epoll not compiled in, falling back on "; - break; #endif /* USE_EPOLL */ - case SocketEventsMode::KQueue: #ifdef USE_KQUEUE + case SocketEventsMode::KQueue: assert(event_params.m_event_fd != INVALID_SOCKET); return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); -#else - debug_str += "Sock::Wait -- Support for kqueue not compiled in, falling back on "; - break; #endif /* USE_KQUEUE */ default: assert(false); } -#ifdef USE_POLL - debug_str += "poll"; -#else - debug_str += "select"; -#endif /* USE_POLL*/ - LogPrint(BCLog::NET, "%s\n", debug_str); -#ifdef USE_POLL - return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func); -#else - return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func); -#endif /* USE_POLL */ } #ifdef USE_EPOLL From c6e0e96f535f4a693a6080d20642e6daa708877a Mon Sep 17 00:00:00 2001 From: Kittywhiskers Van Gogh <63189531+kwvg@users.noreply.github.com> Date: Sat, 5 Jul 2025 09:03:15 +0000 Subject: [PATCH 14/15] chore: remove scaffolding (remove default args, make explicit choice) This was acceptable while we were refining earlier commits but default args could in combination with future backports result in behavior changes that were unanticipated, we should clear the air right now. We are introducing a new global, `g_socket_events_mode` to allow us to influence Wait(Many)() calls as and when needed. A global is acceptable as it's lightweight, a parameter that is not configurable during runtime except during initial startup and because the structure in which it is ordinarily stored is far heavier. Co-authored-by: UdjinM6 --- src/i2p.cpp | 2 +- src/init.cpp | 16 +------------ src/masternode/node.cpp | 2 +- src/net.cpp | 2 +- src/netbase.cpp | 6 ++--- src/test/fuzz/util.h | 4 ++-- src/test/sock_tests.cpp | 2 +- src/test/util/net.h | 4 ++-- src/test/util/setup_common.cpp | 11 +++++++++ src/util/sock.cpp | 19 +++++++++++---- src/util/sock.h | 44 ++++++++++++++++++++++++---------- 11 files changed, 68 insertions(+), 44 deletions(-) diff --git a/src/i2p.cpp b/src/i2p.cpp index 24d20d7fc7f7..3bc687e0dfde 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -160,7 +160,7 @@ bool Session::Accept(Connection& conn) while (!*m_interrupt) { Sock::Event occurred; - if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SocketEventsParams(), &occurred)) { + if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SocketEventsParams{::g_socket_events_mode}, &occurred)) { errmsg = "wait on socket failed"; break; } diff --git a/src/init.cpp b/src/init.cpp index e8fa49c4c594..0aa30bf002b7 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -480,21 +480,6 @@ static void OnRPCStopped() LogPrint(BCLog::RPC, "RPC stopped.\n"); } -std::string GetSupportedSocketEventsStr() -{ - std::string strSupportedModes = "'select'"; -#ifdef USE_POLL - strSupportedModes += ", 'poll'"; -#endif -#ifdef USE_EPOLL - strSupportedModes += ", 'epoll'"; -#endif -#ifdef USE_KQUEUE - strSupportedModes += ", 'kqueue'"; -#endif - return strSupportedModes; -} - void SetupServerArgs(ArgsManager& argsman) { SetupHelpOptions(argsman); @@ -2533,6 +2518,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); } connOptions.socketEventsMode = sem; + ::g_socket_events_mode = sem; const std::string& i2psam_arg = args.GetArg("-i2psam", ""); if (!i2psam_arg.empty()) { diff --git a/src/masternode/node.cpp b/src/masternode/node.cpp index 98537844fbec..e582b8ee4efc 100644 --- a/src/masternode/node.cpp +++ b/src/masternode/node.cpp @@ -156,7 +156,7 @@ void CActiveMasternodeManager::InitInternal(const CBlockIndex* pindex) // Check socket connectivity LogPrintf("CActiveMasternodeManager::Init -- Checking inbound connection to '%s'\n", m_info.service.ToStringAddrPort()); std::unique_ptr sock{ConnectDirectly(m_info.service, /*manual_connection=*/true)}; - bool fConnected{sock && sock->IsSelectable()}; + bool fConnected{sock && sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)}; sock = std::make_unique(INVALID_SOCKET); if (!fConnected && Params().RequireRoutableExternalIP()) { m_state = MasternodeState::SOME_ERROR; diff --git a/src/net.cpp b/src/net.cpp index 619a5a797f66..d28a891921af 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1928,7 +1928,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, return; } - if (!sock->IsSelectable()) { + if (!sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)) { LogPrintf("%s: non-selectable socket\n", strDropped); return; } diff --git a/src/netbase.cpp b/src/netbase.cpp index 7ef1dde60f18..850044f00c3d 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -336,7 +336,7 @@ static IntrRecvError InterruptibleRecv(uint8_t* data, size_t len, int timeout, c // we're approaching the end of the specified total timeout const auto remaining = std::chrono::milliseconds{endTime - curTime}; const auto timeout = std::min(remaining, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - if (!sock.Wait(timeout, Sock::RECV)) { + if (!sock.Wait(timeout, Sock::RECV, SocketEventsParams{::g_socket_events_mode})) { return IntrRecvError::NetworkError; } } else { @@ -514,7 +514,7 @@ std::unique_ptr CreateSockOS(sa_family_t address_family) // Ensure that waiting for I/O on this socket won't result in undefined // behavior. - if (!sock->IsSelectable()) { + if (!sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)) { LogPrintf("Cannot create connection: non-selectable socket created (fd >= FD_SETSIZE ?)\n"); return nullptr; } @@ -572,7 +572,7 @@ static bool ConnectToSocket(const Sock& sock, struct sockaddr* sockaddr, socklen // synchronously to check for successful connection with a timeout. const Sock::Event requested = Sock::RECV | Sock::SEND; Sock::Event occurred; - if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SocketEventsParams(), &occurred)) { + if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SocketEventsParams{::g_socket_events_mode}, &occurred)) { LogPrintf("wait for connect to %s failed: %s\n", dest_str, NetworkErrorString(WSAGetLastError())); diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index d2b493478ae9..499a17b23fec 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -89,9 +89,9 @@ class FuzzedSock : public Sock bool IsSelectable(bool is_select) const override; - bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params = SocketEventsParams(), Event* occurred = nullptr) const override; + bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred = nullptr) const override; - bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params = SocketEventsParams()) const override; + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const override; bool IsConnected(std::string& errmsg) const override; }; diff --git a/src/test/sock_tests.cpp b/src/test/sock_tests.cpp index 8376ec1a6847..7e7c9741e9be 100644 --- a/src/test/sock_tests.cpp +++ b/src/test/sock_tests.cpp @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(wait) Sock sock0(s[0]); Sock sock1(s[1]); - std::thread waiter([&sock0]() { (void)sock0.Wait(24h, Sock::RECV); }); + std::thread waiter([&sock0]() { (void)sock0.Wait(24h, Sock::RECV, SocketEventsParams{::g_socket_events_mode}); }); BOOST_REQUIRE_EQUAL(sock1.Send("a", 1, 0), 1); diff --git a/src/test/util/net.h b/src/test/util/net.h index 679c6cb28b28..ab1d958546c8 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -202,7 +202,7 @@ class StaticContentsSock : public Sock bool Wait(std::chrono::milliseconds timeout, Event requested, - SocketEventsParams event_params = SocketEventsParams(), + SocketEventsParams event_params, Event* occurred = nullptr) const override { if (occurred != nullptr) { @@ -213,7 +213,7 @@ class StaticContentsSock : public Sock bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, - SocketEventsParams event_params = SocketEventsParams()) const override + SocketEventsParams event_params) const override { for (auto& [sock, events] : events_per_sock) { (void)sock; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 8e4b054caf3c..57b9c464e8d6 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -189,6 +189,15 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve m_node.addrman = std::make_unique(*m_node.netgroupman, /*deterministic=*/false, m_node.args->GetIntArg("-checkaddrman", 0)); + + std::string sem_str = m_args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + ::g_socket_events_mode = SEMFromString(sem_str); + if (::g_socket_events_mode == SocketEventsMode::Unknown) { + throw std::runtime_error( + strprintf("Invalid -socketevents ('%s') specified. Only these modes are supported: %s", + sem_str, GetSupportedSocketEventsStr())); + } + m_node.connman = std::make_unique(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman); // Deterministic randomness for tests. fCheckBlockIndex = true; @@ -209,6 +218,7 @@ BasicTestingSetup::~BasicTestingSetup() m_node.cpoolman.reset(); m_node.mnhf_manager.reset(); m_node.evodb.reset(); + ::g_socket_events_mode = SocketEventsMode::Unknown; m_node.connman.reset(); m_node.addrman.reset(); m_node.netgroupman.reset(); @@ -326,6 +336,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vectorInit(options); } diff --git a/src/util/sock.cpp b/src/util/sock.cpp index 06d11cfdebf7..29413b8e53ad 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -27,6 +27,8 @@ #include #endif +SocketEventsMode g_socket_events_mode{SocketEventsMode::Unknown}; + static inline bool IOErrorIsPermanent(int err) { return err != WSAEAGAIN && err != WSAEINTR && err != WSAEWOULDBLOCK && err != WSAEINPROGRESS; @@ -157,10 +159,17 @@ bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEvents { EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})}; + // We need to ensure we are only using a level-triggered mode because we are expecting + // a direct correlation between the events reported and the one socket we are querying if (auto [sem, _, __] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) { - // We need to ensure we are only using a level-triggered mode because we are expecting - // a direct correlation between the events reported and the one socket we are querying - event_params = SocketEventsParams(); + // We will use a compatible fallback events mode if we didn't specify a valid option + event_params = SocketEventsParams{ +#ifdef USE_POLL + SocketEventsMode::Poll +#else + SocketEventsMode::Select +#endif /* USE_POLL */ + }; } if (!WaitMany(timeout, events_per_sock, event_params)) { return false; @@ -430,7 +439,7 @@ void Sock::SendComplete(const std::string& data, // Wait for a short while (or the socket to become ready for sending) before retrying // if nothing was sent. const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - (void)Wait(wait_time, SEND); + (void)Wait(wait_time, SEND, SocketEventsParams{::g_socket_events_mode}); } } @@ -513,7 +522,7 @@ std::string Sock::RecvUntilTerminator(uint8_t terminator, // Wait for a short while (or the socket to become ready for reading) before retrying. const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - (void)Wait(wait_time, RECV); + (void)Wait(wait_time, RECV, SocketEventsParams{::g_socket_events_mode}); } } diff --git a/src/util/sock.h b/src/util/sock.h index 9a0b0606c26b..5ecf9461716a 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -46,23 +46,24 @@ struct SocketEventsParams { using wrap_fn = std::function&&)>; - SocketEventsParams() = default; + SocketEventsParams() = delete; + SocketEventsParams(SocketEventsMode event_mode) : + m_event_mode{event_mode} + { + assert(m_event_mode != SocketEventsMode::Unknown); + } SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd, wrap_fn wrap_func) : m_event_mode{event_mode}, m_event_fd{event_fd}, m_wrap_func{wrap_func} - {} + { + assert(m_event_mode != SocketEventsMode::Unknown); + } ~SocketEventsParams() = default; public: /* Choice of API to use in Sock::Wait{,Many}() */ - SocketEventsMode m_event_mode{ -#ifdef USE_POLL - SocketEventsMode::Poll -#else - SocketEventsMode::Select -#endif /* USE_POLL */ - }; + SocketEventsMode m_event_mode{SocketEventsMode::Unknown}; /* File descriptor for event triggered SEMs (and INVALID_SOCKET for the rest) */ SOCKET m_event_fd{INVALID_SOCKET}; /* Function that wraps itself around WakeMany()'s API call */ @@ -91,6 +92,21 @@ constexpr std::string_view SEMToString(const SocketEventsMode val) { }; } +constexpr std::string_view GetSupportedSocketEventsStr() +{ + return "'select'" +#ifdef USE_POLL + ", 'poll'" +#endif /* USE_POLL */ +#ifdef USE_EPOLL + ", 'epoll'" +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + ", 'kqueue'" +#endif /* USE_KQUEUE */ + ; +} + /* Converts string to SocketEventsMode value with additional check to report modes not compiled for as unknown */ constexpr SocketEventsMode SEMFromString(std::string_view str) { if (str == "select") { return SocketEventsMode::Select; } @@ -229,7 +245,7 @@ class Sock * Check if the underlying socket can be used for `select(2)` (or the `Wait()` method). * @return true if selectable */ - [[nodiscard]] virtual bool IsSelectable(bool is_select = (SocketEventsParams().m_event_mode == SocketEventsMode::Select)) const; + [[nodiscard]] virtual bool IsSelectable(bool is_select) const; using Event = uint8_t; @@ -262,7 +278,7 @@ class Sock */ [[nodiscard]] virtual bool Wait(std::chrono::milliseconds timeout, Event requested, - SocketEventsParams event_params = SocketEventsParams(), + SocketEventsParams event_params, Event* occurred = nullptr) const; /** * Auxiliary requested/occurred events to wait for in `WaitMany()`. @@ -300,7 +316,7 @@ class Sock */ [[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, - SocketEventsParams event_params = SocketEventsParams()) const; + SocketEventsParams event_params) const; /** * As an EventsPerSock map no longer contains a Sock object (it now contains the raw SOCKET file @@ -315,7 +331,7 @@ class Sock */ static bool WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, - SocketEventsParams event_params = SocketEventsParams()); + SocketEventsParams event_params); #ifdef USE_EPOLL static bool WaitManyEPoll(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, @@ -391,4 +407,6 @@ class Sock /** Return readable error string for a network error code */ std::string NetworkErrorString(int err); +extern SocketEventsMode g_socket_events_mode; + #endif // BITCOIN_UTIL_SOCK_H From c611fb0a9f116a5f88643c4d30d55486dfc3b48d Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Fri, 4 Jul 2025 14:49:34 +0300 Subject: [PATCH 15/15] fix: set `g_socket_events_mode` before starting `CConnman` --- src/init.cpp | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index 0aa30bf002b7..3ff62d219a0a 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -1627,6 +1627,12 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + ::g_socket_events_mode = SEMFromString(sem_str); + if (::g_socket_events_mode == SocketEventsMode::Unknown) { + return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); + } + assert(!node.banman); node.banman = std::make_unique(gArgs.GetDataDirNet() / "banlist", &uiInterface, args.GetIntArg("-bantime", DEFAULT_MISBEHAVING_BANTIME)); assert(!node.connman); @@ -2405,6 +2411,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.m_added_nodes = args.GetArgs("-addnode"); connOptions.nMaxOutboundLimit = *opt_max_upload; connOptions.m_peer_connect_timeout = peer_connect_timeout; + connOptions.socketEventsMode = ::g_socket_events_mode; // Port to bind to if `-bind=addr` is provided without a `:port` suffix. const uint16_t default_bind_port = @@ -2512,14 +2519,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } - std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); - const auto sem = SEMFromString(sem_str); - if (sem == SocketEventsMode::Unknown) { - return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); - } - connOptions.socketEventsMode = sem; - ::g_socket_events_mode = sem; - const std::string& i2psam_arg = args.GetArg("-i2psam", ""); if (!i2psam_arg.empty()) { const std::optional addr{Lookup(i2psam_arg, 7656, fNameLookup)};