diff --git a/src/i2p.cpp b/src/i2p.cpp index 76a0b6196a80..3bc687e0dfde 100644 --- a/src/i2p.cpp +++ b/src/i2p.cpp @@ -160,7 +160,7 @@ bool Session::Accept(Connection& conn) while (!*m_interrupt) { Sock::Event occurred; - if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, &occurred)) { + if (!conn.sock->Wait(MAX_WAIT_FOR_IO, Sock::RECV, SocketEventsParams{::g_socket_events_mode}, &occurred)) { errmsg = "wait on socket failed"; break; } diff --git a/src/init.cpp b/src/init.cpp index e8fa49c4c594..3ff62d219a0a 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -480,21 +480,6 @@ static void OnRPCStopped() LogPrint(BCLog::RPC, "RPC stopped.\n"); } -std::string GetSupportedSocketEventsStr() -{ - std::string strSupportedModes = "'select'"; -#ifdef USE_POLL - strSupportedModes += ", 'poll'"; -#endif -#ifdef USE_EPOLL - strSupportedModes += ", 'epoll'"; -#endif -#ifdef USE_KQUEUE - strSupportedModes += ", 'kqueue'"; -#endif - return strSupportedModes; -} - void SetupServerArgs(ArgsManager& argsman) { SetupHelpOptions(argsman); @@ -1642,6 +1627,12 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } + std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + ::g_socket_events_mode = SEMFromString(sem_str); + if (::g_socket_events_mode == SocketEventsMode::Unknown) { + return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); + } + assert(!node.banman); node.banman = std::make_unique(gArgs.GetDataDirNet() / "banlist", &uiInterface, args.GetIntArg("-bantime", DEFAULT_MISBEHAVING_BANTIME)); assert(!node.connman); @@ -2420,6 +2411,7 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) connOptions.m_added_nodes = args.GetArgs("-addnode"); connOptions.nMaxOutboundLimit = *opt_max_upload; connOptions.m_peer_connect_timeout = peer_connect_timeout; + connOptions.socketEventsMode = ::g_socket_events_mode; // Port to bind to if `-bind=addr` is provided without a `:port` suffix. const uint16_t default_bind_port = @@ -2527,13 +2519,6 @@ bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info) } } - std::string sem_str = args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); - const auto sem = SEMFromString(sem_str); - if (sem == SocketEventsMode::Unknown) { - return InitError(strprintf(_("Invalid -socketevents ('%s') specified. Only these modes are supported: %s"), sem_str, GetSupportedSocketEventsStr())); - } - connOptions.socketEventsMode = sem; - const std::string& i2psam_arg = args.GetArg("-i2psam", ""); if (!i2psam_arg.empty()) { const std::optional addr{Lookup(i2psam_arg, 7656, fNameLookup)}; diff --git a/src/masternode/node.cpp b/src/masternode/node.cpp index 98537844fbec..e582b8ee4efc 100644 --- a/src/masternode/node.cpp +++ b/src/masternode/node.cpp @@ -156,7 +156,7 @@ void CActiveMasternodeManager::InitInternal(const CBlockIndex* pindex) // Check socket connectivity LogPrintf("CActiveMasternodeManager::Init -- Checking inbound connection to '%s'\n", m_info.service.ToStringAddrPort()); std::unique_ptr sock{ConnectDirectly(m_info.service, /*manual_connection=*/true)}; - bool fConnected{sock && sock->IsSelectable()}; + bool fConnected{sock && sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)}; sock = std::make_unique(INVALID_SOCKET); if (!fConnected && Params().RequireRoutableExternalIP()) { m_state = MasternodeState::SOME_ERROR; diff --git a/src/net.cpp b/src/net.cpp index 97c457979afb..d28a891921af 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -56,18 +56,6 @@ #include #endif -#ifdef USE_POLL -#include -#endif - -#ifdef USE_EPOLL -#include -#endif - -#ifdef USE_KQUEUE -#include -#endif - #include #include #include @@ -1940,7 +1928,7 @@ void CConnman::CreateNodeFromAcceptedSocket(std::unique_ptr&& sock, return; } - if (!sock->IsSelectable()) { + if (!sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)) { LogPrintf("%s: non-selectable socket\n", strDropped); return; } @@ -2338,13 +2326,12 @@ bool CConnman::InactivityCheck(const CNode& node) const return false; } -bool CConnman::GenerateSelectSet(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set) +Sock::EventsPerSock CConnman::GenerateWaitSockets(Span nodes) { + Sock::EventsPerSock events_per_sock; + for (const ListenSocket& hListenSocket : vhListenSocket) { - recv_set.insert(hListenSocket.sock->Get()); + events_per_sock.emplace(hListenSocket.sock->Get(), Sock::Events{Sock::RECV}); } for (CNode* pnode : nodes) { @@ -2357,13 +2344,15 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, continue; } - error_set.insert(pnode->m_sock->Get()); + Sock::Event requested{0}; if (select_send) { - send_set.insert(pnode->m_sock->Get()); + requested |= Sock::SEND; } if (select_recv) { - recv_set.insert(pnode->m_sock->Get()); + requested |= Sock::RECV; } + + events_per_sock.emplace(pnode->m_sock->Get(), Sock::Events{requested}); } if (m_wakeup_pipe) { @@ -2372,245 +2361,17 @@ bool CConnman::GenerateSelectSet(const std::vector& nodes, // This is currently only implemented for POSIX compliant systems. This means that Windows will fall back to // timing out after 50ms and then trying to send. This is ok as we assume that heavy-load daemons are usually // run on Linux and friends. - recv_set.insert(m_wakeup_pipe->m_pipe[0]); - } - - return !recv_set.empty() || !send_set.empty() || !error_set.empty(); -} - -#ifdef USE_KQUEUE -void CConnman::SocketEventsKqueue(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - const size_t maxEvents = 64; - struct kevent events[maxEvents]; - - struct timespec timeout; - timeout.tv_sec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS / 1000; - timeout.tv_nsec = (only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS % 1000) * 1000 * 1000; - - int n{-1}; - ToggleWakeupPipe([&](){n = kevent(Assert(m_edge_trig_events)->GetFileDescriptor(), nullptr, 0, events, maxEvents, &timeout);}); - if (n == -1) { - LogPrintf("kevent wait error\n"); - } else if (n > 0) { - for (int i = 0; i < n; i++) { - auto& event = events[i]; - if ((event.flags & EV_ERROR) || (event.flags & EV_EOF)) { - error_set.insert((SOCKET)event.ident); - continue; - } - - if (event.filter == EVFILT_READ) { - recv_set.insert((SOCKET)event.ident); - } - - if (event.filter == EVFILT_WRITE) { - send_set.insert((SOCKET)event.ident); - } - } - } -} -#endif - -#ifdef USE_EPOLL -void CConnman::SocketEventsEpoll(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - const size_t maxEvents = 64; - epoll_event events[maxEvents]; - - int n{-1}; - ToggleWakeupPipe([&](){n = epoll_wait(Assert(m_edge_trig_events)->GetFileDescriptor(), events, maxEvents, only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); - for (int i = 0; i < n; i++) { - auto& e = events[i]; - if((e.events & EPOLLERR) || (e.events & EPOLLHUP)) { - error_set.insert((SOCKET)e.data.fd); - continue; - } - - if (e.events & EPOLLIN) { - recv_set.insert((SOCKET)e.data.fd); - } - - if (e.events & EPOLLOUT) { - send_set.insert((SOCKET)e.data.fd); - } - } -} -#endif - -#ifdef USE_POLL -void CConnman::SocketEventsPoll(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { - if (!only_poll) interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); - return; - } - - std::unordered_map pollfds; - for (SOCKET socket_id : recv_select_set) { - pollfds[socket_id].fd = socket_id; - pollfds[socket_id].events |= POLLIN; - } - - for (SOCKET socket_id : send_select_set) { - pollfds[socket_id].fd = socket_id; - pollfds[socket_id].events |= POLLOUT; - } - - for (SOCKET socket_id : error_select_set) { - pollfds[socket_id].fd = socket_id; - // These flags are ignored, but we set them for clarity - pollfds[socket_id].events |= POLLERR|POLLHUP; - } - - std::vector vpollfds; - vpollfds.reserve(pollfds.size()); - for (auto it : pollfds) { - vpollfds.push_back(std::move(it.second)); - } - - int r{-1}; - ToggleWakeupPipe([&](){r = poll(vpollfds.data(), vpollfds.size(), only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS);}); - if (r < 0) { - return; - } - - if (interruptNet) return; - - for (struct pollfd pollfd_entry : vpollfds) { - if (pollfd_entry.revents & POLLIN) recv_set.insert(pollfd_entry.fd); - if (pollfd_entry.revents & POLLOUT) send_set.insert(pollfd_entry.fd); - if (pollfd_entry.revents & (POLLERR|POLLHUP)) error_set.insert(pollfd_entry.fd); - } -} -#endif - -void CConnman::SocketEventsSelect(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - std::set recv_select_set, send_select_set, error_select_set; - if (!GenerateSelectSet(nodes, recv_select_set, send_select_set, error_select_set)) { - interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); - return; - } - - // - // Find which sockets have data to receive - // - struct timeval timeout; - timeout.tv_sec = 0; - timeout.tv_usec = only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS * 1000; // frequency to poll pnode->vSend - - fd_set fdsetRecv; - fd_set fdsetSend; - fd_set fdsetError; - FD_ZERO(&fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - SOCKET hSocketMax = 0; - - for (SOCKET hSocket : recv_select_set) { - FD_SET(hSocket, &fdsetRecv); - hSocketMax = std::max(hSocketMax, hSocket); - } - - for (SOCKET hSocket : send_select_set) { - FD_SET(hSocket, &fdsetSend); - hSocketMax = std::max(hSocketMax, hSocket); - } - - for (SOCKET hSocket : error_select_set) { - FD_SET(hSocket, &fdsetError); - hSocketMax = std::max(hSocketMax, hSocket); + events_per_sock.emplace(m_wakeup_pipe->m_pipe[0], Sock::Events{Sock::RECV}); } - int nSelect{-1}; - ToggleWakeupPipe([&](){nSelect = select(hSocketMax + 1, &fdsetRecv, &fdsetSend, &fdsetError, &timeout);}); - if (interruptNet) - return; - - if (nSelect == SOCKET_ERROR) - { - int nErr = WSAGetLastError(); - LogPrintf("socket select error %s\n", NetworkErrorString(nErr)); - for (unsigned int i = 0; i <= hSocketMax; i++) - FD_SET(i, &fdsetRecv); - FD_ZERO(&fdsetSend); - FD_ZERO(&fdsetError); - if (!interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS))) - return; - } - - for (SOCKET hSocket : recv_select_set) { - if (FD_ISSET(hSocket, &fdsetRecv)) { - recv_set.insert(hSocket); - } - } - - for (SOCKET hSocket : send_select_set) { - if (FD_ISSET(hSocket, &fdsetSend)) { - send_set.insert(hSocket); - } - } - - for (SOCKET hSocket : error_select_set) { - if (FD_ISSET(hSocket, &fdsetError)) { - error_set.insert(hSocket); - } - } -} - -void CConnman::SocketEvents(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll) -{ - switch (socketEventsMode) { -#ifdef USE_KQUEUE - case SocketEventsMode::KQueue: - SocketEventsKqueue(recv_set, send_set, error_set, only_poll); - break; -#endif -#ifdef USE_EPOLL - case SocketEventsMode::EPoll: - SocketEventsEpoll(recv_set, send_set, error_set, only_poll); - break; -#endif -#ifdef USE_POLL - case SocketEventsMode::Poll: - SocketEventsPoll(nodes, recv_set, send_set, error_set, only_poll); - break; -#endif - case SocketEventsMode::Select: - SocketEventsSelect(nodes, recv_set, send_set, error_set, only_poll); - break; - default: - assert(false); - } + return events_per_sock; } void CConnman::SocketHandler(CMasternodeSync& mn_sync) { AssertLockNotHeld(m_total_bytes_sent_mutex); - std::set recv_set; - std::set send_set; - std::set error_set; + Sock::EventsPerSock events_per_sock; bool only_poll = [this]() { // Check if we have work to do and thus should avoid waiting for events @@ -2630,72 +2391,64 @@ void CConnman::SocketHandler(CMasternodeSync& mn_sync) { const NodesSnapshot snap{*this, /* cond = */ CConnman::AllNodes, /* shuffle = */ false}; + const auto timeout = std::chrono::milliseconds(only_poll ? 0 : SELECT_TIMEOUT_MILLISECONDS); + const bool is_lt = socketEventsMode == SocketEventsMode::Poll || socketEventsMode == SocketEventsMode::Select; + // Check for the readiness of the already connected sockets and the // listening sockets in one call ("readiness" as in poll(2) or // select(2)). If none are ready, wait for a short while and return // empty sets. - SocketEvents(snap.Nodes(), recv_set, send_set, error_set, only_poll); + events_per_sock = GenerateWaitSockets(snap.Nodes()); + if ((is_lt && events_per_sock.empty()) || !Sock::WaitManyInternal(timeout, events_per_sock, SocketEventsParams{socketEventsMode, GetModeFileDescriptor(), ToggleWakeupPipe})) { + if (is_lt) { + interruptNet.sleep_for(std::chrono::milliseconds(SELECT_TIMEOUT_MILLISECONDS)); + } + } - // Drain the wakeup pipe - if (m_wakeup_pipe && recv_set.count(m_wakeup_pipe->m_pipe[0])) { - m_wakeup_pipe->Drain(); - } + // Drain the wakeup pipe + if (m_wakeup_pipe && events_per_sock.find(m_wakeup_pipe->m_pipe[0]) != events_per_sock.end()) { + m_wakeup_pipe->Drain(); + } // Service (send/receive) each of the already connected nodes. - SocketHandlerConnected(recv_set, send_set, error_set); + SocketHandlerConnected(events_per_sock); } // Accept new connections from listening sockets. - SocketHandlerListening(recv_set, mn_sync); + SocketHandlerListening(events_per_sock, mn_sync); } -void CConnman::SocketHandlerConnected(const std::set& recv_set, - const std::set& send_set, - const std::set& error_set) +void CConnman::SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) { AssertLockNotHeld(m_total_bytes_sent_mutex); if (interruptNet) return; - std::set vErrorNodes; - std::set vReceivableNodes; - std::set vSendableNodes; + std::set node_err_set; + std::set node_recv_set; + std::set node_send_set; { LOCK(cs_mapSocketToNode); - for (auto hSocket : error_set) { - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; + for (const auto& [sock, events] : events_per_sock) { + auto it = mapSocketToNode.find(sock); + if (it == mapSocketToNode.end()) continue; + if (events.occurred & Sock::ERR) { + it->second->AddRef(); + node_err_set.emplace(it->second); } - it->second->AddRef(); - vErrorNodes.emplace(it->second); - } - for (auto hSocket : recv_set) { - if (error_set.count(hSocket)) { - // no need to handle it twice - continue; + if (events.occurred & Sock::RECV) { + if (events.occurred & Sock::ERR) continue; + LOCK(cs_sendable_receivable_nodes); + auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fHasRecvData = true; } - - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; - } - - LOCK(cs_sendable_receivable_nodes); - auto jt = mapReceivableNodes.emplace(it->second->GetId(), it->second); - assert(jt.first->second == it->second); - it->second->fHasRecvData = true; - } - for (auto hSocket : send_set) { - auto it = mapSocketToNode.find(hSocket); - if (it == mapSocketToNode.end()) { - continue; + if (events.occurred & Sock::SEND) { + LOCK(cs_sendable_receivable_nodes); + auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); + assert(jt.first->second == it->second); + it->second->fCanSendData = true; } - - LOCK(cs_sendable_receivable_nodes); - auto jt = mapSendableNodes.emplace(it->second->GetId(), it->second); - assert(jt.first->second == it->second); - it->second->fCanSendData = true; } } @@ -2712,17 +2465,17 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, if (pnode->fHasRecvData && !pnode->fPauseRecv && !pnode->fDisconnect && (!pnode->m_transport->ReceivedMessageComplete() || to_send.empty())) { pnode->AddRef(); - vReceivableNodes.emplace(pnode); + node_recv_set.emplace(pnode); } // Collect nodes that have data to send and have a socket with non-empty write buffers if (pnode->fCanSendData && (!pnode->m_transport->ReceivedMessageComplete() || !to_send.empty())) { pnode->AddRef(); - vSendableNodes.emplace(pnode); + node_send_set.emplace(pnode); } }); - for (CNode* pnode : vSendableNodes) { + for (CNode* pnode : node_send_set) { if (interruptNet) { break; } @@ -2739,13 +2492,13 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, // sending actually succeeded to make sure progress is always made; otherwise a // deadlock would be possible when both sides have data to send, but neither is // receiving. - if (data_left && vReceivableNodes.erase(pnode)) { + if (data_left && node_recv_set.erase(pnode)) { pnode->Release(); } } } - for (CNode* pnode : vErrorNodes) + for (CNode* pnode : node_err_set) { if (interruptNet) { break; @@ -2754,7 +2507,7 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (CNode* pnode : vReceivableNodes) + for (CNode* pnode : node_recv_set) { if (interruptNet) { break; @@ -2766,13 +2519,13 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, SocketRecvData(pnode); } - for (auto& node : vErrorNodes) { + for (auto& node : node_err_set) { node->Release(); } - for (auto& node : vReceivableNodes) { + for (auto& node : node_recv_set) { node->Release(); } - for (auto& node : vSendableNodes) { + for (auto& node : node_send_set) { node->Release(); } @@ -2804,13 +2557,14 @@ void CConnman::SocketHandlerConnected(const std::set& recv_set, } } -void CConnman::SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) +void CConnman::SocketHandlerListening(const Sock::EventsPerSock& events_per_sock, CMasternodeSync& mn_sync) { for (const ListenSocket& listen_socket : vhListenSocket) { if (interruptNet) { return; } - if (recv_set.count(listen_socket.sock->Get()) > 0) { + const auto it = events_per_sock.find(listen_socket.sock->Get()); + if (it != events_per_sock.end() && (it->second.occurred & Sock::RECV)) { AcceptConnection(listen_socket, mn_sync); } } diff --git a/src/net.h b/src/net.h index e353f8a09542..95cf7b6d4213 100644 --- a/src/net.h +++ b/src/net.h @@ -118,16 +118,6 @@ static const size_t DEFAULT_MAXSENDBUFFER = 1 * 1000; static constexpr bool DEFAULT_V2_TRANSPORT{true}; -#if defined USE_KQUEUE -#define DEFAULT_SOCKETEVENTS "kqueue" -#elif defined USE_EPOLL -#define DEFAULT_SOCKETEVENTS "epoll" -#elif defined USE_POLL -#define DEFAULT_SOCKETEVENTS "poll" -#else -#define DEFAULT_SOCKETEVENTS "select" -#endif - typedef int64_t NodeId; struct AddedNodeParams { @@ -1619,55 +1609,9 @@ friend class CNode; /** * Generate a collection of sockets to check for IO readiness. * @param[in] nodes Select from these nodes' sockets. - * @param[out] recv_set Sockets to check for read readiness. - * @param[out] send_set Sockets to check for write readiness. - * @param[out] error_set Sockets to check for errors. - * @return true if at least one socket is to be checked (the returned set is not empty) + * @return sockets to check for readiness */ - bool GenerateSelectSet(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set); - - /** - * Check which sockets are ready for IO. - * @param[in] nodes Select from these nodes' sockets (in supported event methods). - * @param[in] only_poll Permit zero timeout polling - * @param[out] recv_set Sockets which are ready for read. - * @param[out] send_set Sockets which are ready for write. - * @param[out] error_set Sockets which have errors. - * This calls `GenerateSelectSet()` to gather a list of sockets to check. - */ - void SocketEvents(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); - -#ifdef USE_KQUEUE - void SocketEventsKqueue(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif -#ifdef USE_EPOLL - void SocketEventsEpoll(std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif -#ifdef USE_POLL - void SocketEventsPoll(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); -#endif - void SocketEventsSelect(const std::vector& nodes, - std::set& recv_set, - std::set& send_set, - std::set& error_set, - bool only_poll); + Sock::EventsPerSock GenerateWaitSockets(Span nodes); /** * Check connected and listening sockets for IO readiness and process them accordingly. @@ -1676,20 +1620,16 @@ friend class CNode; /** * Do the read/write for connected sockets that are ready for IO. - * @param[in] recv_set Sockets that are ready for read. - * @param[in] send_set Sockets that are ready for send. - * @param[in] error_set Sockets that have an exceptional condition (error). + * @param[in] events_per_sock Sockets that are ready for IO. */ - void SocketHandlerConnected(const std::set& recv_set, - const std::set& send_set, - const std::set& error_set) + void SocketHandlerConnected(const Sock::EventsPerSock& events_per_sock) EXCLUSIVE_LOCKS_REQUIRED(!m_total_bytes_sent_mutex, !mutexMsgProc); /** * Accept incoming connections, one from each read-ready listening socket. - * @param[in] recv_set Sockets that are ready for read. + * @param[in] events_per_sock Sockets that are ready for IO. */ - void SocketHandlerListening(const std::set& recv_set, CMasternodeSync& mn_sync) + void SocketHandlerListening(const Sock::EventsPerSock& events_per_sock, CMasternodeSync& mn_sync) EXCLUSIVE_LOCKS_REQUIRED(!mutexMsgProc); void ThreadSocketHandler(CMasternodeSync& mn_sync) @@ -1904,15 +1844,22 @@ friend class CNode; std::unique_ptr m_edge_trig_events{nullptr}; std::unique_ptr m_wakeup_pipe{nullptr}; - template - void ToggleWakeupPipe(Callable&& func) + SOCKET GetModeFileDescriptor() + { + if (m_edge_trig_events) { + return static_cast(m_edge_trig_events->GetFileDescriptor()); + } + return INVALID_SOCKET; + } + + SocketEventsParams::wrap_fn ToggleWakeupPipe = [&](std::function&& func) { if (m_wakeup_pipe) { m_wakeup_pipe->Toggle(func); } else { func(); } - } + }; Mutex cs_sendable_receivable_nodes; std::unordered_map mapReceivableNodes GUARDED_BY(cs_sendable_receivable_nodes); diff --git a/src/netbase.cpp b/src/netbase.cpp index 319bc7c9d88d..850044f00c3d 100644 --- a/src/netbase.cpp +++ b/src/netbase.cpp @@ -336,7 +336,7 @@ static IntrRecvError InterruptibleRecv(uint8_t* data, size_t len, int timeout, c // we're approaching the end of the specified total timeout const auto remaining = std::chrono::milliseconds{endTime - curTime}; const auto timeout = std::min(remaining, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - if (!sock.Wait(timeout, Sock::RECV)) { + if (!sock.Wait(timeout, Sock::RECV, SocketEventsParams{::g_socket_events_mode})) { return IntrRecvError::NetworkError; } } else { @@ -514,7 +514,7 @@ std::unique_ptr CreateSockOS(sa_family_t address_family) // Ensure that waiting for I/O on this socket won't result in undefined // behavior. - if (!sock->IsSelectable()) { + if (!sock->IsSelectable(/*is_select=*/::g_socket_events_mode == SocketEventsMode::Select)) { LogPrintf("Cannot create connection: non-selectable socket created (fd >= FD_SETSIZE ?)\n"); return nullptr; } @@ -572,7 +572,7 @@ static bool ConnectToSocket(const Sock& sock, struct sockaddr* sockaddr, socklen // synchronously to check for successful connection with a timeout. const Sock::Event requested = Sock::RECV | Sock::SEND; Sock::Event occurred; - if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, &occurred)) { + if (!sock.Wait(std::chrono::milliseconds{nConnectTimeout}, requested, SocketEventsParams{::g_socket_events_mode}, &occurred)) { LogPrintf("wait for connect to %s failed: %s\n", dest_str, NetworkErrorString(WSAGetLastError())); diff --git a/src/test/fuzz/util.cpp b/src/test/fuzz/util.cpp index 2f44a69561da..d772d0954f94 100644 --- a/src/test/fuzz/util.cpp +++ b/src/test/fuzz/util.cpp @@ -265,12 +265,12 @@ bool FuzzedSock::SetNonBlocking() const return true; } -bool FuzzedSock::IsSelectable() const +bool FuzzedSock::IsSelectable(bool is_select) const { return m_selectable; } -bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const { constexpr std::array wait_errnos{ EBADF, @@ -287,6 +287,15 @@ bool FuzzedSock::Wait(std::chrono::milliseconds timeout, Event requested, Event* return true; } +bool FuzzedSock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const +{ + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = m_fuzzed_data_provider.ConsumeBool() ? events.requested : 0; + } + return true; +} + bool FuzzedSock::IsConnected(std::string& errmsg) const { if (m_fuzzed_data_provider.ConsumeBool()) { diff --git a/src/test/fuzz/util.h b/src/test/fuzz/util.h index 5273ea4e3743..499a17b23fec 100644 --- a/src/test/fuzz/util.h +++ b/src/test/fuzz/util.h @@ -87,9 +87,11 @@ class FuzzedSock : public Sock bool SetNonBlocking() const override; - bool IsSelectable() const override; + bool IsSelectable(bool is_select) const override; - bool Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred = nullptr) const override; + bool Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred = nullptr) const override; + + bool WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const override; bool IsConnected(std::string& errmsg) const override; }; diff --git a/src/test/sock_tests.cpp b/src/test/sock_tests.cpp index 8376ec1a6847..7e7c9741e9be 100644 --- a/src/test/sock_tests.cpp +++ b/src/test/sock_tests.cpp @@ -121,7 +121,7 @@ BOOST_AUTO_TEST_CASE(wait) Sock sock0(s[0]); Sock sock1(s[1]); - std::thread waiter([&sock0]() { (void)sock0.Wait(24h, Sock::RECV); }); + std::thread waiter([&sock0]() { (void)sock0.Wait(24h, Sock::RECV, SocketEventsParams{::g_socket_events_mode}); }); BOOST_REQUIRE_EQUAL(sock1.Send("a", 1, 0), 1); diff --git a/src/test/util/net.h b/src/test/util/net.h index dd29d8675cc0..ab1d958546c8 100644 --- a/src/test/util/net.h +++ b/src/test/util/net.h @@ -198,10 +198,11 @@ class StaticContentsSock : public Sock bool SetNonBlocking() const override { return true; } - bool IsSelectable() const override { return true; } + bool IsSelectable(bool is_select) const override { return true; } bool Wait(std::chrono::milliseconds timeout, Event requested, + SocketEventsParams event_params, Event* occurred = nullptr) const override { if (occurred != nullptr) { @@ -210,6 +211,17 @@ class StaticContentsSock : public Sock return true; } + bool WaitMany(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams event_params) const override + { + for (auto& [sock, events] : events_per_sock) { + (void)sock; + events.occurred = events.requested; + } + return true; + } + private: const std::string m_contents; mutable size_t m_consumed; diff --git a/src/test/util/setup_common.cpp b/src/test/util/setup_common.cpp index 8e4b054caf3c..57b9c464e8d6 100644 --- a/src/test/util/setup_common.cpp +++ b/src/test/util/setup_common.cpp @@ -189,6 +189,15 @@ BasicTestingSetup::BasicTestingSetup(const std::string& chainName, const std::ve m_node.addrman = std::make_unique(*m_node.netgroupman, /*deterministic=*/false, m_node.args->GetIntArg("-checkaddrman", 0)); + + std::string sem_str = m_args.GetArg("-socketevents", DEFAULT_SOCKETEVENTS); + ::g_socket_events_mode = SEMFromString(sem_str); + if (::g_socket_events_mode == SocketEventsMode::Unknown) { + throw std::runtime_error( + strprintf("Invalid -socketevents ('%s') specified. Only these modes are supported: %s", + sem_str, GetSupportedSocketEventsStr())); + } + m_node.connman = std::make_unique(0x1337, 0x1337, *m_node.addrman, *m_node.netgroupman); // Deterministic randomness for tests. fCheckBlockIndex = true; @@ -209,6 +218,7 @@ BasicTestingSetup::~BasicTestingSetup() m_node.cpoolman.reset(); m_node.mnhf_manager.reset(); m_node.evodb.reset(); + ::g_socket_events_mode = SocketEventsMode::Unknown; m_node.connman.reset(); m_node.addrman.reset(); m_node.netgroupman.reset(); @@ -326,6 +336,7 @@ TestingSetup::TestingSetup(const std::string& chainName, const std::vectorInit(options); } diff --git a/src/util/sock.cpp b/src/util/sock.cpp index d379467b0eb0..29413b8e53ad 100644 --- a/src/util/sock.cpp +++ b/src/util/sock.cpp @@ -15,15 +15,34 @@ #include #include +#ifdef USE_EPOLL +#include +#endif + +#ifdef USE_KQUEUE +#include +#endif + #ifdef USE_POLL #include #endif +SocketEventsMode g_socket_events_mode{SocketEventsMode::Unknown}; + static inline bool IOErrorIsPermanent(int err) { return err != WSAEAGAIN && err != WSAEINTR && err != WSAEWOULDBLOCK && err != WSAEINPROGRESS; } +static inline bool IsSelectableSocket(const SOCKET& s, bool is_select) +{ +#if defined(WIN32) + return true; +#else + return is_select ? (s < FD_SETSIZE) : true; +#endif +} + Sock::Sock() : m_socket(INVALID_SOCKET) {} Sock::Sock(SOCKET s) : m_socket(s) {} @@ -131,89 +150,256 @@ bool Sock::SetNonBlocking() const return true; } -bool Sock::IsSelectable() const +bool Sock::IsSelectable(bool is_select) const { -#if defined(USE_POLL) || defined(WIN32) - return true; -#else - return m_socket < FD_SETSIZE; -#endif + return IsSelectableSocket(m_socket, is_select); } -bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, Event* occurred) const +bool Sock::Wait(std::chrono::milliseconds timeout, Event requested, SocketEventsParams event_params, Event* occurred) const { + EventsPerSock events_per_sock{std::make_pair(m_socket, Events{requested})}; + + // We need to ensure we are only using a level-triggered mode because we are expecting + // a direct correlation between the events reported and the one socket we are querying + if (auto [sem, _, __] = event_params; sem != SocketEventsMode::Poll && sem != SocketEventsMode::Select) { + // We will use a compatible fallback events mode if we didn't specify a valid option + event_params = SocketEventsParams{ #ifdef USE_POLL - pollfd fd; - fd.fd = m_socket; - fd.events = 0; - if (requested & RECV) { - fd.events |= POLLIN; + SocketEventsMode::Poll +#else + SocketEventsMode::Select +#endif /* USE_POLL */ + }; } - if (requested & SEND) { - fd.events |= POLLOUT; + if (!WaitMany(timeout, events_per_sock, event_params)) { + return false; } - if (poll(&fd, 1, count_milliseconds(timeout)) == SOCKET_ERROR) { + if (occurred != nullptr) { + *occurred = events_per_sock.begin()->second.occurred; + } + + return true; +} + +bool Sock::WaitMany(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) const +{ + return WaitManyInternal(timeout, events_per_sock, event_params); +} + +bool Sock::WaitManyInternal(std::chrono::milliseconds timeout, EventsPerSock& events_per_sock, SocketEventsParams event_params) +{ + switch (event_params.m_event_mode) + { +#ifdef USE_POLL + case SocketEventsMode::Poll: + return WaitManyPoll(timeout, events_per_sock, event_params.m_wrap_func); +#endif /* USE_POLL */ + case SocketEventsMode::Select: + return WaitManySelect(timeout, events_per_sock, event_params.m_wrap_func); +#ifdef USE_EPOLL + case SocketEventsMode::EPoll: + assert(event_params.m_event_fd != INVALID_SOCKET); + return WaitManyEPoll(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + case SocketEventsMode::KQueue: + assert(event_params.m_event_fd != INVALID_SOCKET); + return WaitManyKQueue(timeout, events_per_sock, event_params.m_event_fd, event_params.m_wrap_func); +#endif /* USE_KQUEUE */ + default: + assert(false); + } +} + +#ifdef USE_EPOLL +bool Sock::WaitManyEPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET epoll_fd, + SocketEventsParams::wrap_fn wrap_func) +{ + std::array events{}; + + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = epoll_wait(epoll_fd, events.data(), events.size(), count_milliseconds(timeout)); + }); + if (ret == SOCKET_ERROR) { return false; } - if (occurred != nullptr) { - *occurred = 0; - if (fd.revents & POLLIN) { - *occurred |= RECV; - } - if (fd.revents & POLLOUT) { - *occurred |= SEND; - } - if (fd.revents & (POLLERR | POLLHUP)) { - *occurred |= ERR; + // Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the + // entire map before populating it with our events data. + events_per_sock.clear(); + + for (int idx = 0; idx < ret; idx++) { + auto& ev = events[idx]; + Event occurred = 0; + if (ev.events & (EPOLLERR | EPOLLHUP)) { + occurred |= ERR; + } else { + if (ev.events & EPOLLIN) { + occurred |= RECV; + } + if (ev.events & EPOLLOUT) { + occurred |= SEND; + } } + events_per_sock.emplace(static_cast(ev.data.fd), Sock::Events{/*req=*/RECV | SEND, occurred}); } return true; -#else - if (!IsSelectable()) { +} +#endif /* USE_EPOLL */ + +#ifdef USE_KQUEUE +bool Sock::WaitManyKQueue(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET kqueue_fd, + SocketEventsParams::wrap_fn wrap_func) +{ + std::array events{}; + struct timespec ts = MillisToTimespec(timeout); + + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = kevent(kqueue_fd, nullptr, 0, events.data(), events.size(), &ts); + }); + if (ret == SOCKET_ERROR) { return false; } - fd_set fdset_recv; - fd_set fdset_send; - fd_set fdset_err; - FD_ZERO(&fdset_recv); - FD_ZERO(&fdset_send); - FD_ZERO(&fdset_err); + // Events reported do not correspond to sockets requested in edge-triggered modes, we will clear the + // entire map before populating it with our events data. + events_per_sock.clear(); + + for (int idx = 0; idx < ret; idx++) { + auto& ev = events[idx]; + Event occurred = 0; + if (ev.flags & (EV_ERROR | EV_EOF)) { + occurred |= ERR; + } else { + if (ev.filter == EVFILT_READ) { + occurred |= RECV; + } + if (ev.filter == EVFILT_WRITE) { + occurred |= SEND; + } + } + if (auto it = events_per_sock.find(static_cast(ev.ident)); it != events_per_sock.end()) { + it->second.occurred |= occurred; + } else { + events_per_sock.emplace(static_cast(ev.ident), Sock::Events{/*req=*/RECV | SEND, occurred}); + } + } + + return true; +} +#endif /* USE_KQUEUE */ + +#ifdef USE_POLL +bool Sock::WaitManyPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func) +{ + if (events_per_sock.empty()) return true; + + std::vector pfds; + for (const auto& [socket, events] : events_per_sock) { + pfds.emplace_back(); + auto& pfd = pfds.back(); + pfd.fd = socket; + if (events.requested & RECV) { + pfd.events |= POLLIN; + } + if (events.requested & SEND) { + pfd.events |= POLLOUT; + } + } - if (requested & RECV) { - FD_SET(m_socket, &fdset_recv); + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = poll(pfds.data(), pfds.size(), count_milliseconds(timeout)); + }); + if (ret == SOCKET_ERROR) { + return false; } - if (requested & SEND) { - FD_SET(m_socket, &fdset_send); + assert(pfds.size() == events_per_sock.size()); + size_t i{0}; + for (auto& [socket, events] : events_per_sock) { + assert(socket == static_cast(pfds[i].fd)); + events.occurred = 0; + if (pfds[i].revents & POLLIN) { + events.occurred |= RECV; + } + if (pfds[i].revents & POLLOUT) { + events.occurred |= SEND; + } + if (pfds[i].revents & (POLLERR | POLLHUP)) { + events.occurred |= ERR; + } + ++i; } - FD_SET(m_socket, &fdset_err); + return true; +} +#endif /* USE_POLL */ + +bool Sock::WaitManySelect(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func) +{ + if (events_per_sock.empty()) return true; + + fd_set recv; + fd_set send; + fd_set err; + FD_ZERO(&recv); + FD_ZERO(&send); + FD_ZERO(&err); + SOCKET socket_max{0}; + + for (const auto& [sock, events] : events_per_sock) { + if (!IsSelectableSocket(sock, /*is_select=*/true)) { + return false; + } + const auto& s = sock; + if (events.requested & RECV) { + FD_SET(s, &recv); + } + if (events.requested & SEND) { + FD_SET(s, &send); + } + FD_SET(s, &err); + socket_max = std::max(socket_max, s); + } - timeval timeout_struct = MillisToTimeval(timeout); + timeval tv = MillisToTimeval(timeout); - if (select(m_socket + 1, &fdset_recv, &fdset_send, &fdset_err, &timeout_struct) == SOCKET_ERROR) { + int ret{SOCKET_ERROR}; + wrap_func([&](){ + ret = select(socket_max + 1, &recv, &send, &err, &tv); + }); + if (ret == SOCKET_ERROR) { return false; } - if (occurred != nullptr) { - *occurred = 0; - if (FD_ISSET(m_socket, &fdset_recv)) { - *occurred |= RECV; + for (auto& [sock, events] : events_per_sock) { + const auto& s = sock; + events.occurred = 0; + if (FD_ISSET(s, &recv)) { + events.occurred |= RECV; } - if (FD_ISSET(m_socket, &fdset_send)) { - *occurred |= SEND; + if (FD_ISSET(s, &send)) { + events.occurred |= SEND; } - if (FD_ISSET(m_socket, &fdset_err)) { - *occurred |= ERR; + if (FD_ISSET(s, &err)) { + events.occurred |= ERR; } } return true; -#endif /* USE_POLL */ } void Sock::SendComplete(const std::string& data, @@ -253,7 +439,7 @@ void Sock::SendComplete(const std::string& data, // Wait for a short while (or the socket to become ready for sending) before retrying // if nothing was sent. const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - (void)Wait(wait_time, SEND); + (void)Wait(wait_time, SEND, SocketEventsParams{::g_socket_events_mode}); } } @@ -336,7 +522,7 @@ std::string Sock::RecvUntilTerminator(uint8_t terminator, // Wait for a short while (or the socket to become ready for reading) before retrying. const auto wait_time = std::min(deadline - now, std::chrono::milliseconds{MAX_WAIT_FOR_IO}); - (void)Wait(wait_time, RECV); + (void)Wait(wait_time, RECV, SocketEventsParams{::g_socket_events_mode}); } } diff --git a/src/util/sock.h b/src/util/sock.h index fbd1909d60f9..5ecf9461716a 100644 --- a/src/util/sock.h +++ b/src/util/sock.h @@ -10,8 +10,20 @@ #include #include +#include #include #include +#include + +#if defined(USE_EPOLL) +#define DEFAULT_SOCKETEVENTS "epoll" +#elif defined(USE_KQUEUE) +#define DEFAULT_SOCKETEVENTS "kqueue" +#elif defined(USE_POLL) +#define DEFAULT_SOCKETEVENTS "poll" +#else +#define DEFAULT_SOCKETEVENTS "select" +#endif /** * Maximum time to wait for I/O readiness. @@ -19,6 +31,8 @@ */ static constexpr auto MAX_WAIT_FOR_IO = 1s; +static constexpr size_t MAX_EVENTS = 64; + enum class SocketEventsMode : int8_t { Select = 0, Poll = 1, @@ -28,6 +42,34 @@ enum class SocketEventsMode : int8_t { Unknown = -1 }; +struct SocketEventsParams +{ + using wrap_fn = std::function&&)>; + + SocketEventsParams() = delete; + SocketEventsParams(SocketEventsMode event_mode) : + m_event_mode{event_mode} + { + assert(m_event_mode != SocketEventsMode::Unknown); + } + SocketEventsParams(SocketEventsMode event_mode, SOCKET event_fd, wrap_fn wrap_func) : + m_event_mode{event_mode}, + m_event_fd{event_fd}, + m_wrap_func{wrap_func} + { + assert(m_event_mode != SocketEventsMode::Unknown); + } + ~SocketEventsParams() = default; + +public: + /* Choice of API to use in Sock::Wait{,Many}() */ + SocketEventsMode m_event_mode{SocketEventsMode::Unknown}; + /* File descriptor for event triggered SEMs (and INVALID_SOCKET for the rest) */ + SOCKET m_event_fd{INVALID_SOCKET}; + /* Function that wraps itself around WakeMany()'s API call */ + wrap_fn m_wrap_func{[](std::function&& func){func();}}; +}; + /* Converts SocketEventsMode value to string with additional check to report modes not compiled for as unknown */ constexpr std::string_view SEMToString(const SocketEventsMode val) { switch (val) { @@ -50,6 +92,21 @@ constexpr std::string_view SEMToString(const SocketEventsMode val) { }; } +constexpr std::string_view GetSupportedSocketEventsStr() +{ + return "'select'" +#ifdef USE_POLL + ", 'poll'" +#endif /* USE_POLL */ +#ifdef USE_EPOLL + ", 'epoll'" +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + ", 'kqueue'" +#endif /* USE_KQUEUE */ + ; +} + /* Converts string to SocketEventsMode value with additional check to report modes not compiled for as unknown */ constexpr SocketEventsMode SEMFromString(std::string_view str) { if (str == "select") { return SocketEventsMode::Select; } @@ -188,7 +245,7 @@ class Sock * Check if the underlying socket can be used for `select(2)` (or the `Wait()` method). * @return true if selectable */ - [[nodiscard]] virtual bool IsSelectable() const; + [[nodiscard]] virtual bool IsSelectable(bool is_select) const; using Event = uint8_t; @@ -212,6 +269,7 @@ class Sock * Wait for readiness for input (recv) or output (send). * @param[in] timeout Wait this much for at least one of the requested events to occur. * @param[in] requested Wait for those events, bitwise-or of `RECV` and `SEND`. + * @param[in] event_params Wait using the API specified. * @param[out] occurred If not nullptr and the function returns `true`, then this * indicates which of the requested events occurred (`ERR` will be added, even if * not requested, if an exceptional event occurs on the socket). @@ -220,7 +278,80 @@ class Sock */ [[nodiscard]] virtual bool Wait(std::chrono::milliseconds timeout, Event requested, + SocketEventsParams event_params, Event* occurred = nullptr) const; + /** + * Auxiliary requested/occurred events to wait for in `WaitMany()`. + */ + struct Events { + explicit Events(Event req, Event ocr = 0) : requested{req}, occurred{ocr} {} + Event requested; + Event occurred; + }; + + /** + * On which socket to wait for what events in `WaitMany()`. + * + * Bitcoin: + * The `shared_ptr` is copied into the map to ensure that the `Sock` object + * is not destroyed (its destructor would close the underlying socket). + * If this happens shortly before or after we call `poll(2)` and a new + * socket gets created under the same file descriptor number then the report + * from `WaitMany()` will be bogus. + * + * Dash: + * The raw `SOCKET` file descriptor is copied into the map (generally taken from + * Sock::Get()) to allow sockets managed by external logic (e.g. WakeupPipes) to + * be used without wrapping it into a Sock object and risk handing control over. + */ + using EventsPerSock = std::unordered_map; + + /** + * Same as `Wait()`, but wait on many sockets within the same timeout. + * @param[in] timeout Wait this long for at least one of the requested events to occur. + * @param[in,out] events_per_sock Wait for the requested events on these sockets and set + * `occurred` for the events that actually occurred. + * @return true on success (or timeout, if all `what[].occurred` are returned as 0), + * false otherwise + */ + [[nodiscard]] virtual bool WaitMany(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams event_params) const; + + /** + * As an EventsPerSock map no longer contains a Sock object (it now contains the raw SOCKET file + * descriptor), we lose access to all the logic implemented in Sock. Except that as WaitMany + * doesn't interact with the raw socket stored within Sock, it can be safely declared as static + * and we can pass all other parameters as arguments as it should ordinarily remain the same for + * entire runtime duration of the program. We keep around the virtual WaitMany to allow mockability + * in tests, so keep in mind that using WaitManyInternal *bypasses* any override for WaitMany. + * + * This doesn't apply to Sock::Wait(), as it populates an EventsPerSock map with its own raw + * socket before passing it to WaitMany. + */ + static bool WaitManyInternal(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams event_params); +#ifdef USE_EPOLL + static bool WaitManyEPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET epoll_fd, + SocketEventsParams::wrap_fn wrap_func); +#endif /* USE_EPOLL */ +#ifdef USE_KQUEUE + static bool WaitManyKQueue(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SOCKET kqueue_fd, + SocketEventsParams::wrap_fn wrap_func); +#endif /* USE_KQUEUE */ +#ifdef USE_POLL + static bool WaitManyPoll(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func); +#endif /* USE_POLL */ + static bool WaitManySelect(std::chrono::milliseconds timeout, + EventsPerSock& events_per_sock, + SocketEventsParams::wrap_fn wrap_func); /* Higher level, convenience, methods. These may throw. */ @@ -276,4 +407,6 @@ class Sock /** Return readable error string for a network error code */ std::string NetworkErrorString(int err); +extern SocketEventsMode g_socket_events_mode; + #endif // BITCOIN_UTIL_SOCK_H diff --git a/src/util/time.cpp b/src/util/time.cpp index acfe845d25d0..e04f43ae6151 100644 --- a/src/util/time.cpp +++ b/src/util/time.cpp @@ -108,6 +108,14 @@ int64_t ParseISO8601DateTime(const std::string& str) return (ptime - epoch).total_seconds(); } +struct timespec MillisToTimespec(int64_t nTimeout) +{ + struct timespec timeout; + timeout.tv_sec = nTimeout / 1000; + timeout.tv_nsec = (nTimeout % 1000) * 1000 * 1000; + return timeout; +} + struct timeval MillisToTimeval(int64_t nTimeout) { struct timeval timeout; @@ -116,6 +124,11 @@ struct timeval MillisToTimeval(int64_t nTimeout) return timeout; } +struct timespec MillisToTimespec(std::chrono::milliseconds ms) +{ + return MillisToTimespec(count_milliseconds(ms)); +} + struct timeval MillisToTimeval(std::chrono::milliseconds ms) { return MillisToTimeval(count_milliseconds(ms)); diff --git a/src/util/time.h b/src/util/time.h index 7334293c60c7..d1fca85e85a6 100644 --- a/src/util/time.h +++ b/src/util/time.h @@ -116,10 +116,12 @@ int64_t ParseISO8601DateTime(const std::string& str); * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(int64_t nTimeout); +struct timespec MillisToTimespec(int64_t nTimeout); /** * Convert milliseconds to a struct timeval for e.g. select. */ struct timeval MillisToTimeval(std::chrono::milliseconds ms); +struct timespec MillisToTimespec(std::chrono::milliseconds ms); #endif // BITCOIN_UTIL_TIME_H diff --git a/src/util/wpipe.cpp b/src/util/wpipe.cpp index e3e3979256c1..b04c323af077 100644 --- a/src/util/wpipe.cpp +++ b/src/util/wpipe.cpp @@ -21,15 +21,17 @@ WakeupPipe::WakeupPipe(EdgeTriggeredEvents* edge_trig_events) } for (size_t idx = 0; idx < m_pipe.size(); idx++) { int flags = fcntl(m_pipe[idx], F_GETFL, 0); - if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == -1) { + if (fcntl(m_pipe[idx], F_SETFL, flags | O_NONBLOCK) == SOCKET_ERROR) { LogPrintf("Unable to initialize WakeupPipe, fcntl for O_NONBLOCK on m_pipe[%d] failed with error %s\n", idx, NetworkErrorString(WSAGetLastError())); + Close(); return; } } if (edge_trig_events && !edge_trig_events->RegisterPipe(m_pipe[0])) { LogPrintf("Unable to initialize WakeupPipe, EdgeTriggeredEvents::RegisterPipe() failed for m_pipe[0] = %d\n", m_pipe[0]); + Close(); return; } m_valid = true; @@ -42,22 +44,33 @@ WakeupPipe::~WakeupPipe() { if (m_valid) { #ifdef USE_WAKEUP_PIPE + Drain(); if (m_edge_trig_events && !m_edge_trig_events->UnregisterPipe(m_pipe[0])) { LogPrintf("Destroying WakeupPipe instance, EdgeTriggeredEvents::UnregisterPipe() failed for m_pipe[0] = %d\n", m_pipe[0]); } - for (size_t idx = 0; idx < m_pipe.size(); idx++) { - if (close(m_pipe[idx]) != 0) { - LogPrintf("Destroying WakeupPipe instance, close() failed for m_pipe[%d] = %d with error %s\n", - idx, m_pipe[idx], NetworkErrorString(WSAGetLastError())); - } - } + Close(); #else assert(false); #endif /* USE_WAKEUP_PIPE */ } } +void WakeupPipe::Close() +{ +#ifdef USE_WAKEUP_PIPE + for (size_t idx{0}; idx < m_pipe.size(); idx++) { + if (m_pipe[idx] != -1 && close(m_pipe[idx]) != 0) { + LogPrintf("close() failed for m_pipe[%d] = %d with error %s\n", idx, m_pipe[idx], + NetworkErrorString(WSAGetLastError())); + } + m_pipe[idx] = -1; + } +#else + assert(false); +#endif /* USE_WAKEUP_PIPE */ +} + void WakeupPipe::Drain() const { #ifdef USE_WAKEUP_PIPE @@ -80,7 +93,7 @@ void WakeupPipe::Write() std::array buf{}; int ret = write(m_pipe[1], buf.data(), buf.size()); - if (ret == -1) { + if (ret == SOCKET_ERROR) { LogPrintf("write() to m_pipe[1] = %d failed with error %s\n", m_pipe[1], NetworkErrorString(WSAGetLastError())); } if (ret != EXPECTED_PIPE_WRITTEN_BYTES) { diff --git a/src/util/wpipe.h b/src/util/wpipe.h index 30912149b48d..e936899ea71c 100644 --- a/src/util/wpipe.h +++ b/src/util/wpipe.h @@ -21,6 +21,10 @@ class EdgeTriggeredEvents; */ class WakeupPipe { +private: + /* Iterate through m_pipe and ::close() them */ + void Close(); + public: explicit WakeupPipe(EdgeTriggeredEvents* edge_trig_events); ~WakeupPipe();