From 9a1479764e06090b4d7c83aaf9a458758534f0d0 Mon Sep 17 00:00:00 2001 From: Brian Geffon Date: Thu, 3 Mar 2016 12:18:28 -0800 Subject: [PATCH 1/6] TS-4340: Fix origin max connections --- lib/ts/ink_inet.cc | 20 +++++++ lib/ts/ink_inet.h | 2 + proxy/http/HttpConnectionCount.h | 98 +++++++++++++++++++++++++++++--- proxy/http/HttpSM.cc | 14 ++++- proxy/http/HttpServerSession.cc | 19 ++++--- proxy/http/HttpSessionManager.cc | 4 +- 6 files changed, 135 insertions(+), 22 deletions(-) diff --git a/lib/ts/ink_inet.cc b/lib/ts/ink_inet.cc index 7de24faafa8..9016bd12fe5 100644 --- a/lib/ts/ink_inet.cc +++ b/lib/ts/ink_inet.cc @@ -307,6 +307,26 @@ ats_ip_hash(sockaddr const *addr) return zret.i; } +uint64_t +ats_ip_port_hash(sockaddr const *addr) +{ + union md5sum { + uint64_t i; + uint16_t b[4]; + unsigned char c[16]; + } zret; + + zret.i = 0; + if (ats_is_ip4(addr)) { + zret.i = (static_cast(ats_ip4_addr_cast(addr)) << 16) | (ats_ip_port_cast(addr)); + } else if (ats_is_ip6(addr)) { + ink_code_md5(const_cast(ats_ip_addr8_cast(addr)), TS_IP6_SIZE, zret.c); + // now replace the bottom 16bits so we can account for the port. + zret.b[3] = ats_ip_port_cast(addr); + } + return zret.i; +} + int ats_ip_to_hex(sockaddr const *src, char *dst, size_t len) { diff --git a/lib/ts/ink_inet.h b/lib/ts/ink_inet.h index 1754bd69bad..8945de32473 100644 --- a/lib/ts/ink_inet.h +++ b/lib/ts/ink_inet.h @@ -1151,6 +1151,8 @@ int ats_ip_getbestaddrinfo(char const *name, ///< [in] Address name (IPv4, IPv6, */ uint32_t ats_ip_hash(sockaddr const *addr); +uint64_t ats_ip_port_hash(sockaddr const *addr); + /** Convert address to string as a hexidecimal value. The string is always nul terminated, the output string is clipped if @a dst is insufficient. 
diff --git a/proxy/http/HttpConnectionCount.h b/proxy/http/HttpConnectionCount.h index af4d7f6780e..3fed8541a32 100644 --- a/proxy/http/HttpConnectionCount.h +++ b/proxy/http/HttpConnectionCount.h @@ -26,6 +26,10 @@ #include "ts/ink_inet.h" #include "ts/ink_mutex.h" #include "ts/Map.h" +#include "ts/Diags.h" +#include "ts/INK_MD5.h" +#include "ts/ink_config.h" +#include "HttpProxyAPIEnums.h" #ifndef _HTTP_CONNECTION_COUNT_H_ #define _HTTP_CONNECTION_COUNT_H_ @@ -52,10 +56,10 @@ class ConnectionCount * @return Number of connections */ int - getCount(const IpEndpoint &addr) + getCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type) { ink_mutex_acquire(&_mutex); - int count = _hostCount.get(ConnAddr(addr)); + int count = _hostCount.get(ConnAddr(addr, hostname_hash, match_type)); ink_mutex_release(&_mutex); return count; } @@ -66,9 +70,10 @@ class ConnectionCount * @param delta Default is +1, can be set to negative to decrement */ void - incrementCount(const IpEndpoint &addr, const int delta = 1) + incrementCount(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type, + const int delta = 1) { - ConnAddr caddr(addr); + ConnAddr caddr(addr, hostname_hash, match_type); ink_mutex_acquire(&_mutex); int count = _hostCount.get(caddr); _hostCount.put(caddr, count + delta); @@ -77,14 +82,35 @@ class ConnectionCount struct ConnAddr { IpEndpoint _addr; + INK_MD5 _hostname_hash; + TSServerSessionSharingMatchType _match_type; - ConnAddr() { ink_zero(_addr); } - ConnAddr(int x) + ConnAddr() : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE) + { + ink_zero(_addr); + ink_zero(_hostname_hash); + } + + ConnAddr(int x) : _match_type(TS_SERVER_SESSION_SHARING_MATCH_NONE) { ink_release_assert(x == 0); ink_zero(_addr); + ink_zero(_hostname_hash); + } + + ConnAddr(const IpEndpoint &addr, const INK_MD5 &hostname_hash, TSServerSessionSharingMatchType match_type) + : _addr(addr), 
_hostname_hash(hostname_hash), _match_type(match_type) + { } - ConnAddr(const IpEndpoint &addr) : _addr(addr) {} + + ConnAddr(const IpEndpoint &addr, const char *hostname, TSServerSessionSharingMatchType match_type) + : _addr(addr), _match_type(match_type) + { + MD5Context md5_ctx; + md5_ctx.hash_immediate(_hostname_hash, static_cast(hostname), strlen(hostname)); + } + + operator bool() { return ats_is_ip(&_addr); } }; @@ -94,12 +120,66 @@ class ConnectionCount static uintptr_t hash(ConnAddr &addr) { - return (uintptr_t)ats_ip_hash(&addr._addr.sa); + if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) { + return (uintptr_t)ats_ip_port_hash(&addr._addr.sa); + } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) { + return (uintptr_t)addr._hostname_hash.u64[0]; + } else if (addr._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) { + return ((uintptr_t)ats_ip_port_hash(&addr._addr.sa) ^ (uintptr_t)addr._hostname_hash.u64[0]); + } else { + return 0; // they will never be equal() because of it returns false for NONE matches. 
+ } } + static int equal(ConnAddr &a, ConnAddr &b) { - return ats_ip_addr_eq(&a._addr, &b._addr); + char addrbuf1[INET6_ADDRSTRLEN]; + char addrbuf2[INET6_ADDRSTRLEN]; + char md5buf1[33]; + char md5buf2[33]; + ink_code_md5_stringify(md5buf1, sizeof(md5buf1), reinterpret_cast(a._hostname_hash.u8)); + ink_code_md5_stringify(md5buf2, sizeof(md5buf2), reinterpret_cast(b._hostname_hash.u8)); + Debug("conn_count", "Comparing hostname hash %s dest %s match method %d to hostname hash %s dest %s match method %d", md5buf1, + ats_ip_nptop(&a._addr.sa, addrbuf1, sizeof(addrbuf1)), a._match_type, md5buf2, + ats_ip_nptop(&b._addr.sa, addrbuf2, sizeof(addrbuf2)), b._match_type); + + if (a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE) { + Debug("conn_count", "result = 0, a._match_type != b._match_type || a._match_type == TS_SERVER_SESSION_SHARING_MATCH_NONE"); + return 0; + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP) { + if (ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP"); + return 1; + } else { + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_IP"); + return 0; + } + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST) { + if ((a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST"); + return 1; + } else { + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_HOST"); + return 0; + } + } + + if (a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH) { + if ((ats_ip_addr_port_eq(&a._addr.sa, &b._addr.sa)) && + (a._hostname_hash.u64[0] == b._hostname_hash.u64[0] && a._hostname_hash.u64[1] == b._hostname_hash.u64[1])) { + Debug("conn_count", "result = 1, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH"); + + return 
1; + } + } + + Debug("conn_count", "result = 0, a._match_type == TS_SERVER_SESSION_SHARING_MATCH_BOTH"); + return 0; } }; diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index da418e06ddc..90d819b9df0 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -1652,6 +1652,7 @@ HttpSM::state_http_server_open(int event, void *data) UnixNetVConnection *server_vc = (UnixNetVConnection*)data; printf("client fd is :%d , server fd is %d\n",vc->con.fd, server_vc->con.fd); */ + session->attach_hostname(t_state.current.server->name); ats_ip_copy(&session->server_ip, &t_state.current.server->dst_addr); session->new_connection(static_cast(data)); session->state = HSS_ACTIVE; @@ -4748,10 +4749,17 @@ HttpSM::do_http_server_open(bool raw) if (t_state.txn_conf->origin_max_connections > 0) { ConnectionCount *connections = ConnectionCount::getInstance(); - char addrbuf[INET6_ADDRSTRLEN]; - if (connections->getCount((t_state.current.server->dst_addr)) >= t_state.txn_conf->origin_max_connections) { + INK_MD5 hostname_hash; + MD5Context md5_ctx; + md5_ctx.hash_immediate(hostname_hash, static_cast(t_state.current.server->name), + strlen(t_state.current.server->name)); + + ip_port_text_buffer addrbuf; + if (connections->getCount(t_state.current.server->dst_addr, hostname_hash, + (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) >= + t_state.txn_conf->origin_max_connections) { DebugSM("http", "[%" PRId64 "] over the number of connection for this host: %s", sm_id, - ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); + ats_ip_nptop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); ink_assert(pending_action == NULL); pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); return; diff --git a/proxy/http/HttpServerSession.cc b/proxy/http/HttpServerSession.cc index 7a0db7b2a94..763c6b8c399 100644 --- a/proxy/http/HttpServerSession.cc +++ b/proxy/http/HttpServerSession.cc @@ -77,10 +77,11 
@@ HttpServerSession::new_connection(NetVConnection *new_vc) if (enable_origin_connection_limiting == true) { if (connection_count == NULL) connection_count = ConnectionCount::getInstance(); - connection_count->incrementCount(server_ip); - char addrbuf[INET6_ADDRSTRLEN]; + connection_count->incrementCount(server_ip, hostname_hash, sharing_match); + ip_port_text_buffer addrbuf; Debug("http_ss", "[%" PRId64 "] new connection, ip: %s, count: %u", con_id, - ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip)); + ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)), + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } #ifdef LAZY_BUF_ALLOC read_buffer = new_empty_MIOBuffer(HTTP_SERVER_RESP_HDR_BUFFER_INDEX); @@ -139,13 +140,15 @@ HttpServerSession::do_io_close(int alerrno) // Check to see if we are limiting the number of connections // per host if (enable_origin_connection_limiting == true) { - if (connection_count->getCount(server_ip) > 0) { - connection_count->incrementCount(server_ip, -1); - char addrbuf[INET6_ADDRSTRLEN]; + if (connection_count->getCount(server_ip, hostname_hash, sharing_match) > 0) { + connection_count->incrementCount(server_ip, hostname_hash, sharing_match, -1); + ip_port_text_buffer addrbuf; Debug("http_ss", "[%" PRId64 "] connection closed, ip: %s, count: %u", con_id, - ats_ip_ntop(&server_ip.sa, addrbuf, sizeof(addrbuf)), connection_count->getCount(server_ip)); + ats_ip_nptop(&server_ip.sa, addrbuf, sizeof(addrbuf)), + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } else { - Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, connection_count->getCount(server_ip)); + Error("[%" PRId64 "] number of connections should be greater than zero: %u", con_id, + connection_count->getCount(server_ip, hostname_hash, sharing_match)); } } diff --git a/proxy/http/HttpSessionManager.cc b/proxy/http/HttpSessionManager.cc index 
ae19ca1e337..2f3bd63d07c 100644 --- a/proxy/http/HttpSessionManager.cc +++ b/proxy/http/HttpSessionManager.cc @@ -171,8 +171,8 @@ ServerSessionPool::eventHandler(int event, void *data) // origin, then reset the timeouts on our end and do not close the connection if ((event == VC_EVENT_INACTIVITY_TIMEOUT || event == VC_EVENT_ACTIVE_TIMEOUT) && s->state == HSS_KA_SHARED && s->enable_origin_connection_limiting) { - bool connection_count_below_min = - s->connection_count->getCount(s->server_ip) <= http_config_params->origin_min_keep_alive_connections; + bool connection_count_below_min = s->connection_count->getCount(s->server_ip, s->hostname_hash, s->sharing_match) <= + http_config_params->origin_min_keep_alive_connections; if (connection_count_below_min) { Debug("http_ss", "[%" PRId64 "] [session_bucket] session received io notice [%s], " From 84e29a4fe8e8d3a42b329feb69dc7c4c3f99d41c Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Mon, 11 Apr 2016 15:03:16 -0700 Subject: [PATCH 2/6] TS-4340 Add tests for per-origin connection limits --- ci/tsqa/tests/test_origin_max_connections.py | 115 +++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 ci/tsqa/tests/test_origin_max_connections.py diff --git a/ci/tsqa/tests/test_origin_max_connections.py b/ci/tsqa/tests/test_origin_max_connections.py new file mode 100644 index 00000000000..48f06eb4b15 --- /dev/null +++ b/ci/tsqa/tests/test_origin_max_connections.py @@ -0,0 +1,115 @@ +''' +Test the configure entry : proxy.config.http.origin_max_connections +''' + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import logging +import uuid +import socket +import requests +import tsqa.test_cases +import helpers +import thread +from multiprocessing import Pool +import SocketServer + +log = logging.getLogger(__name__) + + +# TODO: seems like a useful shared class- either add to httpbin or some shared lib +class KAHandler(SocketServer.BaseRequestHandler): + '''SocketServer that returns the connection-id as the body + ''' + # class variable to set number of active sessions + alive_sessions = 0 + + def handle(self): + KAHandler.alive_sessions += 1 + # Receive the data in small chunks and retransmit it + conn_id = uuid.uuid4().hex + start = time.time() + while True: + data = self.request.recv(4096).strip() + if data: + log.info('Sending data back to the client: {uid}'.format(uid=conn_id)) + else: + log.info('Client disconnected: {timeout}seconds'.format(timeout=time.time() - start)) + break + body = conn_id + time.sleep(1) + resp = ('HTTP/1.1 200 OK\r\n' + 'Content-Length: {content_length}\r\n' + 'Content-Type: text/html; charset=UTF-8\r\n' + 'Connection: keep-alive\r\n' + 'X-Current-Sessions: {alive_sessions}\r\n' + '\r\n' + '{body}'.format(content_length=len(body), alive_sessions=KAHandler.alive_sessions, body=body)) + self.request.sendall(resp) + KAHandler.alive_sessions -= 1 + + +class TestKeepAlive_Origin_Max_connections(helpers.EnvironmentCase): + @classmethod + def setUpEnv(cls, env): + cls.traffic_server_host = '127.0.0.1' + cls.traffic_server_port = int(cls.configs['records.config']['CONFIG']['proxy.config.http.server_ports']) + 
cls.socket_server_port = int(tsqa.utils.bind_unused_port()[1]) + + log.info("socket_server_port = %d" % (cls.socket_server_port)) + cls.server = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port) + cls.server.start() + cls.server.ready.wait() + + cls.socket_server_port2 = int(tsqa.utils.bind_unused_port()[1]) + cls.server2 = tsqa.endpoint.SocketServerDaemon(KAHandler, port=cls.socket_server_port2) + cls.server2.start() + cls.server2.ready.wait() + + cls.configs['remap.config'].add_line('map /other/ http://127.0.0.1:{0}'.format(cls.socket_server_port2)) + cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}'.format(cls.socket_server_port)) + + cls.origin_keep_alive_timeout = 1 + + cls.configs['records.config']['CONFIG'].update({ + 'proxy.config.http.origin_max_connections': 1, + 'proxy.config.http.keep_alive_enabled_out': 1, + 'proxy.config.http.keep_alive_no_activity_timeout_out': cls.origin_keep_alive_timeout, + 'proxy.config.exec_thread.limit': 2, + 'proxy.config.exec_thread.autoconfig': 0, + }) + + + def test_max(self): + ''' + ''' + REQUEST_COUNT = 8 + url = 'http://{0}:{1}/'.format(self.traffic_server_host, self.traffic_server_port) + url2 = 'http://{0}:{1}/other/'.format(self.traffic_server_host, self.traffic_server_port) + results = [] + results2 = [] + pool = Pool(processes=4) + for _ in xrange(0, REQUEST_COUNT): + results.append(pool.apply_async(requests.get, (url,))) + results2.append(pool.apply_async(requests.get, (url2,))) + + # TS-4340 + # ensure that the 2 origins (2 different ports on loopback) were running in parallel + for i in xrange(0, REQUEST_COUNT): + self.assertEqual(int(results[i].get().headers['X-Current-Sessions']), 2) + self.assertEqual(int(results2[i].get().headers['X-Current-Sessions']), 2) From 38aa266f429fe10047214ed64072c5fa135e4060 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Mon, 11 Apr 2016 20:02:21 -0700 Subject: [PATCH 3/6] TS-4341 add `origin_max_connections_queue` if you have 
`origin_max_connections` set, it will limit the number of sessions sent to a given origin. Once the limit is hit ATS would infinitely queue requests waiting for the origin to respond. In the event that the origin is broken (and will stay so) it is better to error the pages, on the flip side if you want ATS to just limit the number of connections you don't want to just close the connections. To handle both of these I've introduced `origin_max_connections_queue`. This option limits the number of outstanding connections allowed to a given origin. A value <0 defaults to the old behavior (no limits). If set to 0 all requests past the limit are errored, and if the limit is >0 then that many requests are allowed to be outstanding to the origin. This closes #564 --- lib/ts/apidefs.h.in | 1 + mgmt/RecordsConfig.cc | 2 + .../experimental/ts_lua/ts_lua_http_config.c | 5 ++- proxy/InkAPI.cc | 6 +++ proxy/http/HttpConfig.cc | 8 ++++ proxy/http/HttpConfig.h | 3 +- proxy/http/HttpConnectionCount.cc | 1 + proxy/http/HttpConnectionCount.h | 19 +++++++++- proxy/http/HttpSM.cc | 37 ++++++++++++++++++- proxy/http/HttpTransact.h | 5 ++- 10 files changed, 82 insertions(+), 5 deletions(-) diff --git a/lib/ts/apidefs.h.in b/lib/ts/apidefs.h.in index ae366f1411c..cbd47a90ed1 100644 --- a/lib/ts/apidefs.h.in +++ b/lib/ts/apidefs.h.in @@ -694,6 +694,7 @@ typedef enum { TS_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES, TS_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY, TS_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT, + TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE, TS_CONFIG_LAST_ENTRY } TSOverridableConfigKey; diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index d448afe8a33..187f5fec554 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -473,6 +473,8 @@ static const RecordElement RecordsConfig[] = , {RECT_CONFIG, "proxy.config.http.origin_max_connections", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} , + {RECT_CONFIG, 
"proxy.config.http.origin_max_connections_queue", RECD_INT, "-1", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} + , {RECT_CONFIG, "proxy.config.http.origin_min_keep_alive_connections", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_STR, "^[0-9]+$", RECA_NULL} , {RECT_CONFIG, "proxy.config.http.attach_server_session_to_client", RECD_INT, "0", RECU_DYNAMIC, RR_NULL, RECC_INT, "[0-1]", RECA_NULL} diff --git a/plugins/experimental/ts_lua/ts_lua_http_config.c b/plugins/experimental/ts_lua/ts_lua_http_config.c index 5b995be55ad..2b89d305c03 100644 --- a/plugins/experimental/ts_lua/ts_lua_http_config.c +++ b/plugins/experimental/ts_lua/ts_lua_http_config.c @@ -114,6 +114,7 @@ typedef enum { TS_LUA_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES = TS_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES, TS_LUA_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY = TS_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY, TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT = TS_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT, + TS_LUA_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE = TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE, TS_LUA_CONFIG_LAST_ENTRY = TS_CONFIG_LAST_ENTRY, } TSLuaOverridableConfigKey; @@ -198,7 +199,9 @@ ts_lua_var_item ts_lua_http_config_vars[] = { TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ENABLE_REDIRECTION), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_NUMBER_OF_REDIRECTIONS), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY), - TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT), TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_LAST_ENTRY), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ATTACH_SERVER_SESSION_TO_CLIENT), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE), + TS_LUA_MAKE_VAR_ITEM(TS_LUA_CONFIG_LAST_ENTRY), }; // Needed to make sure we have the latest list of overridable http config vars when compiling diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc index 4da8f4d7108..eea2460ad22 
100644 --- a/proxy/InkAPI.cc +++ b/proxy/InkAPI.cc @@ -7978,6 +7978,10 @@ _conf_to_memberp(TSOverridableConfigKey conf, OverridableHttpConfigParams *overr typ = OVERRIDABLE_TYPE_INT; ret = &overridableHttpConfig->attach_server_session_to_client; break; + case TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE: + typ = OVERRIDABLE_TYPE_INT; + ret = &overridableHttpConfig->origin_max_connections_queue; + break; // This helps avoiding compiler warnings, yet detect unhandled enum members. case TS_CONFIG_NULL: case TS_CONFIG_LAST_ENTRY: @@ -8488,6 +8492,8 @@ TSHttpTxnConfigFind(const char *name, int length, TSOverridableConfigKey *conf, cnf = TS_CONFIG_HTTP_CACHE_HEURISTIC_MIN_LIFETIME; else if (!strncmp(name, "proxy.config.http.cache.heuristic_max_lifetime", length)) cnf = TS_CONFIG_HTTP_CACHE_HEURISTIC_MAX_LIFETIME; + else if (!strncmp(name, "proxy.config.http.origin_max_connections_queue", length)) + cnf = TS_CONFIG_HTTP_ORIGIN_MAX_CONNECTIONS_QUEUE; break; case 'r': if (!strncmp(name, "proxy.config.http.insert_squid_x_forwarded_for", length)) diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc index d2b84e75c4d..1ee7ed5a187 100644 --- a/proxy/http/HttpConfig.cc +++ b/proxy/http/HttpConfig.cc @@ -891,6 +891,7 @@ HttpConfig::startup() HttpEstablishStaticConfigLongLong(c.max_websocket_connections, "proxy.config.http.websocket.max_number_of_connections"); HttpEstablishStaticConfigLongLong(c.oride.server_tcp_init_cwnd, "proxy.config.http.server_tcp_init_cwnd"); HttpEstablishStaticConfigLongLong(c.oride.origin_max_connections, "proxy.config.http.origin_max_connections"); + HttpEstablishStaticConfigLongLong(c.oride.origin_max_connections_queue, "proxy.config.http.origin_max_connections_queue"); HttpEstablishStaticConfigLongLong(c.origin_min_keep_alive_connections, "proxy.config.http.origin_min_keep_alive_connections"); HttpEstablishStaticConfigLongLong(c.oride.attach_server_session_to_client, "proxy.config.http.attach_server_session_to_client"); @@ -1162,6 +1163,13 
@@ HttpConfig::reconfigure() params->max_websocket_connections = m_master.max_websocket_connections; params->oride.server_tcp_init_cwnd = m_master.oride.server_tcp_init_cwnd; params->oride.origin_max_connections = m_master.oride.origin_max_connections; + params->oride.origin_max_connections_queue = m_master.oride.origin_max_connections_queue; + // if origin_max_connections_queue is set without max_connections, it is meaningless, so we'll warn + if (params->oride.origin_max_connections_queue && + !(params->oride.origin_max_connections || params->origin_min_keep_alive_connections)) { + Warning("origin_max_connections_queue is set, but neither origin_max_connections nor origin_min_keep_alive_connections are " + "set, please correct your records.config"); + } params->origin_min_keep_alive_connections = m_master.origin_min_keep_alive_connections; params->oride.attach_server_session_to_client = m_master.oride.attach_server_session_to_client; diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h index 4f031493582..92ec3db6001 100644 --- a/proxy/http/HttpConfig.h +++ b/proxy/http/HttpConfig.h @@ -378,7 +378,7 @@ struct OverridableHttpConfigParams { cache_heuristic_min_lifetime(3600), cache_heuristic_max_lifetime(86400), cache_guaranteed_min_lifetime(0), cache_guaranteed_max_lifetime(31536000), cache_max_stale_age(604800), keep_alive_no_activity_timeout_in(115), keep_alive_no_activity_timeout_out(120), transaction_no_activity_timeout_in(30), transaction_no_activity_timeout_out(30), - transaction_active_timeout_out(0), origin_max_connections(0), attach_server_session_to_client(0), + transaction_active_timeout_out(0), origin_max_connections(0), origin_max_connections_queue(0), attach_server_session_to_client(0), connect_attempts_max_retries(0), connect_attempts_max_retries_dead_server(3), connect_attempts_rr_retries(3), connect_attempts_timeout(30), post_connect_attempts_timeout(1800), down_server_timeout(300), client_abort_threshold(10), freshness_fuzz_time(240), 
freshness_fuzz_min_time(0), max_cache_open_read_retries(-1), cache_open_read_retry_time(10), @@ -529,6 +529,7 @@ struct OverridableHttpConfigParams { MgmtInt transaction_no_activity_timeout_out; MgmtInt transaction_active_timeout_out; MgmtInt origin_max_connections; + MgmtInt origin_max_connections_queue; MgmtInt attach_server_session_to_client; diff --git a/proxy/http/HttpConnectionCount.cc b/proxy/http/HttpConnectionCount.cc index 74f06b4b79a..9f482a96431 100644 --- a/proxy/http/HttpConnectionCount.cc +++ b/proxy/http/HttpConnectionCount.cc @@ -25,3 +25,4 @@ ConnectionCount ConnectionCount::_connectionCount; +ConnectionCountQueue ConnectionCountQueue::_connectionCount; diff --git a/proxy/http/HttpConnectionCount.h b/proxy/http/HttpConnectionCount.h index 3fed8541a32..d1f36562c6a 100644 --- a/proxy/http/HttpConnectionCount.h +++ b/proxy/http/HttpConnectionCount.h @@ -183,7 +183,7 @@ class ConnectionCount } }; -private: +protected: // Hide the constructor and copy constructor ConnectionCount() { ink_mutex_init(&_mutex, "ConnectionCountMutex"); } ConnectionCount(const ConnectionCount & /* x ATS_UNUSED */) {} @@ -193,4 +193,21 @@ class ConnectionCount ink_mutex _mutex; }; +class ConnectionCountQueue: public ConnectionCount { +public: + /** + * Static method to get the instance of the class + * @return Returns a pointer to the instance of the class + */ + static ConnectionCountQueue * + getInstance() + { + return &_connectionCount; + } +private: + static ConnectionCountQueue _connectionCount; +}; + + + #endif diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index 90d819b9df0..f084c63722d 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -4761,7 +4761,29 @@ HttpSM::do_http_server_open(bool raw) DebugSM("http", "[%" PRId64 "] over the number of connection for this host: %s", sm_id, ats_ip_nptop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); ink_assert(pending_action == NULL); - pending_action = eventProcessor.schedule_in(this, 
HRTIME_MSECONDS(100)); + + // if we were previously queued, or the queue is disabled-- just reschedule + if (t_state.origin_request_queued || t_state.txn_conf->origin_max_connections_queue < 0) { + pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); + return; + } else if (t_state.txn_conf->origin_max_connections_queue > 0) { // If we have a queue, lets see if there is a slot + ConnectionCountQueue *waiting_connections = ConnectionCountQueue::getInstance(); + // if there is space in the queue + if (waiting_connections->getCount(t_state.current.server->dst_addr, hostname_hash, + (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match) < t_state.txn_conf->origin_max_connections_queue) { + t_state.origin_request_queued = true; + Debug("http", "[%" PRId64 "] queued for this host: %s", sm_id, + ats_ip_ntop(&t_state.current.server->dst_addr.sa, addrbuf, sizeof(addrbuf))); + waiting_connections->incrementCount(t_state.current.server->dst_addr, hostname_hash, (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match, 1); + pending_action = eventProcessor.schedule_in(this, HRTIME_MSECONDS(100)); + } else { // the queue is full + t_state.current.state = HttpTransact::CONNECTION_ERROR; + call_transact_and_set_next_state(HttpTransact::HandleResponse); + } + } else { // the queue is set to 0 + t_state.current.state = HttpTransact::CONNECTION_ERROR; + call_transact_and_set_next_state(HttpTransact::HandleResponse); + } return; } } @@ -5176,6 +5198,19 @@ HttpSM::handle_post_failure() void HttpSM::handle_http_server_open() { + // if we were a queued request, we need to decrement the queue size-- as we got a connection + if (t_state.origin_request_queued) { + INK_MD5 hostname_hash; + MD5Context md5_ctx; + md5_ctx.hash_immediate(hostname_hash, static_cast(t_state.current.server->name), + strlen(t_state.current.server->name)); + + ConnectionCountQueue *waiting_connections = ConnectionCountQueue::getInstance(); + 
waiting_connections->incrementCount(t_state.current.server->dst_addr, hostname_hash, (TSServerSessionSharingMatchType)t_state.txn_conf->server_session_sharing_match, -1); + // The request is now not queued. This is important if the request will ever retry, the t_state is re-used + t_state.origin_request_queued = false; + } + // [bwyatt] applying per-transaction OS netVC options here // IFF they differ from the netVC's current options. // This should keep this from being redundant on a diff --git a/proxy/http/HttpTransact.h b/proxy/http/HttpTransact.h index df5b928d724..00046ca8ad5 100644 --- a/proxy/http/HttpTransact.h +++ b/proxy/http/HttpTransact.h @@ -840,6 +840,9 @@ class HttpTransact bool is_websocket; bool did_upgrade_succeed; + // Some queue info + bool origin_request_queued; + char *internal_msg_buffer; // out char *internal_msg_buffer_type; // out int64_t internal_msg_buffer_size; // out @@ -959,7 +962,7 @@ class HttpTransact first_dns_lookup(true), backdoor_request(false), cop_test_page(false), parent_params(NULL), cache_lookup_result(CACHE_LOOKUP_NONE), next_action(SM_ACTION_UNDEFINED), api_next_action(SM_ACTION_UNDEFINED), transact_return_point(NULL), post_remap_upgrade_return_point(NULL), upgrade_token_wks(NULL), is_upgrade_request(false), - is_websocket(false), did_upgrade_succeed(false), internal_msg_buffer(NULL), internal_msg_buffer_type(NULL), + is_websocket(false), did_upgrade_succeed(false), origin_request_queued(false), internal_msg_buffer(NULL), internal_msg_buffer_type(NULL), internal_msg_buffer_size(0), internal_msg_buffer_fast_allocator_size(-1), icp_lookup_success(false), scheme(-1), next_hop_scheme(scheme), orig_scheme(scheme), method(0), cause_of_death_errno(-UNKNOWN_INTERNAL_ERROR), client_request_time(UNDEFINED_TIME), request_sent_time(UNDEFINED_TIME), response_received_time(UNDEFINED_TIME), From faf06b3c445aeef01d58d7b5f6e9efcfccc022b9 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Mon, 11 Apr 2016 20:10:44 -0700 Subject: [PATCH 
4/6] TS-4341 Document origin_max_connections_queue --- doc/admin-guide/files/records.config.en.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/doc/admin-guide/files/records.config.en.rst b/doc/admin-guide/files/records.config.en.rst index 69308b26e85..55ac3744818 100644 --- a/doc/admin-guide/files/records.config.en.rst +++ b/doc/admin-guide/files/records.config.en.rst @@ -1217,6 +1217,15 @@ Origin Server Connect Attempts Limits the number of socket connections per origin server to the value specified. To enable, set to one (``1``). +.. ts:cv:: CONFIG proxy.config.http.origin_max_connections_queue INT -1 + :reloadable: + :overridable: + + Limits the number of requests to be queued when the :ts:cv:`proxy.config.http.origin_max_connections` is reached. + When disabled (``-1``) requests will wait indefinitely for an available connection. When set to ``0`` all + requests past the :ts:cv:`proxy.config.http.origin_max_connections` will immediately fail. When set to ``>0`` + ATS will queue that many requests to go to the origin, any additional requests past the limit will immediately fail. + ..
ts:cv:: CONFIG proxy.config.http.origin_min_keep_alive_connections INT 0 :reloadable: From fa1bb2951475ddb92e624fde011843a9a905c603 Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Mon, 11 Apr 2016 15:51:15 -0700 Subject: [PATCH 5/6] TS-4342 Add tests for queued/max connections in addition to base tests --- ci/tsqa/tests/test_origin_max_connections.py | 105 ++++++++++++++++--- 1 file changed, 90 insertions(+), 15 deletions(-) diff --git a/ci/tsqa/tests/test_origin_max_connections.py b/ci/tsqa/tests/test_origin_max_connections.py index 48f06eb4b15..6fc0cd886dd 100644 --- a/ci/tsqa/tests/test_origin_max_connections.py +++ b/ci/tsqa/tests/test_origin_max_connections.py @@ -28,6 +28,7 @@ import thread from multiprocessing import Pool import SocketServer +import os log = logging.getLogger(__name__) @@ -52,7 +53,7 @@ def handle(self): log.info('Client disconnected: {timeout}seconds'.format(timeout=time.time() - start)) break body = conn_id - time.sleep(1) + time.sleep(2) resp = ('HTTP/1.1 200 OK\r\n' 'Content-Length: {content_length}\r\n' 'Content-Type: text/html; charset=UTF-8\r\n' @@ -81,35 +82,109 @@ def setUpEnv(cls, env): cls.server2.start() cls.server2.ready.wait() + queue_path = os.path.join(cls.environment.layout.sysconfdir, 'queue.conf') + with open(queue_path, 'w') as fh: + fh.write('CONFIG proxy.config.http.origin_max_connections_queue INT 2') + + noqueue_path = os.path.join(cls.environment.layout.sysconfdir, 'noqueue.conf') + with open(noqueue_path, 'w') as fh: + fh.write('CONFIG proxy.config.http.origin_max_connections_queue INT 0') + + cls.configs['remap.config'].add_line('map /other/queue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port2, queue_path)) + cls.configs['remap.config'].add_line('map /other/noqueue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port2, noqueue_path)) cls.configs['remap.config'].add_line('map /other/ http://127.0.0.1:{0}'.format(cls.socket_server_port2)) + 
cls.configs['remap.config'].add_line('map /queue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port, queue_path)) + cls.configs['remap.config'].add_line('map /noqueue/ http://127.0.0.1:{0} @plugin=conf_remap.so @pparam={1}'.format(cls.socket_server_port, noqueue_path)) cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}'.format(cls.socket_server_port)) - cls.origin_keep_alive_timeout = 1 - cls.configs['records.config']['CONFIG'].update({ 'proxy.config.http.origin_max_connections': 1, 'proxy.config.http.keep_alive_enabled_out': 1, - 'proxy.config.http.keep_alive_no_activity_timeout_out': cls.origin_keep_alive_timeout, - 'proxy.config.exec_thread.limit': 2, + 'proxy.config.http.keep_alive_no_activity_timeout_out': 1, + 'proxy.config.exec_thread.limit': 1, 'proxy.config.exec_thread.autoconfig': 0, }) + def _send_requests(self, total_requests, path='', other=False): + url = 'http://{0}:{1}/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) + print url + url2 = 'http://{0}:{1}/other/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) + jobs = [] + jobs2 = [] + pool = Pool(processes=4) + for _ in xrange(0, total_requests): + jobs.append(pool.apply_async(requests.get, (url,))) + if other: + jobs2.append(pool.apply_async(requests.get, (url2,))) - def test_max(self): - ''' - ''' - REQUEST_COUNT = 8 - url = 'http://{0}:{1}/'.format(self.traffic_server_host, self.traffic_server_port) - url2 = 'http://{0}:{1}/other/'.format(self.traffic_server_host, self.traffic_server_port) results = [] results2 = [] - pool = Pool(processes=4) - for _ in xrange(0, REQUEST_COUNT): - results.append(pool.apply_async(requests.get, (url,))) - results2.append(pool.apply_async(requests.get, (url2,))) + for j in jobs: + try: + results.append(j.get()) + except Exception as e: + results.append(e) + + for j in jobs2: + try: + results2.append(j.get()) + except Exception as e: + results2.append(e) + + return results, 
results2 + + + # TODO: enable after TS-4340 is merged + # and re-enable `other` for the remaining queueing tests + def tesst_origin_scoping(self): + '''Send 2 requests to loopback (on separate ports) and ensure that they run in parallel + ''' + results, results2 = self._send_requests(1, other=True) # TS-4340 # ensure that the 2 origins (2 different ports on loopback) were running in parallel for i in xrange(0, REQUEST_COUNT): self.assertEqual(int(results[i].get().headers['X-Current-Sessions']), 2) self.assertEqual(int(results2[i].get().headers['X-Current-Sessions']), 2) + + def test_origin_default_queueing(self): + '''By default we have no queue limit + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT) + + for x in xrange(0, REQUEST_COUNT): + self.assertEqual(results[x].status_code, 200) + #self.assertEqual(results2[x].status_code, 200) + + def test_origin_queueing(self): + '''If a queue is set, N requests are queued and the rest immediately fail + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='queue/') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + else: + fail += 1 + print 'results:', success, fail + self.assertEqual(success, 3) + + def test_origin_no_queueing(self): + '''If the queue is set to 0, all requests past the max immediately fail + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='noqueue/') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + else: + fail += 1 + print 'results:', success, fail + self.assertEqual(success, 1) From 600a98e3d93942a59ecb2a42b99a77856122dbdc Mon Sep 17 00:00:00 2001 From: Thomas Jackson Date: Tue, 12 Apr 2016 09:38:53 -0700 Subject: [PATCH 6/6] Test for queue leaking in the CLIENT_ABORT case --- ci/tsqa/tests/test_origin_max_connections.py | 29 ++++++++++++++++++-- 1 file changed, 26 
insertions(+), 3 deletions(-) diff --git a/ci/tsqa/tests/test_origin_max_connections.py b/ci/tsqa/tests/test_origin_max_connections.py index 6fc0cd886dd..c5bf41a3c37 100644 --- a/ci/tsqa/tests/test_origin_max_connections.py +++ b/ci/tsqa/tests/test_origin_max_connections.py @@ -53,7 +53,11 @@ def handle(self): log.info('Client disconnected: {timeout}seconds'.format(timeout=time.time() - start)) break body = conn_id - time.sleep(2) + if 'timeout' in data: + print 'sleep for a long time!' + time.sleep(4) + else: + time.sleep(2) resp = ('HTTP/1.1 200 OK\r\n' 'Content-Length: {content_length}\r\n' 'Content-Type: text/html; charset=UTF-8\r\n' @@ -101,13 +105,15 @@ def setUpEnv(cls, env): 'proxy.config.http.origin_max_connections': 1, 'proxy.config.http.keep_alive_enabled_out': 1, 'proxy.config.http.keep_alive_no_activity_timeout_out': 1, + 'proxy.config.http.transaction_active_timeout_out': 2, + 'proxy.config.http.connect_attempts_timeout': 2, + 'proxy.config.http.connect_attempts_rr_retries': 0, 'proxy.config.exec_thread.limit': 1, 'proxy.config.exec_thread.autoconfig': 0, }) def _send_requests(self, total_requests, path='', other=False): url = 'http://{0}:{1}/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) - print url url2 = 'http://{0}:{1}/other/{2}'.format(self.traffic_server_host, self.traffic_server_port, path) jobs = [] jobs2 = [] @@ -170,9 +176,26 @@ def test_origin_queueing(self): success += 1 else: fail += 1 - print 'results:', success, fail self.assertEqual(success, 3) + def test_origin_queueing_timeouts(self): + '''Lets have some requests timeout and ensure that the queue is freed up + ''' + REQUEST_COUNT = 4 + results, results2 = self._send_requests(REQUEST_COUNT, path='queue/timeout') + + success = 0 + fail = 0 + for x in xrange(0, REQUEST_COUNT): + if results[x].status_code == 200: + success += 1 + print 'success', x + else: + fail += 1 + self.assertEqual(fail, 4) + + self.test_origin_queueing() + def test_origin_no_queueing(self): 
'''If the queue is set to 0, all requests past the max immediately fail '''