From 1b9053a95308218a7b25ac2e6aad9cd81658001a Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Thu, 25 Mar 2021 10:19:28 -0600 Subject: [PATCH 01/10] New rate_limit plugin for simple resource limitations This current version has only one limiter, which implements a basic active_in limitation. However, at least for now, these connections that are queued will still count against the active connection metrics and limits that are in the core. I'm open for a refactoring of this, if or when we want to have different types of limiters. Similar to the policies for the cache_promote plugin. --- plugins/Makefile.am | 1 + plugins/experimental/rate_limit/Makefile.inc | 21 +++ plugins/experimental/rate_limit/limiter.cc | 96 +++++++++++++ plugins/experimental/rate_limit/limiter.h | 135 ++++++++++++++++++ plugins/experimental/rate_limit/rate_limit.cc | 125 ++++++++++++++++ 5 files changed, 378 insertions(+) create mode 100644 plugins/experimental/rate_limit/Makefile.inc create mode 100644 plugins/experimental/rate_limit/limiter.cc create mode 100644 plugins/experimental/rate_limit/limiter.h create mode 100644 plugins/experimental/rate_limit/rate_limit.cc diff --git a/plugins/Makefile.am b/plugins/Makefile.am index 52ce5d8bfdc..785e495669f 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -79,6 +79,7 @@ include experimental/memory_profile/Makefile.inc include experimental/metalink/Makefile.inc include experimental/money_trace/Makefile.inc include experimental/mp4/Makefile.inc +include experimental/rate_limit/Makefile.inc include experimental/redo_cache_lookup/Makefile.inc include experimental/remap_stats/Makefile.inc include experimental/slice/Makefile.inc diff --git a/plugins/experimental/rate_limit/Makefile.inc b/plugins/experimental/rate_limit/Makefile.inc new file mode 100644 index 00000000000..250ce134bd2 --- /dev/null +++ b/plugins/experimental/rate_limit/Makefile.inc @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more 
contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pkglib_LTLIBRARIES += experimental/rate_limit/rate_limit.la + +experimental_rate_limit_rate_limit_la_SOURCES = \ + experimental/rate_limit/rate_limit.cc \ + experimental/rate_limit/limiter.cc diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc new file mode 100644 index 00000000000..c6818b469a1 --- /dev/null +++ b/plugins/experimental/rate_limit/limiter.cc @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include "limiter.h" + +bool +RateLimiter::reserve() +{ + TSReleaseAssert(_active <= limit); + TSMutexLock(_active_lock); + if (_active == limit) { + TSMutexUnlock(_active_lock); + return false; + } else { + ++_active; + TSDebug(PLUGIN_NAME, "Active txns == %u", active()); + TSMutexUnlock(_active_lock); + return true; + } +} + +void +RateLimiter::release() +{ + TSMutexLock(_active_lock); + --_active; + TSMutexUnlock(_active_lock); +} + +/////////////////////////////////////////////////////////////////////////////// +// This is the continuation that gets scheduled perdiocally to process the +// deque of waiting TXNs. +// +int +RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) +{ + RateLimiter *limiter = static_cast(TSContDataGet(cont)); + + while (limiter->size() > 0 && limiter->reserve()) { + QueueItem item = limiter->pop(); + + TSDebug(PLUGIN_NAME, "Enabling queued txn"); + // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done + TSHttpTxnHookAdd(std::get<0>(item), TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); + TSHttpTxnReenable(std::get<0>(item), TS_EVENT_HTTP_CONTINUE); + } + + return TS_EVENT_NONE; +} + +/////////////////////////////////////////////////////////////////////////////// +// The main rate limiting continuation. ToDo: Maybe this should be in the +// RateLimiter class (static)? 
+// +int +RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) +{ + RateLimiter *limiter = static_cast(TSContDataGet(cont)); + TSDebug(PLUGIN_NAME, "rate_limit_cont() called with event == %d", static_cast(event)); + + switch (event) { + case TS_EVENT_HTTP_TXN_CLOSE: + TSDebug(PLUGIN_NAME, "Decrementing active count"); + limiter->release(); + TSContDestroy(cont); // We are done with this continuation now + TSHttpTxnReenable(static_cast(edata), TS_EVENT_HTTP_CONTINUE); + return TS_EVENT_CONTINUE; + break; + + case TS_EVENT_HTTP_POST_REMAP: + TSDebug(PLUGIN_NAME, "Delaying request"); + limiter->push(static_cast(edata), cont); + return TS_EVENT_NONE; + break; + + default: + TSDebug(PLUGIN_NAME, "Unknown event"); + TSError("Unknown event in %s", PLUGIN_NAME); + break; + } + return TS_EVENT_NONE; +} diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h new file mode 100644 index 00000000000..647a596b54b --- /dev/null +++ b/plugins/experimental/rate_limit/limiter.h @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include + +#include + +constexpr char const PLUGIN_NAME[] = "rate_limit"; +constexpr unsigned QUEUE_DELAY_TIME = 100; // Examine the queue every 100ms. + +using QueueItem = std::tuple; + +/////////////////////////////////////////////////////////////////////////////// +// Configuration object for a rate limiting remap rule. +// +class RateLimiter +{ +public: + RateLimiter() : _queue_lock(TSMutexCreate()), _active_lock(TSMutexCreate()) {} + + ~RateLimiter() + { + if (_queue_cont) { + TSContDestroy(_queue_cont); + } + TSMutexDestroy(_queue_lock); + TSMutexDestroy(_active_lock); + } + + // Reserve / release a slot from the active connect limits. Reserve will return + // false if we are unable to reserve a slot. + bool reserve(); + void release(); + + // Current size of the active_in connections + unsigned + active() const + { + return _active.load(); + } + + // Current size of the queue + unsigned + size() const + { + return _size.load(); + } + + // Is the queue full (at it's max size)? + bool + full() const + { + return (_size == max_queue); + } + + void + push(TSHttpTxn txnp, TSCont cont) + { + TSMutexLock(_queue_lock); + _queue.push_back(std::make_tuple(txnp, cont)); + ++_size; + TSMutexUnlock(_queue_lock); + } + + QueueItem + pop() + { + QueueItem item{nullptr, nullptr}; + + TSMutexLock(_queue_lock); + if (!_queue.empty()) { + item = std::move(_queue.front()); + _queue.pop_front(); + --_size; + } + TSMutexUnlock(_queue_lock); + + return item; // ToDo: do we see RVO here ? 
+ } + + // Setup the continuous queue processing continuation + void + setupQueueCont() + { + _queue_cont = TSContCreate(queue_process_cont, TSMutexCreate()); + TSReleaseAssert(_queue_cont); + TSContDataSet(_queue_cont, this); + TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME, TS_THREAD_POOL_TASK); + } + + // Create and setup a TXN continuation for a connection that needs to be delayed + void + setupTxnCont(void *ih, TSHttpTxn txnp, TSHttpHookID hook) + { + TSCont cont = TSContCreate(rate_limit_cont, nullptr); + TSReleaseAssert(cont); + + TSContDataSet(cont, ih); + TSHttpTxnHookAdd(txnp, hook, cont); + } + + // These are the configurable portions of this limiter, public so sue me. + unsigned limit = 100; // Arbitrary default, probably should be a required config + unsigned max_queue = 0; // No queue limit, but if sets will give an immediate error if at max + unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) + +private: + static int queue_process_cont(TSCont cont, TSEvent event, void *edata); + static int rate_limit_cont(TSCont cont, TSEvent event, void *edata); + + std::atomic _active = 0; // Current active number of txns. This has to always stay <= limit above + std::atomic _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue. + + TSMutex _queue_lock, _active_lock; // Resource locks + std::deque _queue; // Queue for the pending TXN's + TSCont _queue_cont = nullptr; // Continuation processing the queue periodically. +}; diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc new file mode 100644 index 00000000000..dc406eff4a9 --- /dev/null +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include + +#include "limiter.h" + +/////////////////////////////////////////////////////////////////////////////// +// Setup stuff for the remap plugin +// +TSReturnCode +TSRemapInit(TSRemapInterface *api_info, char *errbuf, int errbuf_size) +{ + if (!api_info) { + strncpy(errbuf, "[tsremap_init] - Invalid TSRemapInterface argument", errbuf_size - 1); + return TS_ERROR; + } + + if (api_info->tsremap_version < TSREMAP_VERSION) { + snprintf(errbuf, errbuf_size - 1, "[TSRemapInit] - Incorrect API version %ld.%ld", api_info->tsremap_version >> 16, + (api_info->tsremap_version & 0xffff)); + return TS_ERROR; + } + + TSDebug(PLUGIN_NAME, "plugin is successfully initialized"); + return TS_SUCCESS; +} + +void +TSRemapDeleteInstance(void *ih) +{ + delete static_cast(ih); +} + +TSReturnCode +TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSED */, int /* errbuf_size ATS_UNUSED */) +{ + static const struct option longopt[] = { + // There's only one limiter right now, so no option for --limiter + {const_cast("limit"), required_argument, nullptr, 'l'}, + {const_cast("queue"), required_argument, nullptr, 'q'}, + {const_cast("error"), required_argument, nullptr, 'e'}, + // EOF + {nullptr, no_argument, nullptr, '\0'}, + 
}; + + RateLimiter *limiter = new RateLimiter(); + TSReleaseAssert(limiter); + // argv contains the "to" and "from" URLs. Skip the first so that the + // second one poses as the program name. + --argc; + ++argv; + + while (true) { + int opt = getopt_long(argc, (char *const *)argv, "", longopt, nullptr); + + switch (opt) { + case 'l': + limiter->limit = strtol(optarg, nullptr, 10); + break; + case 'q': + limiter->max_queue = strtol(optarg, nullptr, 10); + break; + case 'e': + limiter->error = strtol(optarg, nullptr, 10); + break; + } + if (opt == -1) { + break; + } + } + + limiter->setupQueueCont(); + + TSDebug(PLUGIN_NAME, "Added active_in limiter rule (limit=%u, queue=%u, error=%u", limiter->limit, limiter->max_queue, + limiter->error); + *ih = static_cast(limiter); + + return TS_SUCCESS; +} + +/////////////////////////////////////////////////////////////////////////////// +// This is the main "entry" point for the plugin, called for every request. +// +TSRemapStatus +TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri) +{ + RateLimiter *limiter = static_cast(ih); + + if (limiter) { + if (!limiter->reserve()) { + if (limiter->max_queue > 0 && limiter->full()) { + // We are running at limit, and the queue has reached max capacity, give back an error and be done. 
+ TSDebug(PLUGIN_NAME, "Rejecting request, we're at capacity and queue is full"); + TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); + } else { + limiter->setupTxnCont(ih, txnp, TS_HTTP_POST_REMAP_HOOK); + TSDebug(PLUGIN_NAME, "Adding rate limiting hook, we are at capacity"); + } + } else { + limiter->setupTxnCont(ih, txnp, TS_HTTP_TXN_CLOSE_HOOK); + TSDebug(PLUGIN_NAME, "Adding txn-close hook, we're not at capacity"); + } + } + + return TSREMAP_NO_REMAP; +} From f044cb961ba0c23bb65b3058d571a16aae3d72d2 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Thu, 25 Mar 2021 13:41:56 -0600 Subject: [PATCH 02/10] Added documentation --- doc/admin-guide/plugins/index.en.rst | 4 ++ doc/admin-guide/plugins/rate_limit.en.rst | 62 +++++++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 doc/admin-guide/plugins/rate_limit.en.rst diff --git a/doc/admin-guide/plugins/index.en.rst b/doc/admin-guide/plugins/index.en.rst index 6f73b82083f..c9037c9a610 100644 --- a/doc/admin-guide/plugins/index.en.rst +++ b/doc/admin-guide/plugins/index.en.rst @@ -164,6 +164,7 @@ directory of the |TS| source tree. Experimental plugins can be compiled by passi MP4 Multiplexer MySQL Remap + Rate Limit Signed URLs Slice SSL Headers @@ -228,6 +229,9 @@ directory of the |TS| source tree. Experimental plugins can be compiled by passi :doc:`Prefetch ` Pre-fetch objects based on the requested URL path pattern. +:doc:`Rate Limit ` + Simple transaction rate limiting. + :doc:`Remap Purge ` This remap plugin allows the administrator to easily setup remotely controlled ``PURGE`` for the content of an entire remap rule. diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst new file mode 100644 index 00000000000..e3d36fa8591 --- /dev/null +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -0,0 +1,62 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _admin-plugins-rate-limit: + +Rate Limit Plugin +******************** + +The :program:`rate_limit` plugin provides a basic mechanism for limiting how much +traffic a particular service (remap rule) is allowed to handle. Currently, +the only implementation is a limit on how many active client transactions +a service can have. However, it would be easy to refactor this plugin to +allow for adding new limiter policies later on. + +The limit counters and queues are per remap rule only, i.e. there is +(currently) no way to group transaction limits from different remap rules +into a single rate limiter. + +All configuration is done via :file:`remap.config`, and the following options +are available: + +.. program:: rate-limit + +.. option:: --limit + + The maximum number of active connections. + +.. option:: --queue + + When the limit (above) has been reached, all new transactions are placed + on a FIFO queue. This option (optional) sets an upper bound on how many + queued transactions we will allow. When this threshold is reached, all + additional transactions are immediately served with an error message. + +.. option:: --error + + An optional HTTP status error code, to be used together with the + :option:`--queue` option above. The default is `429`. 
+ +Examples +-------- + +This example shows a simple rate limiting of `128` concurrently active client +transactions, with a maximum queue size of `256`. The default of HTTP status +code `429` is used when queue is full. + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=128 @pparam=--queue=256 From 67a0d3032eafb6df71b5f0af5ad32b3072763f03 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Thu, 25 Mar 2021 21:43:20 -0600 Subject: [PATCH 03/10] Change --queue=0 to mean no queue at all --- doc/admin-guide/plugins/rate_limit.en.rst | 15 ++++++++++++++- plugins/experimental/rate_limit/limiter.h | 7 ++++--- plugins/experimental/rate_limit/rate_limit.cc | 2 +- 3 files changed, 19 insertions(+), 5 deletions(-) diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index e3d36fa8591..94395d92dad 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -37,7 +37,7 @@ are available: .. option:: --limit - The maximum number of active connections. + The maximum number of active client transactions. .. option:: --queue @@ -46,6 +46,12 @@ are available: queued transactions we will allow. When this threshold is reached, all additional transactions are immediately served with an error message. + The queue is effectively disabled if this is set to `0`, which implies + that when the transaction limit is reached, we immediately start serving + error responses. + + The default queue size is `UINT_MAX`, which is essentially unlimited. + .. option:: --error An optional HTTP status error code, to be used together with the @@ -60,3 +66,10 @@ code `429` is used when queue is full. 
map http://cdn.example.com/ http://some-server.example.com \ @plugin=rate_limit.so @pparam=--limit=128 @pparam=--queue=256 + + +This example would put a hard transaction (in) limit to 256, with no backoff +queue: + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=0 diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 647a596b54b..30d5d4a30dc 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -17,6 +17,7 @@ */ #include #include +#include #include #include #include @@ -118,9 +119,9 @@ class RateLimiter } // These are the configurable portions of this limiter, public so sue me. - unsigned limit = 100; // Arbitrary default, probably should be a required config - unsigned max_queue = 0; // No queue limit, but if sets will give an immediate error if at max - unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) + unsigned limit = 100; // Arbitrary default, probably should be a required config + unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max + unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) private: static int queue_process_cont(TSCont cont, TSEvent event, void *edata); diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc index dc406eff4a9..8d91aa4525c 100644 --- a/plugins/experimental/rate_limit/rate_limit.cc +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -107,7 +107,7 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri) if (limiter) { if (!limiter->reserve()) { - if (limiter->max_queue > 0 && limiter->full()) { + if (!limiter->max_queue || limiter->full()) { // We are running at limit, and the queue has reached max capacity, give back an error and be done. 
TSDebug(PLUGIN_NAME, "Rejecting request, we're at capacity and queue is full"); TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); From 79116e860daf55ab9b7937287f796adbfbcfc60f Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Fri, 26 Mar 2021 10:17:05 -0600 Subject: [PATCH 04/10] Cancel the queue continuation on config reload --- plugins/experimental/rate_limit/limiter.cc | 36 ++++++----------- plugins/experimental/rate_limit/limiter.h | 45 +++++++++++++++------- 2 files changed, 43 insertions(+), 38 deletions(-) diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index c6818b469a1..32fe6930ad4 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -17,30 +17,6 @@ */ #include "limiter.h" -bool -RateLimiter::reserve() -{ - TSReleaseAssert(_active <= limit); - TSMutexLock(_active_lock); - if (_active == limit) { - TSMutexUnlock(_active_lock); - return false; - } else { - ++_active; - TSDebug(PLUGIN_NAME, "Active txns == %u", active()); - TSMutexUnlock(_active_lock); - return true; - } -} - -void -RateLimiter::release() -{ - TSMutexLock(_active_lock); - --_active; - TSMutexUnlock(_active_lock); -} - /////////////////////////////////////////////////////////////////////////////// // This is the continuation that gets scheduled perdiocally to process the // deque of waiting TXNs. 
@@ -94,3 +70,15 @@ RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) } return TS_EVENT_NONE; } + +/////////////////////////////////////////////////////////////////////////////// +// Setup the continuous queue processing continuation +// +void +RateLimiter::setupQueueCont() +{ + _queue_cont = TSContCreate(queue_process_cont, TSMutexCreate()); + TSReleaseAssert(_queue_cont); + TSContDataSet(_queue_cont, this); + _action = TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME, TS_THREAD_POOL_TASK); +} diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 30d5d4a30dc..309d005434f 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -39,6 +39,9 @@ class RateLimiter ~RateLimiter() { + if (_action) { + TSActionCancel(_action); + } if (_queue_cont) { TSContDestroy(_queue_cont); } @@ -48,8 +51,29 @@ class RateLimiter // Reserve / release a slot from the active connect limits. Reserve will return // false if we are unable to reserve a slot. - bool reserve(); - void release(); + bool + reserve() + { + TSReleaseAssert(_active <= limit); + TSMutexLock(_active_lock); + if (_active == limit) { + TSMutexUnlock(_active_lock); + return false; + } else { + ++_active; + TSDebug(PLUGIN_NAME, "Active txns == %u", active()); + TSMutexUnlock(_active_lock); + return true; + } + } + + void + release() + { + TSMutexLock(_active_lock); + --_active; + TSMutexUnlock(_active_lock); + } // Current size of the active_in connections unsigned @@ -97,17 +121,9 @@ class RateLimiter return item; // ToDo: do we see RVO here ? 
} - // Setup the continuous queue processing continuation - void - setupQueueCont() - { - _queue_cont = TSContCreate(queue_process_cont, TSMutexCreate()); - TSReleaseAssert(_queue_cont); - TSContDataSet(_queue_cont, this); - TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME, TS_THREAD_POOL_TASK); - } + // Continuation creation and scheduling + void setupQueueCont(); - // Create and setup a TXN continuation for a connection that needs to be delayed void setupTxnCont(void *ih, TSHttpTxn txnp, TSHttpHookID hook) { @@ -128,9 +144,10 @@ class RateLimiter static int rate_limit_cont(TSCont cont, TSEvent event, void *edata); std::atomic _active = 0; // Current active number of txns. This has to always stay <= limit above - std::atomic _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue. + std::atomic _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue TSMutex _queue_lock, _active_lock; // Resource locks std::deque _queue; // Queue for the pending TXN's - TSCont _queue_cont = nullptr; // Continuation processing the queue periodically. 
+ TSCont _queue_cont = nullptr; // Continuation processing the queue periodically + TSAction _action = nullptr; // The action associated with the queue continuation, needed to shut it down }; From e546f8aa35a9d7d3939cfd8153dbdbdfb605fdc3 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Fri, 26 Mar 2021 16:26:04 -0600 Subject: [PATCH 05/10] Add optional HTTP header for capturing delay metrics --- doc/admin-guide/plugins/rate_limit.en.rst | 13 ++++++++- plugins/experimental/rate_limit/limiter.cc | 28 +++++++++++++++++++ plugins/experimental/rate_limit/limiter.h | 13 +++++++-- plugins/experimental/rate_limit/rate_limit.cc | 5 +++- 4 files changed, 54 insertions(+), 5 deletions(-) diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index 94395d92dad..113ee4cf270 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -57,6 +57,16 @@ are available: An optional HTTP status error code, to be used together with the :option:`--queue` option above. The default is `429`. +.. option:: --header + + This is an optional HTTP header name, which will be added to the client + request header IF the transaction was delayed (queued). This can be useful + to for example log the delays for later analysis. + + It is recommended that an `@` header is used here, e.g. `@RateLimit-Delay`, + since this header will not leave the ATS server instance. The value here is + appended to the header should one already exist. 
+ Examples -------- @@ -72,4 +82,5 @@ This example would put a hard transaction (in) limit to 256, with no backoff queue: map http://cdn.example.com/ http://some-server.example.com \ - @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=0 + @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=0 \ + @pparam=--header=@RateLimit-Delay diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 32fe6930ad4..30ec633689b 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -25,10 +25,13 @@ int RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) { RateLimiter *limiter = static_cast(TSContDataGet(cont)); + QueueTime now = std::chrono::system_clock::now(); // Only do this once per "loop" while (limiter->size() > 0 && limiter->reserve()) { QueueItem item = limiter->pop(); + long delay = std::chrono::duration_cast(now - std::get<2>(item)).count(); + limiter->delayHeader(std::get<0>(item), delay, limiter->header); TSDebug(PLUGIN_NAME, "Enabling queued txn"); // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done TSHttpTxnHookAdd(std::get<0>(item), TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); @@ -82,3 +85,28 @@ RateLimiter::setupQueueCont() TSContDataSet(_queue_cont, this); _action = TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME, TS_THREAD_POOL_TASK); } + +/////////////////////////////////////////////////////////////////////////////// +// Add a header with the delay imposed on this transaction. This can be used +// for logging, and other types of metrics. 
+// +void +RateLimiter::delayHeader(TSHttpTxn txnp, long delay, const std::string &header) const +{ + if (header.size() > 0) { + TSMLoc hdr_loc = nullptr; + TSMBuffer bufp = nullptr; + TSMLoc field_loc = nullptr; + + if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, header.c_str(), header.size(), &field_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, delay)) { + TSDebug(PLUGIN_NAME, "The TXN was delayed %ldms", delay); + TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + } + } +} diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 309d005434f..e8dc63f67a2 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -21,13 +21,15 @@ #include #include #include +#include #include constexpr char const PLUGIN_NAME[] = "rate_limit"; constexpr unsigned QUEUE_DELAY_TIME = 100; // Examine the queue every 100ms. -using QueueItem = std::tuple; +using QueueTime = std::chrono::time_point; +using QueueItem = std::tuple; /////////////////////////////////////////////////////////////////////////////// // Configuration object for a rate limiting remap rule. @@ -99,8 +101,10 @@ class RateLimiter void push(TSHttpTxn txnp, TSCont cont) { + QueueTime now = std::chrono::system_clock::now(); + TSMutexLock(_queue_lock); - _queue.push_back(std::make_tuple(txnp, cont)); + _queue.push_back(std::make_tuple(txnp, cont, now)); ++_size; TSMutexUnlock(_queue_lock); } @@ -108,7 +112,7 @@ class RateLimiter QueueItem pop() { - QueueItem item{nullptr, nullptr}; + QueueItem item; TSMutexLock(_queue_lock); if (!_queue.empty()) { @@ -121,6 +125,8 @@ class RateLimiter return item; // ToDo: do we see RVO here ? 
} + void delayHeader(TSHttpTxn txpn, long delay, const std::string &header) const; + // Continuation creation and scheduling void setupQueueCont(); @@ -138,6 +144,7 @@ class RateLimiter unsigned limit = 100; // Arbitrary default, probably should be a required config unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) + std::string header; // Header to put the latency metrics in, e.g. @RateLimit-Delay private: static int queue_process_cont(TSCont cont, TSEvent event, void *edata); diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc index 8d91aa4525c..c97947c2a95 100644 --- a/plugins/experimental/rate_limit/rate_limit.cc +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -54,10 +54,10 @@ TSReturnCode TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSED */, int /* errbuf_size ATS_UNUSED */) { static const struct option longopt[] = { - // There's only one limiter right now, so no option for --limiter {const_cast("limit"), required_argument, nullptr, 'l'}, {const_cast("queue"), required_argument, nullptr, 'q'}, {const_cast("error"), required_argument, nullptr, 'e'}, + {const_cast("header"), required_argument, nullptr, 'h'}, // EOF {nullptr, no_argument, nullptr, '\0'}, }; @@ -82,6 +82,9 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE case 'e': limiter->error = strtol(optarg, nullptr, 10); break; + case 'h': + limiter->header = optarg; + break; } if (opt == -1) { break; From 968b08c4a46a3ee8e13dfad0af5daaae46b00308 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Fri, 26 Mar 2021 16:50:03 -0600 Subject: [PATCH 06/10] Add support for an optional Retry-After header --- doc/admin-guide/plugins/rate_limit.en.rst | 26 +++++++++++++- plugins/experimental/rate_limit/limiter.cc | 35 +++++++++++++++++-- 
plugins/experimental/rate_limit/limiter.h | 2 ++ plugins/experimental/rate_limit/rate_limit.cc | 7 +++- 4 files changed, 66 insertions(+), 4 deletions(-) diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index 113ee4cf270..837a1a7ff81 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -57,6 +57,11 @@ are available: An optional HTTP status error code, to be used together with the :option:`--queue` option above. The default is `429`. +.. option:: --retry + + An optional retry-after value, which if set will cause rejected (e.g. `429`) + responses to also include a header `Retry-After`. + .. option:: --header This is an optional HTTP header name, which will be added to the client @@ -79,8 +84,27 @@ code `429` is used when queue is full. This example would put a hard transaction (in) limit to 256, with no backoff -queue: +queue, and add a header with the transaction delay if it was queued: map http://cdn.example.com/ http://some-server.example.com \ @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=0 \ @pparam=--header=@RateLimit-Delay + +This final example will limit the active transaction, queue size, and also +add a `Retry-After` header once the queue is full and we return a `429` error: + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=1024 \ + @pparam=--retry=3600 @pparam=--header=@RateLimit-Delay + +In this case, the response would look like this when the queue is full: + + HTTP/1.1 429 Too Many Requests + Date: Fri, 26 Mar 2021 22:42:38 GMT + Connection: keep-alive + Server: ATS/10.0.0 + Cache-Control: no-store + Content-Type: text/html + Content-Language: en + Retry-After: 3600 + Content-Length: 207 diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 30ec633689b..97d618f4ae4 100644 --- 
a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -49,7 +49,6 @@ int RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) { RateLimiter *limiter = static_cast(TSContDataGet(cont)); - TSDebug(PLUGIN_NAME, "rate_limit_cont() called with event == %d", static_cast(event)); switch (event) { case TS_EVENT_HTTP_TXN_CLOSE: @@ -66,8 +65,15 @@ RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) return TS_EVENT_NONE; break; + case TS_EVENT_HTTP_SEND_RESPONSE_HDR: // This is only applicable when we set an error in remap + limiter->retryAfter(static_cast(edata), limiter->retry); + TSContDestroy(cont); // We are done with this continuation now + TSHttpTxnReenable(static_cast(edata), TS_EVENT_HTTP_CONTINUE); + return TS_EVENT_CONTINUE; + break; + default: - TSDebug(PLUGIN_NAME, "Unknown event"); + TSDebug(PLUGIN_NAME, "Unknown event %d", static_cast(event)); TSError("Unknown event in %s", PLUGIN_NAME); break; } @@ -110,3 +116,28 @@ RateLimiter::delayHeader(TSHttpTxn txnp, long delay, const std::string &header) } } } + +/////////////////////////////////////////////////////////////////////////////// +// Add a Retry-After header to the client response, with the configured retry +// value. This is only done when a retry value is set (retry > 0).
+// +void +RateLimiter::retryAfter(TSHttpTxn txnp, unsigned retry) const +{ + if (retry > 0) { + TSMLoc hdr_loc = nullptr; + TSMBuffer bufp = nullptr; + TSMLoc field_loc = nullptr; + + if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, "Retry-After", 12, &field_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, retry)) { + TSDebug(PLUGIN_NAME, "Added a Retry-After: %u", retry); + TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + } + } +} diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index e8dc63f67a2..7e38885a2b6 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -126,6 +126,7 @@ class RateLimiter } void delayHeader(TSHttpTxn txpn, long delay, const std::string &header) const; + void retryAfter(TSHttpTxn txpn, unsigned after) const; // Continuation creation and scheduling void setupQueueCont(); @@ -144,6 +145,7 @@ class RateLimiter unsigned limit = 100; // Arbitrary default, probably should be a required config unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) + unsigned retry = 0; // If > 0, we will also send a Retry-After: header with this retry value std::string header; // Header to put the latency metrics in, e.g. 
@RateLimit-Delay private: diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc index c97947c2a95..9967a8f34cf 100644 --- a/plugins/experimental/rate_limit/rate_limit.cc +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -57,6 +57,7 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE {const_cast("limit"), required_argument, nullptr, 'l'}, {const_cast("queue"), required_argument, nullptr, 'q'}, {const_cast("error"), required_argument, nullptr, 'e'}, + {const_cast("retry"), required_argument, nullptr, 'r'}, {const_cast("header"), required_argument, nullptr, 'h'}, // EOF {nullptr, no_argument, nullptr, '\0'}, @@ -82,6 +83,9 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE case 'e': limiter->error = strtol(optarg, nullptr, 10); break; + case 'r': + limiter->retry = strtol(optarg, nullptr, 10); + break; case 'h': limiter->header = optarg; break; @@ -112,8 +116,9 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri) if (!limiter->reserve()) { if (!limiter->max_queue || limiter->full()) { // We are running at limit, and the queue has reached max capacity, give back an error and be done. 
- TSDebug(PLUGIN_NAME, "Rejecting request, we're at capacity and queue is full"); TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); + limiter->setupTxnCont(ih, txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK); + TSDebug(PLUGIN_NAME, "Rejecting request, we're at capacity and queue is full"); } else { limiter->setupTxnCont(ih, txnp, TS_HTTP_POST_REMAP_HOOK); TSDebug(PLUGIN_NAME, "Adding rate limiting hook, we are at capacity"); From 6d0665be3fc5b471cc7534fa5be52fec13b944a2 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Sat, 27 Mar 2021 11:19:48 -0600 Subject: [PATCH 07/10] Added --maxage option, to error out old txns --- doc/admin-guide/plugins/rate_limit.en.rst | 13 ++++-- plugins/experimental/rate_limit/limiter.cc | 38 +++++++++++----- plugins/experimental/rate_limit/limiter.h | 44 ++++++++++++++----- plugins/experimental/rate_limit/rate_limit.cc | 4 ++ 4 files changed, 72 insertions(+), 27 deletions(-) diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index 837a1a7ff81..2a7953617c4 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -65,12 +65,17 @@ are available: .. option:: --header This is an optional HTTP header name, which will be added to the client - request header IF the transaction was delayed (queued). This can be useful - to for example log the delays for later analysis. + request header IF the transaction was delayed (queued). The value of the + header is the delay, in milliseconds. This can be useful to for example + log the delays for later analysis. It is recommended that an `@` header is used here, e.g. `@RateLimit-Delay`, - since this header will not leave the ATS server instance. The value here is - appended to the header should one already exist. + since this header will not leave the ATS server instance. + +.. option:: --maxage + + An optional `max-age` for how long a transaction can sit in the delay queue. 
+ The value (default 0) is the age in seconds. Examples -------- diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 97d618f4ae4..500ddf01756 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -27,23 +27,42 @@ RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) RateLimiter *limiter = static_cast(TSContDataGet(cont)); QueueTime now = std::chrono::system_clock::now(); // Only do this once per "loop" + // Try to enable some queued txns (if any) if there are slots available while (limiter->size() > 0 && limiter->reserve()) { QueueItem item = limiter->pop(); + TSHttpTxn txnp = std::get<0>(item); long delay = std::chrono::duration_cast(now - std::get<2>(item)).count(); - limiter->delayHeader(std::get<0>(item), delay, limiter->header); - TSDebug(PLUGIN_NAME, "Enabling queued txn"); + limiter->delayHeader(txnp, delay); + TSDebug(PLUGIN_NAME, "Enabling queued txn after %ldms", delay); // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done - TSHttpTxnHookAdd(std::get<0>(item), TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); - TSHttpTxnReenable(std::get<0>(item), TS_EVENT_HTTP_CONTINUE); + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + } + + // Kill any queued txns if they are too old + if (limiter->max_age && limiter->size() > 0) { + now = std::chrono::system_clock::now(); // Update the "now", for some extra accuracy + + while (limiter->size() > 0 && limiter->hasOldTxn(now)) { + // The oldest object on the queue is too old on the queue, so "kill" it. 
+ QueueItem item = limiter->pop(); + TSHttpTxn txnp = std::get<0>(item); + long age = std::chrono::duration_cast(now - std::get<2>(item)).count(); + + limiter->delayHeader(txnp, age); + TSDebug(PLUGIN_NAME, "Queued TXN is too old (%ldms), erroring out", age); + TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); + TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, std::get<1>(item)); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); + } } return TS_EVENT_NONE; } /////////////////////////////////////////////////////////////////////////////// -// The main rate limiting continuation. ToDo: Maybe this should be in the -// RateLimiter class (static)? +// The main rate limiting continuation. // int RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) @@ -52,7 +71,6 @@ RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) switch (event) { case TS_EVENT_HTTP_TXN_CLOSE: - TSDebug(PLUGIN_NAME, "Decrementing active count"); limiter->release(); TSContDestroy(cont); // We are done with this continuation now TSHttpTxnReenable(static_cast(edata), TS_EVENT_HTTP_CONTINUE); @@ -60,7 +78,6 @@ RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) break; case TS_EVENT_HTTP_POST_REMAP: - TSDebug(PLUGIN_NAME, "Delaying request"); limiter->push(static_cast(edata), cont); return TS_EVENT_NONE; break; @@ -97,7 +114,7 @@ RateLimiter::setupQueueCont() // for logging, and other types of metrics. 
// void -RateLimiter::delayHeader(TSHttpTxn txnp, long delay, const std::string &header) const +RateLimiter::delayHeader(TSHttpTxn txnp, long delay) const { if (header.size() > 0) { TSMLoc hdr_loc = nullptr; @@ -107,7 +124,6 @@ RateLimiter::delayHeader(TSHttpTxn txnp, long delay, const std::string &header) if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc)) { if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, header.c_str(), header.size(), &field_loc)) { if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, delay)) { - TSDebug(PLUGIN_NAME, "The TXN was delayed %ldms", delay); TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); } TSHandleMLocRelease(bufp, hdr_loc, field_loc); @@ -130,7 +146,7 @@ RateLimiter::retryAfter(TSHttpTxn txnp, unsigned retry) const TSMLoc field_loc = nullptr; if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc)) { - if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, "Retry-After", 12, &field_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, "Retry-After", 11, &field_loc)) { if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, retry)) { TSDebug(PLUGIN_NAME, "Added a Retry-After: %u", retry); TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 7e38885a2b6..6125c40068a 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -19,9 +19,10 @@ #include #include #include -#include #include #include +#include +#include #include @@ -58,14 +59,14 @@ class RateLimiter { TSReleaseAssert(_active <= limit); TSMutexLock(_active_lock); - if (_active == limit) { - TSMutexUnlock(_active_lock); - return false; - } else { + if (_active < limit) { ++_active; - TSDebug(PLUGIN_NAME, "Active txns == %u", active()); - TSMutexUnlock(_active_lock); + TSMutexUnlock(_active_lock); // Reduce the critical section, release early + 
TSDebug(PLUGIN_NAME, "Reserving a slot, active txns == %u", active()); return true; + } else { + TSMutexUnlock(_active_lock); + return false; } } @@ -75,6 +76,7 @@ class RateLimiter TSMutexLock(_active_lock); --_active; TSMutexUnlock(_active_lock); + TSDebug(PLUGIN_NAME, "Releasing a slot, active txns == %u", active()); } // Current size of the active_in connections @@ -104,7 +106,7 @@ class RateLimiter QueueTime now = std::chrono::system_clock::now(); TSMutexLock(_queue_lock); - _queue.push_back(std::make_tuple(txnp, cont, now)); + _queue.push_front(std::make_tuple(txnp, cont, now)); ++_size; TSMutexUnlock(_queue_lock); } @@ -116,16 +118,33 @@ class RateLimiter TSMutexLock(_queue_lock); if (!_queue.empty()) { - item = std::move(_queue.front()); - _queue.pop_front(); + item = std::move(_queue.back()); + _queue.pop_back(); --_size; } TSMutexUnlock(_queue_lock); - return item; // ToDo: do we see RVO here ? + return item; + } + + bool + hasOldTxn(QueueTime now) const + { + TSMutexLock(_queue_lock); + if (!_queue.empty()) { + QueueItem item = _queue.back(); + TSMutexUnlock(_queue_lock); // A little ugly but this reduces the critical section for the lock a little bit. + + long age = std::chrono::duration_cast(now - std::get<2>(item)).count(); + + return (age >= max_age); + } else { + TSMutexUnlock(_queue_lock); + return false; + } } - void delayHeader(TSHttpTxn txpn, long delay, const std::string &header) const; + void delayHeader(TSHttpTxn txpn, long delay) const; void retryAfter(TSHttpTxn txpn, unsigned after) const; // Continuation creation and scheduling @@ -144,6 +163,7 @@ class RateLimiter // These are the configurable portions of this limiter, public so sue me. 
unsigned limit = 100; // Arbitrary default, probably should be a required config unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max + unsigned max_age = 0; // Max age (ms) in the queue, at which point we send an error unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) unsigned retry = 0; // If > 0, we will also send a Retry-After: header with this retry value std::string header; // Header to put the latency metrics in, e.g. @RateLimit-Delay diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc index 9967a8f34cf..1e3dab78225 100644 --- a/plugins/experimental/rate_limit/rate_limit.cc +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -59,6 +59,7 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE {const_cast("error"), required_argument, nullptr, 'e'}, {const_cast("retry"), required_argument, nullptr, 'r'}, {const_cast("header"), required_argument, nullptr, 'h'}, + {const_cast("maxage"), required_argument, nullptr, 'm'}, // EOF {nullptr, no_argument, nullptr, '\0'}, }; @@ -86,6 +87,9 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE case 'r': limiter->retry = strtol(optarg, nullptr, 10); break; + case 'm': + limiter->max_age = 1000 * strtol(optarg, nullptr, 10); + break; case 'h': limiter->header = optarg; break; From b7b2acdfa334de6b6d2abc814964b4198e38709d Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Tue, 30 Mar 2021 11:24:02 -0600 Subject: [PATCH 08/10] Better use of std::chrono, now that TSHRtime is dead --- doc/admin-guide/plugins/rate_limit.en.rst | 2 +- plugins/experimental/rate_limit/limiter.cc | 22 +++++++++---------- plugins/experimental/rate_limit/limiter.h | 16 +++++++------- plugins/experimental/rate_limit/rate_limit.cc | 2 +- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git 
a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index 2a7953617c4..9562bdf0999 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -75,7 +75,7 @@ are available: .. option:: --maxage An optional `max-age` for how long a transaction can sit in the delay queue. - The value (default 0) is the age in seconds. + The value (default 0) is the age in milliseconds. Examples -------- diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 500ddf01756..5e96e651ee8 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -29,29 +29,29 @@ RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) // Try to enable some queued txns (if any) if there are slots available while (limiter->size() > 0 && limiter->reserve()) { - QueueItem item = limiter->pop(); - TSHttpTxn txnp = std::get<0>(item); - long delay = std::chrono::duration_cast(now - std::get<2>(item)).count(); + QueueItem item = limiter->pop(); + TSHttpTxn txnp = std::get<0>(item); + std::chrono::microseconds delay = std::chrono::duration_cast(now - std::get<2>(item)); limiter->delayHeader(txnp, delay); - TSDebug(PLUGIN_NAME, "Enabling queued txn after %ldms", delay); + TSDebug(PLUGIN_NAME, "Enabling queued txn after %ldms", static_cast(delay.count())); // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); } // Kill any queued txns if they are too old - if (limiter->max_age && limiter->size() > 0) { + if (limiter->max_age > std::chrono::milliseconds::zero() && limiter->size() > 0) { now = std::chrono::system_clock::now(); // Update the "now", for some extra accuracy while (limiter->size() > 0 && limiter->hasOldTxn(now)) { // The oldest object on the queue is 
too old on the queue, so "kill" it. - QueueItem item = limiter->pop(); - TSHttpTxn txnp = std::get<0>(item); - long age = std::chrono::duration_cast(now - std::get<2>(item)).count(); + QueueItem item = limiter->pop(); + TSHttpTxn txnp = std::get<0>(item); + std::chrono::milliseconds age = std::chrono::duration_cast(now - std::get<2>(item)); limiter->delayHeader(txnp, age); - TSDebug(PLUGIN_NAME, "Queued TXN is too old (%ldms), erroring out", age); + TSDebug(PLUGIN_NAME, "Queued TXN is too old (%ldms), erroring out", static_cast(age.count())); TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, std::get<1>(item)); TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); @@ -114,7 +114,7 @@ RateLimiter::setupQueueCont() // for logging, and other types of metrics. // void -RateLimiter::delayHeader(TSHttpTxn txnp, long delay) const +RateLimiter::delayHeader(TSHttpTxn txnp, std::chrono::microseconds delay) const { if (header.size() > 0) { TSMLoc hdr_loc = nullptr; @@ -123,7 +123,7 @@ RateLimiter::delayHeader(TSHttpTxn txnp, long delay) const if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc)) { if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, header.c_str(), header.size(), &field_loc)) { - if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, delay)) { + if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, static_cast(delay.count()))) { TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); } TSHandleMLocRelease(bufp, hdr_loc, field_loc); diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 6125c40068a..13cd7496666 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -135,7 +135,7 @@ class RateLimiter QueueItem item = _queue.back(); TSMutexUnlock(_queue_lock); // A little ugly but this reduces the critical section for the lock a little bit. 
- long age = std::chrono::duration_cast(now - std::get<2>(item)).count(); + std::chrono::milliseconds age = std::chrono::duration_cast(now - std::get<2>(item)); return (age >= max_age); } else { @@ -144,7 +144,7 @@ class RateLimiter } } - void delayHeader(TSHttpTxn txpn, long delay) const; + void delayHeader(TSHttpTxn txpn, std::chrono::microseconds delay) const; void retryAfter(TSHttpTxn txpn, unsigned after) const; // Continuation creation and scheduling @@ -161,12 +161,12 @@ class RateLimiter } // These are the configurable portions of this limiter, public so sue me. - unsigned limit = 100; // Arbitrary default, probably should be a required config - unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max - unsigned max_age = 0; // Max age (ms) in the queue, at which point we send an error - unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) - unsigned retry = 0; // If > 0, we will also send a Retry-After: header with this retry value - std::string header; // Header to put the latency metrics in, e.g. @RateLimit-Delay + unsigned limit = 100; // Arbitrary default, probably should be a required config + unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max + unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. queue full) + unsigned retry = 0; // If > 0, we will also send a Retry-After: header with this retry value + std::chrono::milliseconds max_age = std::chrono::milliseconds::zero(); // Max age (ms) in the queue + std::string header; // Header to put the latency metrics in, e.g. 
@RateLimit-Delay private: static int queue_process_cont(TSCont cont, TSEvent event, void *edata); diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc index 1e3dab78225..37d8c65b5b1 100644 --- a/plugins/experimental/rate_limit/rate_limit.cc +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -88,7 +88,7 @@ TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSE limiter->retry = strtol(optarg, nullptr, 10); break; case 'm': - limiter->max_age = 1000 * strtol(optarg, nullptr, 10); + limiter->max_age = std::chrono::milliseconds(strtol(optarg, nullptr, 10)); break; case 'h': limiter->header = optarg; From c0791796e76f6ce536428f75022a04b6ce44c173 Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Tue, 6 Apr 2021 16:36:27 -0600 Subject: [PATCH 09/10] Fixed spelling errors --- doc/admin-guide/plugins/rate_limit.en.rst | 2 +- plugins/experimental/rate_limit/limiter.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst index 9562bdf0999..aea3b466a68 100644 --- a/doc/admin-guide/plugins/rate_limit.en.rst +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -80,7 +80,7 @@ are available: Examples -------- -This example shows a simple rate limiting of `128` concurently active client +This example shows a simple rate limiting of `128` concurrently active client transactions, with a maximum queue size of `256`. The default of HTTP status code `429` is used when queue is full. 
diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 5e96e651ee8..0fe868f48bb 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -18,7 +18,7 @@ #include "limiter.h" /////////////////////////////////////////////////////////////////////////////// -// This is the continuation that gets scheduled perdiocally to process the +// This is the continuation that gets scheduled periodically to process the // deque of waiting TXNs. // int From 390d958e4be2816ccb193dd106a6ac3dc6e6f99f Mon Sep 17 00:00:00 2001 From: Leif Hedstrom Date: Fri, 9 Apr 2021 15:11:41 -0600 Subject: [PATCH 10/10] Addresses Brian's review comments --- plugins/experimental/rate_limit/limiter.cc | 16 +++++++--------- plugins/experimental/rate_limit/limiter.h | 4 ++-- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc index 0fe868f48bb..5960e9719ef 100644 --- a/plugins/experimental/rate_limit/limiter.cc +++ b/plugins/experimental/rate_limit/limiter.cc @@ -29,14 +29,13 @@ RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) // Try to enable some queued txns (if any) if there are slots available while (limiter->size() > 0 && limiter->reserve()) { - QueueItem item = limiter->pop(); - TSHttpTxn txnp = std::get<0>(item); - std::chrono::microseconds delay = std::chrono::duration_cast(now - std::get<2>(item)); + auto [txnp, contp, start_time] = limiter->pop(); + std::chrono::microseconds delay = std::chrono::duration_cast(now - start_time); limiter->delayHeader(txnp, delay); TSDebug(PLUGIN_NAME, "Enabling queued txn after %ldms", static_cast(delay.count())); // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done - TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, std::get<1>(item)); + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, contp); 
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); } @@ -46,14 +45,13 @@ RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) while (limiter->size() > 0 && limiter->hasOldTxn(now)) { // The oldest object on the queue is too old on the queue, so "kill" it. - QueueItem item = limiter->pop(); - TSHttpTxn txnp = std::get<0>(item); - std::chrono::milliseconds age = std::chrono::duration_cast(now - std::get<2>(item)); + auto [txnp, contp, start_time] = limiter->pop(); + std::chrono::milliseconds age = std::chrono::duration_cast(now - start_time); limiter->delayHeader(txnp, age); TSDebug(PLUGIN_NAME, "Queued TXN is too old (%ldms), erroring out", static_cast(age.count())); TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); - TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, std::get<1>(item)); + TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, contp); TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); } } @@ -106,7 +104,7 @@ RateLimiter::setupQueueCont() _queue_cont = TSContCreate(queue_process_cont, TSMutexCreate()); TSReleaseAssert(_queue_cont); TSContDataSet(_queue_cont, this); - _action = TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME, TS_THREAD_POOL_TASK); + _action = TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME.count(), TS_THREAD_POOL_TASK); } /////////////////////////////////////////////////////////////////////////////// diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h index 13cd7496666..947f6a9f3ae 100644 --- a/plugins/experimental/rate_limit/limiter.h +++ b/plugins/experimental/rate_limit/limiter.h @@ -26,8 +26,8 @@ #include -constexpr char const PLUGIN_NAME[] = "rate_limit"; -constexpr unsigned QUEUE_DELAY_TIME = 100; // Examine the queue every 100ms. 
+constexpr char const PLUGIN_NAME[] = "rate_limit"; +constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{100}; // Examine the queue every 100ms using QueueTime = std::chrono::time_point; using QueueItem = std::tuple;