diff --git a/ci/tsqa/tests/test_origin_max_connections.py b/ci/tsqa/tests/test_origin_max_connections.py new file mode 100644 index 00000000000..c5bf41a3c37 --- /dev/null +++ b/ci/tsqa/tests/test_origin_max_connections.py @@ -0,0 +1,213 @@ +''' +Test the configure entry : proxy.config.http.origin_max_connections +''' + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import logging +import uuid +import socket +import requests +import tsqa.test_cases +import helpers +import thread +from multiprocessing import Pool +import SocketServer +import os + +log = logging.getLogger(__name__) + + +# TODO: seems like a useful shared class- either add to httpbin or some shared lib +class KAHandler(SocketServer.BaseRequestHandler): + '''SocketServer that returns the connection-id as the body + ''' + # class variable to set number of active sessions + alive_sessions = 0 + + def handle(self): + KAHandler.alive_sessions += 1 + # Receive the data in small chunks and retransmit it + conn_id = uuid.uuid4().hex + start = time.time() + while True: + data = self.request.recv(4096).strip() + if data: + log.info('Sending data back to the client: {uid}'.format(uid=conn_id)) + else: + log.info('Client disconnected: {timeout}seconds'.format(timeout=time.time() - start)) + break + body = conn_id + if 'timeout' in data: + print 'sleep for a long time!' + time.sleep(4) + else: + time.sleep(2) + resp = ('HTTP/1.1 200 OK\r\n' + 'Content-Length: {content_length}\r\n' + 'Content-Type: text/html; charset=UTF-8\r\n' + 'Connection: keep-alive\r\n' + 'X-Current-Sessions: {alive_sessions}\r\n' + '\r\n' + '{body}'.format(content_length=len(body), alive_sessions=KAHandler.alive_sessions, body=body)) + self.request.sendall(resp) + KAHandler.alive_sessions -= 1 + + +class TestKeepAlive_Origin_Max_connections(helpers.EnvironmentCase): + @classmethod + def setUpEnv(cls, env): + cls.traffic_server_host = '127.0.0.1' + cls.traffic_server_port = int(cls.configs['records.config']['CONFIG']['proxy.config.http.server_ports']) + cls.socket_server_port = int(tsqa.utils.bind_unused_port()[1]) + + log.info("socket_server_port = %d" % (cls.socket_server_port)) + cls.server = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port) + cls.server.start() + cls.server.ready.wait() + + cls.socket_server_port2 = int(tsqa.utils.bind_unused_port()[1]) + cls.server2 = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port2) + cls.server2.start() + cls.server2.ready.wait() + + queue_path = os.path.join(cls.environment.layout.sysconfdir, 'queue.conf') + with open(queue_path, 'w') as fh: + fh.write('CONFIG proxy.config.http.origin_max_connections_queue INT 2') + + noqueue_path = os.path.join(cls.environment.layout.sysconfdir, 'noqueue.conf') + with open(noqueue_path, 'w') as fh: + fh.write('CONFIG proxy.config.http.origin_max_connections_queue INT 0') + + cls.configs['remap.config'].add_line('map /other/queue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port2, queue_path)) + cls.configs['remap.config'].add_line('map /other/noqueue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port2, noqueue_path)) + cls.configs['remap.config'].add_line('map /other/ http://127.0.0.1:{0}'.format(cls.socket_server_port2)) + cls.configs['remap.config'].add_line('map /queue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port, queue_path)) + cls.configs['remap.config'].add_line('map /noqueue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port, noqueue_path)) + cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}'.format(cls.socket_server_port)) + + cls.configs['records.config']['CONFIG'].update({ + 'proxy.config.http.origin_max_connections': 1, + 'proxy.config.http.keep_alive_enabled_out': 1, + 'proxy.config.http.keep_alive_no_activity_timeout_out': 1, + 'proxy.config.http.transaction_active_timeout_out': 2, + 'proxy.config.http.connect_attempts_timeout': 2, + 'proxy.config.http.connect_attempts_rr_retries': 0, + 'proxy.config.exec_thread.limit': 1, + 'proxy.config.exec_thread.autoconfig': 0, + }) + + def _send_requests(self, total_requests, path='', other=False): + url = 'http://{0}:{1}/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) + url2 = 'http://{0}:{1}/other/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) + jobs = [] + jobs2 = [] + pool = Pool(processes=4) + for _ in xrange(0, total_requests): + jobs.append(pool.apply_async(requests.get, (url,))) + if other: + jobs2.append(pool.apply_async(requests.get, (url2,))) + + results = [] + results2 = [] + for j in jobs: + try: + results.append(j.get()) + except Exception as e: + results.append(e) + + for j in jobs2: + try: + results2.append(j.get()) + except Exception as e: + results2.append(e) + + return results, results2 + + + # TODO: enable after TS-4340 is merged + # and re-enable `other` for the remaining queueing tests + def tesst_origin_scoping(self): + '''Send 2 requests to loopback (on separate ports) and ensure that they run in parallel + ''' + results, results2 = self._send_requests(1, other=True) + + # TS-4340 + # ensure that the 2 origins (2 different ports on loopback) were running in parallel + for i in xrange(0, REQUEST_COUNT): + self.assertEqual(int(results[i].get().headers['X-Current-Sessions']), 2) + self.assertEqual(int(results2[i].get().headers['X-Current-Sessions']), 2) + + def test_origin_default_queueing(self): + '''By default we have no queue limit + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT) + + for x in xrange(0, REQUEST_COUNT): + self.assertEqual(results[x].status_code, 200) + #self.assertEqual(results2[x].status_code, 200) + + def test_origin_queueing(self): + '''If a queue is set, N requests are queued and the rest immediately fail + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='queue/') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + else: + fail += 1 + self.assertEqual(success, 3) + + def test_origin_queueing_timeouts(self): + '''Lets have some requests timeout and ensure that the queue is freed up + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='queue/timeout') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + print 'success', x + else: + fail += 1 + self.assertEqual(fail, 4) + + self.test_origin_queueing() + + def test_origin_no_queueing(self): + '''If the queue is set to 0, all requests past the max immediately fail + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='noqueue/') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + else: + fail += 1 + print 'results:', success, fail + self.assertEqual(success, 1) diff --git a/doc/admin-guide/files/records.config.en.rst b/doc/admin-guide/files/records.config.en.rst index 69308b26e85..55ac3744818 100644 --- a/doc/admin-guide/files/records.config.en.rst +++ b/doc/admin-guide/files/records.config.en.rst @@ -1217,6 +1217,15 @@ Origin Server Connect Attempts Limits the number of socket connections per origin server to the value specified. To enable, set to one (``1``). +.. ts:cv:: CONFIG proxy.config.http.origin_max_connections_queue INT -1 + :reloadable: + :overridable: + + Limits the number of requests to be queued when the :ts:cv:`proxy.config.http.origin_max_connections` is reached. + When disabled (``-1``) requests are will wait indefinitely for an available connection. When set to ``0`` all + requests past the :ts:cv:`proxy.config.http.origin_max_connections` will immediately fail. When set to ``>0`` + ATS will queue that many requests to go to the origin, any additional requests past the limit will immediately fail. + .. ts:cv:: CONFIG proxy.config.http.origin_min_keep_alive_connections INT 0 :reloadable: diff --git a/lib/ts/apidefs.h.in b/lib/ts/apidefs.h.in index ae366f1411c..cbd47a90ed1 100644 --- a/lib/ts/apidefs.h.in +++ b/lib/ts/apidefs.h.in @@ -694,6 +694,7 @@ typedef enum { TS_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES, TS_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY, TS_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT, + TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE, TS_CONFIG_LAST_ENTRY } TSOverridableConfigKey; diff --git a/lib/ts/ink_inet.cc b/lib/ts/ink_inet.cc index 7de24faafa8..9016bd12fe5 100644 --- a/lib/ts/ink_inet.cc +++ b/lib/ts/ink_inet.cc @@ -307,6 +307,26 @@ ats_ip_hash(sockaddr const *addr) return zret.i; } +uint64_t +ats_ip_port_hash(sockaddr const *addr) +{ + union md5sum { + uint64_t i; + uint16_t b[4]; + unsigned char c[16]; + } zret; + + zret.i = 0; + if (ats_is_ip4(addr)) { + zret.i = (static_cast(ats_ip4_addr_cast(addr)) << 16) | (ats_ip_port_cast(addr)); + } else if (ats_is_ip6(addr)) { + ink_code_md5(const_cast(ats_ip_addr8_cast(addr)), TS_IP6_SIZE, zret.c); + // now replace the bottom 16bits so we can account for the port. + zret.b[3] = ats_ip_port_cast(addr); + } + return zret.i; +} + int ats_ip_to_hex(sockaddr const *src, char *dst, size_t len) { diff --git a/lib/ts/ink_inet.h b/lib/ts/ink_inet.h index 1754bd69bad..8945de32473 100644 --- a/lib/ts/ink_inet.h +++ b/lib/ts/ink_inet.h @@ -1151,6 +1151,8 @@ int ats_ip_getbestaddrinfo(char const *name, ///< [in] Address name (IPv4, IPv6, */ uint32_t ats_ip_hash(sockaddr const *addr); +uint64_t ats_ip_port_hash(sockaddr const *addr); + /** Convert address to string as a hexidecimal value. The string is always nul terminated, the output string is clipped if @a dst is insufficient. diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index d448afe8a33..187f5fec554 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -473,6 +473,8 @@ static const RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.http.origin_max_connections", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} , + {RECT_CONFIG, "proxy.config.http.origin_max_connections_queue", RECD_INT, "-1", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} + , {RECT_CONFIG, "proxy.config.http.origin_min_keep_alive_connections", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} , {RECT_CONFIG, "proxy.config.http.attach_server_session_to_client", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-1]", RECA_NULL} diff --git a/plugins/experimental/ts_lua/ts_lua_http_config.c b/plugins/experimental/ts_lua/ts_lua_http_config.c index 5b995be55ad..2b89d305c03 100644 --- a/plugins/experimental/ts_lua/ts_lua_http_config.c +++ b/plugins/experimental/ts_lua/ts_lua_http_config.c @@ -114,6 +114,7 @@ typedef enum { TS_LUA_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES = TS_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES, TS_LUA_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY = TS_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY, TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT = TS_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT, + TS_LUA_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE = TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE, TS_LUA_CONFIG_LAST_ENTRY = TS_CONFIG_LAST_ENTRY, } TSLuaOverridableConfigKey; @@ -198,7 +199,9 @@ ts_lua_var_item ts_lua_http_config_vars[] = { TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ENABLE_REDIRECTION), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_NUMBER_OF_REDIRECTIONS), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY), - TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_LAST_ENTRY), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_LAST_ENTRY), }; // Needed to make sure we have the latest list of overridable http config vars when compiling diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc index 4da8f4d7108..eea2460ad22 100644 --- a/proxy/InkAPI.cc +++ b/proxy/InkAPI.cc @@ -7978,6 +7978,10 @@ _conf_to_memberp(TSOverridableConfigKey conf, OverridableHttpConfigParams *overr typ = OVERRIDABLE_TYPE_INT; ret = &overridableHttpConfig->attach_server_session_to_client; break; + case TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE: + typ = OVERRIDABLE_TYPE_INT; + ret = &overridableHttpConfig->origin_max_connections_queue; + break; // This helps avoiding compiler warnings, yet detect unhandled enum members. case TS_CONFIG_NULL: case TS_CONFIG_LAST_ENTRY: @@ -8488,6 +8492,8 @@ TSHttpTxnConfigFind(const char *name, int length, TSOverridableConfigKey *conf, cnf = TS_CONFIG_HTTP_CACHE_HEURISTIC_MIN_LIFETIME; else if (!strncmp(name, "proxy.config.http.cache.heuristic_max_lifetime", length)) cnf = TS_CONFIG_HTTP_CACHE_HEURISTIC_MAX_LIFETIME; + else if (!strncmp(name, "proxy.config.http.origin_max_connections_queue", length)) + cnf = TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE; break; case 'r': if (!strncmp(name, "proxy.config.http.insert_squid_x_forwarded_for", length)) diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc index d2b84e75c4d..1ee7ed5a187 100644 --- a/proxy/http/HttpConfig.cc +++ b/proxy/http/HttpConfig.cc @@ -891,6 +891,7 @@ HttpConfig::startup() HttpEstablishStaticConfigLongLong(c.max_websocket_connections, "proxy.config.http.websocket.max_number_of_connections"); HttpEstablishStaticConfigLongLong(c.oride.server_tcp_init_cwnd, "proxy.config.http.server_tcp_init_cwnd"); HttpEstablishStaticConfigLongLong(c.oride.origin_max_connections, "proxy.config.http.origin_max_connections"); + HttpEstablishStaticConfigLongLong(c.oride.origin_max_connections_queue, "proxy.config.http.origin_max_connections_queue"); HttpEstablishStaticConfigLongLong(c.origin_min_keep_alive_connections, "proxy.config.http.origin_min_keep_alive_connections"); HttpEstablishStaticConfigLongLong(c.oride.attach_server_session_to_client, "proxy.config.http.attach_server_session_to_client"); @@ -1162,6 +1163,13 @@ HttpConfig::reconfigure() params->max_websocket_connections = m_master.max_websocket_connections; params->oride.server_tcp_init_cwnd = m_master.oride.server_tcp_init_cwnd; params->oride.origin_max_connections = m_master.oride.origin_max_connections; + params->oride.origin_max_connections_queue = m_master.oride.origin_max_connections_queue; + // if origin_max_connections_queue is set without max_connections, it is meaningless, so we'll warn + if (params->oride.origin_max_connections_queue && + !(params->oride.origin_max_connections || params->origin_min_keep_alive_connections)) { + Warning("origin_max_connections_queue is set, but neither origin_max_connections nor origin_min_keep_alive_connections are " + "set, please correct your records.config"); + } params->origin_min_keep_alive_connections = m_master.origin_min_keep_alive_connections; params->oride.attach_server_session_to_client = m_master.oride.attach_server_session_to_client; diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h index 4f031493582..92ec3db6001 100644 --- a/proxy/http/HttpConfig.h +++ b/proxy/http/HttpConfig.h @@ -378,7 +378,7 @@ struct OverridableHttpConfigParams { cache_heuristic_min_lifetime(3600), cache_heuristic_max_lifetime(86400), cache_guaranteed_min_lifetime(0), cache_guaranteed_max_lifetime(31536000), cache_max_stale_age(604800), keep_alive_no_activity_timeout_in(115), keep_alive_no_activity_timeout_out(120), transaction_no_activity_timeout_in(30), transaction_no_activity_timeout_out(30), - transaction_active_timeout_out(0), origin_max_connections(0), attach_server_session_to_client(0), + transaction_active_timeout_out(0), origin_max_connections(0), origin_max_connections_queue(0), attach_server_session_to_client(0), connect_attempts_max_retries(0), connect_attempts_max_retries_dead_server(3), connect_attempts_rr_retries(3), connect_attempts_timeout(30), post_connect_attempts_timeout(1800), down_server_timeout(300), client_abort_threshold(10), freshness_fuzz_time(240), freshness_fuzz_min_time(0), max_cache_open_read_retries(-1), cache_open_read_retry_time(10), @@ -529,6 +529,7 @@ struct OverridableHttpConfigParams { MgmtInt transaction_no_activity_timeout_out; MgmtInt transaction_active_timeout_out; MgmtInt origin_max_connections; + MgmtInt origin_max_connections_queue; MgmtInt attach_server_session_to_client; diff --git a/proxy/http/HttpConnectionCount.cc b/proxy/http/HttpConnectionCount.cc index 74f06b4b79a..9f482a96431 100644 --- a/proxy/http/HttpConnectionCount.cc +++ b/proxy/http/HttpConnectionCount.cc @@ -25,3 +25,4 @@ ConnectionCount ConnectionCount::_connectionCount; +ConnectionCountQueue ConnectionCountQueue::_connectionCount; diff --git a/proxy/http/HttpConnectionCount.h b/proxy/http/HttpConnectionCount.h index af4d7f6780e..d1f36562c6a 100644 --- a/proxy/http/HttpConnectionCount.h +++ b/proxy/http/HttpConnectionCount.h @@ -26,6 +26,10 @@ #include "ts/ink_inet.h" #include "ts/ink_mutex.h" #include "ts/Map.h" +#include "ts/Diags.h" +#include "ts/INK_MD5.h" +#include "ts/ink_config.h" +#include "HttpProxyAPIEnums.h" #ifndef _HTTP_CONNECTION_COUNT_H_ #define _HTTP_CONNECTION_COUNT_H_ @@ -52,10 +56,10 @@ class ConnectionCount * @return Number of connections */ int - getCount(const IpEndpoint &addr) + getCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type) { ink_mutex_acquire(&_mutex); - int count = _hostCount.get(ConnAddr(addr)); + int count = _hostCount.get(ConnAddr(addr, hostname_hash, match_type)); ink_mutex_release(&_mutex); return count; } @@ -66,9 +70,10 @@ class ConnectionCount * @param delta Default is +1, can be set to negative to decrement */ void - incrementCount(const IpEndpoint &addr, const int delta = 1) + incrementCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type, + const int delta = 1) { - ConnAddr caddr(addr); + ConnAddr caddr(addr, hostname_hash, match_type); ink_mutex_acquire(&_mutex); int count = _hostCount.get(caddr); _hostCount.put(caddr, count + delta); @@ -77,14 +82,35 @@ class ConnectionCount struct ConnAddr { IpEndpoint _addr; + INK_MD5 _hostname_hash; + TSServerSessionSharingMatchType _match_type; - ConnAddr() { ink_zero(_addr); } - ConnAddr(int x) + ConnAddr() : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE) + { + ink_zero(_addr); + ink_zero(_hostname_hash); + } + + ConnAddr(int x) : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE) { ink_release_assert(x == 0); ink_zero(_addr); + ink_zero(_hostname_hash); + } + + ConnAddr(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type) + : _addr(addr), _hostname_hash(hostname_hash), _match_type(match_type) + { } - ConnAddr(const IpEndpoint &addr) : _addr(addr) {} + + ConnAddr(const IpEndpoint &addr, const char *hostname, TSServerSessionSharingMatchType match_type) + : _addr(addr), _match_type(match_type) + { + MD5Context md5_ctx; + md5_ctx.hash_immediate(_hostname_hash, static_cast(hostname), strlen(hostname)); + } + + operator bool() { return ats_is_ip(&_addr); } }; @@ -94,16 +120,70 @@ class ConnectionCount static uintptr_t hash(ConnAddr &addr) { - return (uintptr_t)ats_ip_hash(&addr._addr.sa); + if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) { + return (uintptr_t)ats_ip_port_hash(&addr._addr.sa); + } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) { + return (uintptr_t)addr._hostname_hash.u64[0]; + } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) { + return ((uintptr_t)ats_ip_port_hash(&addr._addr.sa) ^ (uintptr_t)addr._hostname_hash.u64[0]); + } else { + return 0; // they will never be equal() because of it returns false for NONE matches. + } } + static int equal(ConnAddr &a, ConnAddr &b) { - return ats_ip_addr_eq(&a._addr, &b._addr); + char addrbuf1[INET6_ADDRSTRLEN]; + char addrbuf2[INET6_ADDRSTRLEN]; + char md5buf1[33]; + char md5buf2[33]; + ink_code_md5_stringify(md5buf1, sizeof(md5buf1), reinterpret_cast(a._hostname_hash.u8)); + ink_code_md5_stringify(md5buf2, sizeof(md5buf2), reinterpret_cast(b._hostname_hash.u8)); + Debug("conn_count", "Comparing hostname hash %s dest %s match method %d to hostname hash %s dest %s match method %d", md5buf1, + ats_ip_nptop(&a._addr.sa, addrbuf1, sizeof(addrbuf1)), a._match_type, md5buf2, + ats_ip_nptop(&b._addr.sa, addrbuf2, sizeof(addrbuf2)), b._match_type); + + if (a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE) { + Debug("conn_count", "result = 0, a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE"); + return 0; + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) { + if (ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP"); + return 1; + } else { + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP"); + return 0; + } + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) { + if ((a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST"); + return 1; + } else { + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST"); + return 0; + } + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) { + if ((ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) && + (a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH"); + + return 1; + } + } + + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH"); + return 0; } }; -private: +protected: // Hide the constructor and copy constructor ConnectionCount() { ink_mutex_init(&_mutex, "ConnectionCountMutex"); } ConnectionCount(const ConnectionCount & /* x ATS_UNUSED */) {} @@ -113,4 +193,21 @@ class ConnectionCount ink_mutex _mutex; }; +class ConnectionCountQueue: public ConnectionCount { +public: + /** + * Static method to get the instance of the class + * @return Returns a pointer to the instance of the class + */ + static ConnectionCountQueue * + getInstance() + { + return &_connectionCount; + } +private: + static ConnectionCountQueue _connectionCount; +}; + + + #endif diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index da418e06ddc..f084c63722d 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -1652,6 +1652,7 @@ HttpSM::state_http_server_open(int event, void *data) UnixNetVConnection *server_vc = (UnixNetVConnection*)data; printf("client fd is :%d , server fd is %d\n",vc->con.fd, server_vc->con.fd); */ + session->attach_hostname(t_state.current.server->name); ats_ip_copy(&session->server_ip, &t_state.current.server->dst_addr); session->new_connection(static_cast(data)); session->state = HSS_ACTIVE; @@ -4748,12 +4749,41 @@ HttpSM::do_http_server_open(bool raw) if (t_state.txn_conf->origin_max_connections > 0) { ConnectionCount *connections = ConnectionCount::getInstance(); - char addrbuf[INET6_ADDRSTRLEN]; - if (connections->getCount((t_state.current.server->dst_addr)) >= t_state.txn_conf->origin_max_connections) { + INK_MD5 hostname_hash; + MD5Context md5_ctx; + md5_ctx.hash_immediate(hostname_hash, static_cast(t_state.current.server->name), + strlen(t_state.current.server->name)); + + ip_port_text_buffer addrbuf; + if (connections->getCount(t_state.current.server->dst_addr, hostname_hash, + (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) >= + t_state.txn_conf->origin_max_connections) { DebugSM("http", "[%" PRId64 "] over the number of connection for this host: %s", sm_id, - ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); + ats_ip_nptop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); ink_assert(pending_action == NULL); - pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); + + // if we were previously queued, or the queue is disabled-- just reschedule + if (t_state.origin_request_queued || t_state.txn_conf->origin_max_connections_queue < 0) { + pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); + return; + } else if (t_state.txn_conf->origin_max_connections_queue > 0) { // If we have a queue, lets see if there is a slot + ConnectionCountQueue *waiting_connections = ConnectionCountQueue::getInstance(); + // if there is space in the queue + if (waiting_connections->getCount(t_state.current.server->dst_addr, hostname_hash, + (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) < t_state.txn_conf->origin_max_connections_queue) { + t_state.origin_request_queued = true; + Debug("http", "[%" PRId64 "] queued for this host: %s", sm_id, + ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); + waiting_connections->incrementCount(t_state.current.server->dst_addr, hostname_hash, (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match, 1); + pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); + } else { // the queue is full + t_state.current.state = HttpTransact::CONNECTION_ERROR; + call_transact_and_set_next_state(HttpTransact::HandleResponse); + } + } else { // the queue is set to 0 + t_state.current.state = HttpTransact::CONNECTION_ERROR; + call_transact_and_set_next_state(HttpTransact::HandleResponse); + } return; } } @@ -5168,6 +5198,19 @@ HttpSM::handle_post_failure() void HttpSM::handle_http_server_open() { + // if we were a queued request, we need to decrement the queue size-- as we got a connection + if (t_state.origin_request_queued) { + INK_MD5 hostname_hash; + MD5Context md5_ctx; + md5_ctx.hash_immediate(hostname_hash, static_cast(t_state.current.server->name), + strlen(t_state.current.server->name)); + + ConnectionCountQueue *waiting_connections = ConnectionCountQueue::getInstance(); + waiting_connections->incrementCount(t_state.current.server->dst_addr, hostname_hash, (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match, -1); + // The request is now not queued. This is important if the request will ever retry, the t_state is re-used + t_state.origin_request_queued = false; + } + // [bwyatt] applying per-transaction OS netVC options here // IFF they differ from the netVC's current options. // This should keep this from being redundant on a diff --git a/proxy/http/HttpServerSession.cc b/proxy/http/HttpServerSession.cc index 7a0db7b2a94..763c6b8c399 100644 --- a/proxy/http/HttpServerSession.cc +++ b/proxy/http/HttpServerSession.cc @@ -77,10 +77,11 @@ HttpServerSession::new_connection(NetVConnection *new_vc) if (enable_origin_connection_limiting == true) { if (connection_count == NULL) connection_count = ConnectionCount::getInstance(); - connection_count->incrementCount(server_ip); - char addrbuf[INET6_ADDRSTRLEN]; + connection_count->incrementCount(server_ip, hostname_hash, sharing_match); + ip_port_text_buffer addrbuf; Debug("http_ss", "[%" PRId64 "] new connection, ip: %s, count: %u", con_id, - ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip)); + ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)), + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } #ifdef LAZY_BUF_ALLOC read_buffer = new_empty_MIOBuffer(HTTP_SERVER_RESP_HDR_BUFFER_INDEX); @@ -139,13 +140,15 @@ HttpServerSession::do_io_close(int alerrno) // Check to see if we are limiting the number of connections // per host if (enable_origin_connection_limiting == true) { - if (connection_count->getCount(server_ip) > 0) { - connection_count->incrementCount(server_ip, -1); - char addrbuf[INET6_ADDRSTRLEN]; + if (connection_count->getCount(server_ip, hostname_hash, sharing_match) > 0) { + connection_count->incrementCount(server_ip, hostname_hash, sharing_match, -1); + ip_port_text_buffer addrbuf; Debug("http_ss", "[%" PRId64 "] connection closed, ip: %s, count: %u", con_id, - ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip)); + ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)), + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } else { - Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, connection_count->getCount(server_ip)); + Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } } diff --git a/proxy/http/HttpSessionManager.cc b/proxy/http/HttpSessionManager.cc index ae19ca1e337..2f3bd63d07c 100644 --- a/proxy/http/HttpSessionManager.cc +++ b/proxy/http/HttpSessionManager.cc @@ -171,8 +171,8 @@ ServerSessionPool::eventHandler(int event, void *data) // origin, then reset the timeouts on our end and do not close the connection if ((event == VC_EVENT_INACTIVITY_TIMEOUT || event == VC_EVENT_ACTIVE_TIMEOUT) && s->state == HSS_KA_SHARED && s->enable_origin_connection_limiting) { - bool connection_count_below_min = - s->connection_count->getCount(s->server_ip) <= http_config_params->origin_min_keep_alive_connections; + bool connection_count_below_min = s->connection_count->getCount(s->server_ip, s->hostname_hash, s->sharing_match) <= + http_config_params->origin_min_keep_alive_connections; if (connection_count_below_min) { Debug("http_ss", "[%" PRId64 "] [session_bucket] session received io notice [%s], " diff --git a/proxy/http/HttpTransact.h b/proxy/http/HttpTransact.h index df5b928d724..00046ca8ad5 100644 --- a/proxy/http/HttpTransact.h +++ b/proxy/http/HttpTransact.h @@ -840,6 +840,9 @@ class HttpTransact bool is_websocket; bool did_upgrade_succeed; + // Some queue info + bool origin_request_queued; + char *internal_msg_buffer; // out char *internal_msg_buffer_type; // out int64_t internal_msg_buffer_size; // out @@ -959,7 +962,7 @@ class HttpTransact first_dns_lookup(true), backdoor_request(false), cop_test_page(false), parent_params(NULL), cache_lookup_result(CACHE_LOOKUP_NONE), next_action(SM_ACTION_UNDEFINED), api_next_action(SM_ACTION_UNDEFINED), transact_return_point(NULL), post_remap_upgrade_return_point(NULL), upgrade_token_wks(NULL), is_upgrade_request(false), - is_websocket(false), did_upgrade_succeed(false), internal_msg_buffer(NULL), internal_msg_buffer_type(NULL), + is_websocket(false), did_upgrade_succeed(false), origin_request_queued(false), internal_msg_buffer(NULL), internal_msg_buffer_type(NULL), internal_msg_buffer_size(0), internal_msg_buffer_fast_allocator_size(-1), icp_lookup_success(false), scheme(-1), next_hop_scheme(scheme), orig_scheme(scheme), method(0), cause_of_death_errno(-UNKNOWN_INTERNAL_ERROR), client_request_time(UNDEFINED_TIME), request_sent_time(UNDEFINED_TIME), response_received_time(UNDEFINED_TIME),