diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index c5752ee7..1e27f96f 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -499,20 +499,18 @@ std::unique_ptr> ConnectStream(EventLoop& loop, int f kj::mv(init_client), connection.release(), /* destroy_connection= */ true); } -//! Given stream file descriptor and an init object, construct a new ProxyServer -//! object that handles requests from the stream calling the init object. Embed -//! the ProxyServer in a Connection object that is stored and erased if +//! Given stream and init objects, construct a new ProxyServer object that +//! handles requests from the stream by calling the init object. Embed the +//! ProxyServer in a Connection object that is stored and erased if //! disconnected. This should be called from the event loop thread. template -void ServeStream(EventLoop& loop, int fd, InitImpl& init) +void _Serve(EventLoop& loop, kj::Own&& stream, InitImpl& init) { - loop.m_incoming_connections.emplace_front(loop, - loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), - [&](Connection& connection) { - // Set owned to false so proxy object doesn't attempt to delete init - // object on disconnect/close. - return kj::heap>(&init, false, connection); - }); + loop.m_incoming_connections.emplace_front(loop, kj::mv(stream), [&](Connection& connection) { + // Set owned to false so proxy object doesn't attempt to delete init + // object on disconnect/close. + return kj::heap>(&init, false, connection); + }); auto it = loop.m_incoming_connections.begin(); it->onDisconnect([&loop, it] { loop.log() << "IPC server: socket disconnected."; @@ -520,6 +518,41 @@ void ServeStream(EventLoop& loop, int fd, InitImpl& init) }); } +//! Given connection receiver and an init object, handle incoming connections by +//! calling _Serve, to create ProxyServer objects and forward requests to the +//! init object. +template +void _Listen(EventLoop& loop, kj::Own&& listener, InitImpl& init) +{ + auto* ptr = listener.get(); + loop.m_task_set->add(ptr->accept().then(kj::mvCapture(kj::mv(listener), + [&loop, &init](kj::Own&& listener, kj::Own&& stream) { + _Serve(loop, kj::mv(stream), init); + _Listen(loop, kj::mv(listener), init); + }))); +} + +//! Given stream file descriptor and an init object, handle requests on the +//! stream by calling methods on the Init object. +template +void ServeStream(EventLoop& loop, int fd, InitImpl& init) +{ + _Serve( + loop, loop.m_io_context.lowLevelProvider->wrapSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), init); +} + +//! Given listening socket file descriptor and an init object, handle incoming +//! connections and requests by calling methods on the Init object. +template +void ListenConnections(EventLoop& loop, int fd, InitImpl& init) +{ + loop.sync([&]() { + _Listen(loop, + loop.m_io_context.lowLevelProvider->wrapListenSocketFd(fd, kj::LowLevelAsyncIoProvider::TAKE_OWNERSHIP), + init); + }); +} + extern thread_local ThreadContext g_thread_context; } // namespace mp