-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconnection.cpp
More file actions
279 lines (250 loc) · 9.15 KB
/
connection.cpp
File metadata and controls
279 lines (250 loc) · 9.15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
#include <fcntl.h>
#include <netinet/tcp.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/signalfd.h>
#include <unistd.h>
#include <cerrno>
#include <cstring>
#include <sstream>
#include <stdexcept>
#include <utility>
#include "connection.h"
//
// Tell epoll to start watching for events on the specified file descriptor
// args:
// :epoll_fd: the file descriptor for the poll
// :event_fd: the file descriptor we want events for
// :event_type: the event type(s) we are looking for
// :modify: Are we adding a new watch for a new file descriptor, or making a change
// to the event type of events being watched for on an existing watch?
// Modifying the event type on a watch that doesn't exist, or vice versa,
// will fail, which callers turn into a runtime exception.
//
int epoll_watch(int epoll_fd, int event_fd, int event_type, bool modify=false)
{
    // Describe the watch: the events we are interested in, and the descriptor
    // that should be handed back to us in the epoll_wait() results.
    epoll_event request;
    std::memset(&request, 0, sizeof(request));
    request.data.fd = event_fd;
    request.events = event_type;

    // EPOLL_CTL_MOD alters an existing watch; EPOLL_CTL_ADD registers a new one.
    // The raw epoll_ctl() result is returned so the caller decides how to
    // react to a failure.
    if (modify)
    {
        return epoll_ctl(epoll_fd, EPOLL_CTL_MOD, event_fd, &request);
    }
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, event_fd, &request);
}
//
// Stop watching for events on the specified file descriptor
//
int epoll_delete(int epoll_fd, int event_fd)
{
    // EPOLL_CTL_DEL ignores the event argument, but kernels older than 2.6.9
    // reject a null pointer, so a zeroed placeholder is always supplied.
    epoll_event placeholder;
    std::memset(&placeholder, 0, sizeof(placeholder));
    placeholder.data.fd = event_fd;

    // Hand back the raw epoll_ctl() result (0 on success, -1 on failure).
    const int status = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, event_fd, &placeholder);
    return status;
}
//
// Set the file descriptor to non-blocking mode.
// This will mean that if we read/write to the fd when it is not ready
// an error will occur.
//
int setnonblocking(int fd)
{
    // F_SETFL replaces the whole set of file status flags, so writing
    // O_NONBLOCK on its own would silently clear flags that are already set
    // (O_APPEND, for example). Read the current flags first and OR the new
    // bit into them.
    //
    // Returns 0 on success, or -1 (with errno set by fcntl) on failure.
    const int current_flags = fcntl(fd, F_GETFL, 0);
    if (current_flags == -1)
    {
        return -1;
    }
    return fcntl(fd, F_SETFL, current_flags | O_NONBLOCK);
}
//
// Open up a non-blocking socket listening on the specified port.
// args:
// :port: the port to listen on
// :connection_queue_size: The maximum number of unanswered connections that
// the OS will hold for us.
//
int setup_socket(unsigned int port, int connection_queue_size)
{
    // Returns the descriptor of a non-blocking IPv4 TCP socket that is bound
    // to every local interface and is listening on `port`. Failures are
    // reported through throw_on_err().
    //
    // NOTE(review): htons() takes a 16-bit value, so a port above 65535 is
    // truncated rather than rejected.
    int sock_fd = throw_on_err(socket(AF_INET, SOCK_STREAM, 0), "create socket");
    try
    {
        throw_on_err(setnonblocking(sock_fd), "make socket non-blocking");

        // Best effort: the result is deliberately not checked, so a failure to
        // set SO_REUSEADDR does not prevent the socket from being set up.
        int flag = 1;
        setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof(flag));

        sockaddr_in server_address;
        memset(&server_address, 0, sizeof(server_address));
        server_address.sin_family = AF_INET;
        server_address.sin_addr.s_addr = htonl(INADDR_ANY);
        server_address.sin_port = htons(port);

        throw_on_err(bind(sock_fd, reinterpret_cast<sockaddr *>(&server_address),
                          sizeof(server_address)), "bind socket to address");
        throw_on_err(listen(sock_fd, connection_queue_size), "set socket to listen");
    }
    catch (...)
    {
        // Nobody else knows about the descriptor yet, so release it here
        // before passing the error on; otherwise it would leak.
        close(sock_fd);
        throw;
    }
    return sock_fd;
}
//
// Block the default handlers for SIGINT and SIGQUIT and give us a file
// descriptor so that we can manually listen for these events on epoll
//
int setup_sig_fd()
{
    // block_signals() returns the signal set it worked on; the same set tells
    // signalfd() which signals the descriptor should report.
    sigset_t blocked = block_signals();

    // Passing -1 asks signalfd() for a brand new descriptor instead of
    // updating an existing one.
    const int sig_fd = signalfd(-1, &blocked, 0);
    return throw_on_err(sig_fd, "signalfd");
}
//
// Set up an epoll file handle and tell it to listen for events from our
// open socket and the signals that we blocked.
//
int setup_epoll(int sock_fd, int sig_fd)
{
    // Returns the descriptor of a new epoll instance that reports EPOLLIN on
    // both sock_fd and sig_fd. Failures are reported through throw_on_err().
    int epoll_fd = throw_on_err(epoll_create1(0), "epoll_create");
    try
    {
        throw_on_err(epoll_watch(epoll_fd, sock_fd, EPOLLIN), "set up sock poll");
        throw_on_err(epoll_watch(epoll_fd, sig_fd, EPOLLIN), "set up sigint poll");
    }
    catch (...)
    {
        // The caller never receives the descriptor when registration fails,
        // so close it here to avoid leaking it.
        close(epoll_fd);
        throw;
    }
    return epoll_fd;
}
//
// Open the listening socket, the signal descriptor and the epoll instance,
// then allocate the buffer that epoll_wait() fills with ready events.
// args:
// :port: the port to listen on
// :os_queue_size: passed to setup_socket() as the connection queue size
// :max_batch_size: number of epoll_event slots allocated for m_epoll_buffer
//
// NOTE(review): m_epoll_fd is built from m_sock_fd and m_sig_fd. Members are
// initialised in declaration order, so this relies on those two being
// declared before m_epoll_fd in the class -- confirm in connection.h.
// NOTE(review): the buffer is allocated with new[]; the matching delete[] is
// not visible in this file -- confirm that the destructor releases it.
//
TcpConnectionQueue::TcpConnectionQueue(int port, int os_queue_size, int max_batch_size):
m_sock_fd(setup_socket(port, os_queue_size)),
m_sig_fd(setup_sig_fd()),
m_epoll_fd(setup_epoll(m_sock_fd, m_sig_fd)),
m_alive(true),
m_max_batch_size(max_batch_size)
{
// One slot per event that a single epoll_wait() call may hand back.
m_epoll_buffer = new epoll_event[max_batch_size];
}
//
// Once this has been called, `is_alive()` will start to return false,
// and the response thread pool will be shut down.
//
void TcpConnectionQueue::shutdown()
{
    // Announce the shutdown first, then flip the liveness flag and stop the
    // response thread pool, in that order.
    std::cerr << "Shutting down." << '\n' << std::flush;
    m_alive = false;
    m_thread_pool.shutdown();
}
//
// This is called when we receive an epoll event telling
// us that sock_fd has an incoming connection. Adds
// a watch to epoll to tell us when that connection
// has data available for reading.
//
void accept_connection(int sock_fd, int epoll_fd)
{
    // Accepts one pending connection on sock_fd, makes it non-blocking and
    // registers it with epoll_fd for EPOLLIN. Failures other than the
    // transient ones below are reported through throw_on_err().
    sockaddr_in incoming_address;
    socklen_t address_size = sizeof(incoming_address);
    const int connection_fd =
        accept(sock_fd, reinterpret_cast<sockaddr *>(&incoming_address), &address_size);
    if (connection_fd < 0)
    {
        // On a non-blocking listening socket the pending connection may have
        // disappeared between the epoll notification and this call, or the
        // call may have been interrupted. None of these are server errors, so
        // return and let the next epoll event retry.
        if (errno == EAGAIN || errno == EWOULDBLOCK ||
            errno == ECONNABORTED || errno == EINTR)
        {
            return;
        }
        throw_on_err(connection_fd, "accept(m_soc_fd)");
        return;
    }

    try
    {
        throw_on_err(setnonblocking(connection_fd),
                     "make incoming connection non-blocking");
        throw_on_err(epoll_watch(epoll_fd, connection_fd, EPOLLIN),
                     "Add incoming connection to epoll");
    }
    catch (...)
    {
        // The descriptor has not been handed to anyone else yet, so close it
        // here before passing the error on; otherwise it would leak.
        close(connection_fd);
        throw;
    }
}
//
// This is called when we receive an event from epoll telling us that a
// connection is ready to receive data.
//
// The response here is a bit more simplistic than in a real web server, as we
// assume that all the data can be sent in one go, which means that we have a
// lot less book keeping to do.
//
// The method is stateful and the behaviour is going to depend on the pending
// responses. If there is no response future for the connection, then we assume
// that the response has already been sent. We close the connection and
// remove it from the epoll watch-list. If we do find a response future, then
// we remove it from the set of pending responses, extract its data and send
// them to the awaiting connection.
//
// If the future isn't ready, then the thread is going to block while it waits
// for the data. That would be really bad, as this one thread is dealing with
// all the incoming connections. This won't be a problem though, because
// elsewhere in the code, we have been careful to only add the watch for this
// connection *after* the response data are ready to send.
//
void TcpConnectionQueue::send_if_ready(int connection_fd)
{
    ResponseTable::accessor accessor;
    if(!m_pending_responses.find(accessor, connection_fd))
    {
        // Nothing is pending for this descriptor, so the response is taken to
        // have been sent already: stop watching the connection and close it.
        throw_on_err(epoll_delete(m_epoll_fd, connection_fd),
                     "Remove outgoing connection from epoll");
        throw_on_err(close(connection_fd), "Close connection");
        return;
    }

    if(!accessor->second.valid())
    {
        throw std::runtime_error("Attempted to send a response that has already been served");
    }

    // Copy the payload out before erasing the table entry that owns the future.
    auto response_str = static_cast<std::string>(*(accessor->second.get()));
    m_pending_responses.erase(accessor);
    accessor.release();

    // MSG_NOSIGNAL: if the peer has already gone away, report the failure as
    // an error return (EPIPE) instead of raising SIGPIPE in this process. The
    // length is passed as size_t directly to avoid narrowing it to int.
    throw_on_err(send(connection_fd, response_str.c_str(), response_str.size(), MSG_NOSIGNAL),
                 "send");
}
//
// If the connection is closed while the data was being prepared then we need
// to delete it from our set of pending responses. Apart from using up memory,
// having stale responses could cause really strange behaviour when a new
// connection comes in using the same file descriptor of an old connection.
//
void TcpConnectionQueue::delete_pending_response(int connection_fd)
{
ResponseTable::accessor accessor;
if(m_pending_responses.find(accessor, connection_fd))
{
m_pending_responses.erase(accessor);
}
throw_on_err(epoll_delete(m_epoll_fd, connection_fd), "removing closed connection from epoll");
}
//
// Wait up to timeout_ms for epoll events and dispatch each one:
// - an event on the signal descriptor triggers shutdown()
// - an event on the listening socket accepts the new connection
// - EPOLLIN on a connection removes it from epoll and returns it to the
//   caller as an IncomingConnection
// - EPOLLOUT on a connection sends its pending response
// - EPOLLRDHUP on a connection discards its pending response
// Returns the connections that became readable during this call.
//
std::vector<TcpConnectionQueue::connection_ptr> TcpConnectionQueue::handle_connections(int timeout_ms)
{
    std::vector<TcpConnectionQueue::connection_ptr> connections;

    int nfds = epoll_wait(m_epoll_fd, m_epoll_buffer, m_max_batch_size, timeout_ms);
    if(nfds < 0 && errno == EINTR)
    {
        // The wait was interrupted by a signal (a stop/continue, for example).
        // That is not a failure: report no connections and let the caller
        // poll again.
        return connections;
    }
    throw_on_err(nfds, "epoll_wait");

    for(int i = 0; i < nfds; ++i)
    {
        int event_type = m_epoll_buffer[i].events;
        int event_fd = m_epoll_buffer[i].data.fd;
        if(event_fd == m_sig_fd)
        {
            shutdown();
        }
        else if(event_fd == m_sock_fd)
        {
            accept_connection(m_sock_fd, m_epoll_fd);
        }
        else if(event_type & EPOLLIN)
        {
            // The connection leaves epoll while the caller works on the
            // request; respond() adds it back.
            throw_on_err(epoll_delete(m_epoll_fd, event_fd),
                         "Remove incoming connection from epoll");
            connections.push_back(
                connection_ptr(new IncomingConnection(event_fd, this)));
        }
        else if(event_type & EPOLLOUT)
        {
            send_if_ready(event_fd);
        }
        else if(event_type & EPOLLRDHUP)
        {
            delete_pending_response(event_fd);
        }
    }
    return connections;
}
std::string TcpConnectionQueue::IncomingConnection::receive()
{
char msg_buffer[MAX_PACKET_SIZE];
int msg_size = recv(m_request_fd, msg_buffer, MAX_PACKET_SIZE, 0);
return std::string(msg_buffer, msg_size);
}
//
// Queue `response` to run on the queue's thread pool and arrange for the
// result to be sent to this connection.
//
// The connection is first put back on epoll watching only for EPOLLRDHUP. Once
// the task has produced the response it switches the watch to
// EPOLLOUT | EPOLLRDHUP, so the event loop is told when the connection can be
// written to. Throws std::runtime_error if a response is already pending for
// this descriptor.
//
// NOTE(review): the task changes the watch just before it returns, so the
// event loop may see EPOLLOUT a moment before the future is ready and wait
// briefly on it.
//
void TcpConnectionQueue::IncomingConnection::respond(std::function<std::shared_ptr<Response>(void)> &&response)
{
    // Copy what the task needs into locals. Capturing with [=] would capture
    // `this`, and the task may run after this IncomingConnection has been
    // destroyed, leaving it to read members through a dangling pointer.
    auto queue = m_queue;
    const auto request_fd = m_request_fd;

    throw_on_err(epoll_watch(queue->m_epoll_fd, request_fd, EPOLLRDHUP),
                 "Add outgoing response to epoll");
    {
        ResponseTable::accessor accessor;
        if(queue->m_pending_responses.insert(accessor, request_fd))
        {
            accessor->second = queue->m_thread_pool.submit([queue, request_fd, response](){
                auto r = response();
                throw_on_err(epoll_watch(queue->m_epoll_fd, request_fd, EPOLLOUT | EPOLLRDHUP, true),
                             "Add outgoing response to epoll");
                return r;
            });
        }
        else
        {
            throw std::runtime_error("Could not add response to outgoing queue");
        }
    }
}