-
Notifications
You must be signed in to change notification settings - Fork 5.4k
network: cleanup connection logic and crash when fd cannot be allocated #237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,13 +19,12 @@ ConnectionImpl::ConnectionImpl(Event::DispatcherImpl& dispatcher, int fd, | |
| : filter_manager_(*this, *this), remote_address_(remote_address), dispatcher_(dispatcher), | ||
| fd_(fd), id_(++next_global_id_) { | ||
|
|
||
| // Treat the lack of a valid fd (which in practice only happens if we run out of FDs) as an OOM | ||
| // condition and just crash. | ||
| RELEASE_ASSERT(fd_ != -1); | ||
|
|
||
| file_event_ = | ||
| dispatcher_.createFileEvent(fd_, [this](uint32_t events) -> void { onFileEvent(events); }); | ||
| if (fd_ == -1) { | ||
| // Can't obtain a socket. | ||
| state_ |= InternalState::ImmediateConnectionError; | ||
| file_event_->activate(Event::FileReadyType::Write); | ||
| } | ||
|
|
||
| read_buffer_.setCallback([this](uint64_t old_size, int64_t delta) -> void { | ||
| onBufferChange(ConnectionBufferType::Read, old_size, delta); | ||
|
|
@@ -69,7 +68,7 @@ void ConnectionImpl::close(ConnectionCloseType type) { | |
| doWriteToSocket(); | ||
| } | ||
|
|
||
| doLocalClose(); | ||
| closeSocket(ConnectionEvent::LocalClose); | ||
| } else { | ||
| ASSERT(type == ConnectionCloseType::FlushWrite); | ||
| state_ |= InternalState::CloseWithFlush; | ||
|
|
@@ -87,9 +86,12 @@ Connection::State ConnectionImpl::state() { | |
| } | ||
| } | ||
|
|
||
| void ConnectionImpl::closeSocket() { | ||
| ASSERT(fd_ != -1 || (state_ & InternalState::ImmediateConnectionError)); | ||
| conn_log_debug("closing socket", *this); | ||
| void ConnectionImpl::closeSocket(uint32_t close_type) { | ||
| if (fd_ == -1) { | ||
| return; | ||
| } | ||
|
|
||
| conn_log_debug("closing socket: {}", *this, close_type); | ||
|
|
||
| // Drain input and output buffers so that callbacks get fired. This does not happen automatically | ||
| // as part of destruction. | ||
|
|
@@ -109,15 +111,8 @@ void ConnectionImpl::closeSocket() { | |
| file_event_.reset(); | ||
| ::close(fd_); | ||
| fd_ = -1; | ||
| } | ||
|
|
||
| void ConnectionImpl::doLocalClose() { | ||
| conn_log_debug("doing local close", *this); | ||
| closeSocket(); | ||
|
|
||
| // We expect our owner to deal with freeing us in whatever way makes sense. We raise an event | ||
| // to kick that off. | ||
| raiseEvents(ConnectionEvent::LocalClose); | ||
| raiseEvents(close_type); | ||
| } | ||
|
|
||
| Event::Dispatcher& ConnectionImpl::dispatcher() { return dispatcher_; } | ||
|
|
@@ -232,8 +227,7 @@ void ConnectionImpl::onFileEvent(uint32_t events) { | |
|
|
||
| if (state_ & InternalState::ImmediateConnectionError) { | ||
| conn_log_debug("raising immediate connect error", *this); | ||
| closeSocket(); | ||
| raiseEvents(ConnectionEvent::RemoteClose); | ||
| closeSocket(ConnectionEvent::RemoteClose); | ||
| return; | ||
| } | ||
|
|
||
|
|
@@ -282,10 +276,9 @@ void ConnectionImpl::onReadReady() { | |
| onRead(); | ||
|
|
||
| // The read callback may have already closed the connection. | ||
| if (fd_ != -1 && action == PostIoAction::Close) { | ||
| if (action == PostIoAction::Close) { | ||
| conn_log_debug("remote close", *this); | ||
| closeSocket(); | ||
| raiseEvents(ConnectionEvent::RemoteClose); | ||
| closeSocket(ConnectionEvent::RemoteClose); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -326,8 +319,7 @@ void ConnectionImpl::onWriteReady() { | |
| onConnected(); | ||
| } else { | ||
| conn_log_debug("delayed connection error: {}", *this, error); | ||
| closeSocket(); | ||
| raiseEvents(ConnectionEvent::RemoteClose); | ||
| closeSocket(ConnectionEvent::RemoteClose); | ||
| return; | ||
| } | ||
| } | ||
|
|
@@ -336,13 +328,10 @@ void ConnectionImpl::onWriteReady() { | |
| // It is possible (though unlikely) for the connection to have already been closed during the | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is this comment still relevant?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about deleting the comment, but it I figured I would leave it just in case something comes up later, since it took me about 4 hours to figure out what was happening in the first place. |
||
| // write callback. This can happen if we manage to complete the SSL handshake in the write | ||
| // callback, raise a connected event, and close the connection. | ||
| if (fd_ != -1) { | ||
| closeSocket(); | ||
| raiseEvents(ConnectionEvent::RemoteClose); | ||
| } | ||
| closeSocket(ConnectionEvent::RemoteClose); | ||
| } else if ((state_ & InternalState::CloseWithFlush) && write_buffer_.length() == 0) { | ||
| conn_log_debug("write flush complete", *this); | ||
| doLocalClose(); | ||
| closeSocket(ConnectionEvent::LocalClose); | ||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -26,6 +26,14 @@ ListenerImpl::ListenerImpl(Event::DispatcherImpl& dispatcher, ListenSocket& sock | |
| if (!listener_) { | ||
| throw CreateListenerException(fmt::format("cannot listen on socket: {}", socket.name())); | ||
| } | ||
|
|
||
| evconnlistener_set_error_cb(listener_.get(), errorCallback); | ||
| } | ||
|
|
||
| void ListenerImpl::errorCallback(evconnlistener*, void*) { | ||
| // We should never get an error callback. This can happen if we run out of FDs or memory. In those | ||
| // cases just crash. | ||
| PANIC(fmt::format("listener accept failure: {}", strerror(errno))); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you 100% sure this never happens in normal operation? stat first?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm 99.999% sure this never happens. I will do a smoke test first. |
||
| } | ||
|
|
||
| void ListenerImpl::newConnection(int fd, sockaddr* addr) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| #include "common/network/listener_impl.h" | ||
| #include "common/stats/stats_impl.h" | ||
|
|
||
| #include "test/mocks/network/mocks.h" | ||
|
|
||
| using testing::_; | ||
| using testing::Invoke; | ||
|
|
||
| namespace Network { | ||
|
|
||
| static void errorCallbackTest() { | ||
| // Force the error callback to fire by closing the socket under the listener. We run this entire | ||
| // test in the forked process to avoid confusion when the fork happens. | ||
| Stats::IsolatedStoreImpl stats_store; | ||
| Event::DispatcherImpl dispatcher; | ||
| Network::TcpListenSocket socket(10000); | ||
| Network::MockListenerCallbacks listener_callbacks; | ||
| Network::ListenerPtr listener = | ||
| dispatcher.createListener(socket, listener_callbacks, stats_store, false); | ||
|
|
||
| Network::ClientConnectionPtr client_connection = | ||
| dispatcher.createClientConnection("tcp://127.0.0.1:10000"); | ||
| client_connection->connect(); | ||
|
|
||
| EXPECT_CALL(listener_callbacks, onNewConnection_(_)) | ||
| .WillOnce(Invoke([&](Network::ConnectionPtr& conn) -> void { | ||
| client_connection->close(ConnectionCloseType::NoFlush); | ||
| conn->close(ConnectionCloseType::NoFlush); | ||
| socket.close(); | ||
| })); | ||
|
|
||
| dispatcher.run(Event::Dispatcher::RunType::Block); | ||
| } | ||
|
|
||
| TEST(ListenerImplDeathTest, ErrorCallback) { | ||
| EXPECT_DEATH(errorCallbackTest(), ".*listener accept failure.*"); | ||
| } | ||
|
|
||
| } // Network |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it make sense to add a stat check for this first? or are you 100% sure it never happens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm 99.999% sure this never happens. I will do a smoke test first.