diff --git a/doc/admin-guide/plugins/index.en.rst b/doc/admin-guide/plugins/index.en.rst index 6f73b82083f..c9037c9a610 100644 --- a/doc/admin-guide/plugins/index.en.rst +++ b/doc/admin-guide/plugins/index.en.rst @@ -164,6 +164,7 @@ directory of the |TS| source tree. Experimental plugins can be compiled by passi MP4 Multiplexer MySQL Remap + Rate Limit <rate_limit.en> Signed URLs Slice SSL Headers @@ -228,6 +229,9 @@ directory of the |TS| source tree. Experimental plugins can be compiled by passi :doc:`Prefetch ` Pre-fetch objects based on the requested URL path pattern. +:doc:`Rate Limit <rate_limit.en>` + Simple transaction rate limiting. + :doc:`Remap Purge ` This remap plugin allows the administrator to easily setup remotely controlled ``PURGE`` for the content of an entire remap rule. diff --git a/doc/admin-guide/plugins/rate_limit.en.rst b/doc/admin-guide/plugins/rate_limit.en.rst new file mode 100644 index 00000000000..aea3b466a68 --- /dev/null +++ b/doc/admin-guide/plugins/rate_limit.en.rst @@ -0,0 +1,115 @@ +.. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _admin-plugins-rate-limit: + +Rate Limit Plugin +******************** + +The :program:`rate_limit` plugin provides a basic mechanism for limiting how much +traffic a particular service (remap rule) is allowed to do. 
Currently, +the only implementation is a limit on how many active client transactions +a service can have. However, it would be easy to refactor this plugin to +allow for adding new limiter policies later on. + +The limit counters and queues are per remap rule only, i.e. there is +(currently) no way to group transaction limits from different remap rules +into a single rate limiter. + +All configuration is done via :file:`remap.config`, and the following options +are available: + +.. program:: rate-limit + +.. option:: --limit + + The maximum number of active client transactions. + +.. option:: --queue + + When the limit (above) has been reached, all new transactions are placed + on a FIFO queue. This option (optional) sets an upper bound on how many + queued transactions we will allow. When this threshold is reached, all + additional transactions are immediately served with an error message. + + The queue is effectively disabled if this is set to `0`, which implies + that when the transaction limit is reached, we immediately start serving + error responses. + + The default queue size is `UINT_MAX`, which is essentially unlimited. + +.. option:: --error + + An optional HTTP status error code, to be used together with the + :option:`--queue` option above. The default is `429`. + +.. option:: --retry + + An optional retry-after value, which if set will cause rejected (e.g. `429`) + responses to also include a header `Retry-After`. + +.. option:: --header + + This is an optional HTTP header name, which will be added to the client + request header IF the transaction was delayed (queued). The value of the + header is the delay, in milliseconds. This can be useful, for example, to + log the delays for later analysis. + + It is recommended that an `@` header is used here, e.g. `@RateLimit-Delay`, + since this header will not leave the ATS server instance. + +.. option:: --maxage + + An optional `max-age` for how long a transaction can sit in the delay queue. 
+ The value is the age in milliseconds. The default, `0`, disables the age + check, so queued transactions are never expired. + +Examples +-------- + +This example shows a simple rate limiting of `128` concurrently active client +transactions, with a maximum queue size of `256`. The default HTTP status +code `429` is used when the queue is full. + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=128 @pparam=--queue=256 + + +This example would put a hard transaction (in) limit to 256, with no backoff +queue, and add a header with the transaction delay if it was queued: + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=0 \ + @pparam=--header=@RateLimit-Delay + +This final example will limit the active transactions and the queue size, and also +add a `Retry-After` header once the queue is full and we return a `429` error: + + map http://cdn.example.com/ http://some-server.example.com \ + @plugin=rate_limit.so @pparam=--limit=256 @pparam=--queue=1024 \ + @pparam=--retry=3600 @pparam=--header=@RateLimit-Delay + +In this case, the response would look like this when the queue is full: + + HTTP/1.1 429 Too Many Requests + Date: Fri, 26 Mar 2021 22:42:38 GMT + Connection: keep-alive + Server: ATS/10.0.0 + Cache-Control: no-store + Content-Type: text/html + Content-Language: en + Retry-After: 3600 + Content-Length: 207 diff --git a/plugins/Makefile.am b/plugins/Makefile.am index 52ce5d8bfdc..785e495669f 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -79,6 +79,7 @@ include experimental/memory_profile/Makefile.inc include experimental/metalink/Makefile.inc include experimental/money_trace/Makefile.inc include experimental/mp4/Makefile.inc +include experimental/rate_limit/Makefile.inc include experimental/redo_cache_lookup/Makefile.inc include experimental/remap_stats/Makefile.inc include experimental/slice/Makefile.inc diff --git a/plugins/experimental/rate_limit/Makefile.inc 
b/plugins/experimental/rate_limit/Makefile.inc new file mode 100644 index 00000000000..250ce134bd2 --- /dev/null +++ b/plugins/experimental/rate_limit/Makefile.inc @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +pkglib_LTLIBRARIES += experimental/rate_limit/rate_limit.la + +experimental_rate_limit_rate_limit_la_SOURCES = \ + experimental/rate_limit/rate_limit.cc \ + experimental/rate_limit/limiter.cc diff --git a/plugins/experimental/rate_limit/limiter.cc b/plugins/experimental/rate_limit/limiter.cc new file mode 100644 index 00000000000..5960e9719ef --- /dev/null +++ b/plugins/experimental/rate_limit/limiter.cc @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "limiter.h" + +/////////////////////////////////////////////////////////////////////////////// +// This is the continuation that gets scheduled periodically to process the +// deque of waiting TXNs. +// +int +RateLimiter::queue_process_cont(TSCont cont, TSEvent event, void *edata) +{ + RateLimiter *limiter = static_cast(TSContDataGet(cont)); + QueueTime now = std::chrono::system_clock::now(); // Only do this once per "loop" + + // Try to enable some queued txns (if any) if there are slots available + while (limiter->size() > 0 && limiter->reserve()) { + auto [txnp, contp, start_time] = limiter->pop(); + std::chrono::microseconds delay = std::chrono::duration_cast(now - start_time); + + limiter->delayHeader(txnp, delay); + TSDebug(PLUGIN_NAME, "Enabling queued txn after %ldms", static_cast(delay.count())); + // Since this was a delayed transaction, we need to add the TXN_CLOSE hook to free the slot when done + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, contp); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + } + + // Kill any queued txns if they are too old + if (limiter->max_age > std::chrono::milliseconds::zero() && limiter->size() > 0) { + now = std::chrono::system_clock::now(); // Update the "now", for some extra accuracy + + while (limiter->size() > 0 && limiter->hasOldTxn(now)) { + // The oldest object on the queue is too old on the queue, so "kill" it. 
+ auto [txnp, contp, start_time] = limiter->pop(); + std::chrono::milliseconds age = std::chrono::duration_cast(now - start_time); + + limiter->delayHeader(txnp, age); + TSDebug(PLUGIN_NAME, "Queued TXN is too old (%ldms), erroring out", static_cast(age.count())); + TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); + TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, contp); + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_ERROR); + } + } + + return TS_EVENT_NONE; +} + +/////////////////////////////////////////////////////////////////////////////// +// The main rate limiting continuation. +// +int +RateLimiter::rate_limit_cont(TSCont cont, TSEvent event, void *edata) +{ + RateLimiter *limiter = static_cast(TSContDataGet(cont)); + + switch (event) { + case TS_EVENT_HTTP_TXN_CLOSE: + limiter->release(); + TSContDestroy(cont); // We are done with this continuation now + TSHttpTxnReenable(static_cast(edata), TS_EVENT_HTTP_CONTINUE); + return TS_EVENT_CONTINUE; + break; + + case TS_EVENT_HTTP_POST_REMAP: + limiter->push(static_cast(edata), cont); + return TS_EVENT_NONE; + break; + + case TS_EVENT_HTTP_SEND_RESPONSE_HDR: // This is only applicable when we set an error in remap + limiter->retryAfter(static_cast(edata), limiter->retry); + TSContDestroy(cont); // We are done with this continuation now + TSHttpTxnReenable(static_cast(edata), TS_EVENT_HTTP_CONTINUE); + return TS_EVENT_CONTINUE; + break; + + default: + TSDebug(PLUGIN_NAME, "Unknown event %d", static_cast(event)); + TSError("Unknown event in %s", PLUGIN_NAME); + break; + } + return TS_EVENT_NONE; +} + +/////////////////////////////////////////////////////////////////////////////// +// Setup the continuous queue processing continuation +// +void +RateLimiter::setupQueueCont() +{ + _queue_cont = TSContCreate(queue_process_cont, TSMutexCreate()); + TSReleaseAssert(_queue_cont); + TSContDataSet(_queue_cont, this); + _action = TSContScheduleEveryOnPool(_queue_cont, QUEUE_DELAY_TIME.count(), TS_THREAD_POOL_TASK); +} 
+ +/////////////////////////////////////////////////////////////////////////////// +// Add a header with the delay imposed on this transaction. This can be used +// for logging, and other types of metrics. +// +void +RateLimiter::delayHeader(TSHttpTxn txnp, std::chrono::microseconds delay) const +{ + if (header.size() > 0) { + TSMLoc hdr_loc = nullptr; + TSMBuffer bufp = nullptr; + TSMLoc field_loc = nullptr; + + if (TS_SUCCESS == TSHttpTxnClientReqGet(txnp, &bufp, &hdr_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, header.c_str(), header.size(), &field_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, static_cast(delay.count()))) { + TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + } + } +} + +/////////////////////////////////////////////////////////////////////////////// +// Add a header with the delay imposed on this transaction. This can be used +// for logging, and other types of metrics. 
+// +void +RateLimiter::retryAfter(TSHttpTxn txnp, unsigned retry) const +{ + if (retry > 0) { + TSMLoc hdr_loc = nullptr; + TSMBuffer bufp = nullptr; + TSMLoc field_loc = nullptr; + + if (TS_SUCCESS == TSHttpTxnClientRespGet(txnp, &bufp, &hdr_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldCreateNamed(bufp, hdr_loc, "Retry-After", 11, &field_loc)) { + if (TS_SUCCESS == TSMimeHdrFieldValueIntSet(bufp, hdr_loc, field_loc, -1, retry)) { + TSDebug(PLUGIN_NAME, "Added a Retry-After: %u", retry); + TSMimeHdrFieldAppend(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, hdr_loc, field_loc); + } + TSHandleMLocRelease(bufp, TS_NULL_MLOC, hdr_loc); + } + } +} diff --git a/plugins/experimental/rate_limit/limiter.h b/plugins/experimental/rate_limit/limiter.h new file mode 100644 index 00000000000..947f6a9f3ae --- /dev/null +++ b/plugins/experimental/rate_limit/limiter.h @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +constexpr char const PLUGIN_NAME[] = "rate_limit"; +constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{100}; // Examine the queue every 100ms + +using QueueTime = std::chrono::time_point; +using QueueItem = std::tuple; + +/////////////////////////////////////////////////////////////////////////////// +// Configuration object for a rate limiting remap rule. +// +class RateLimiter +{ +public: + RateLimiter() : _queue_lock(TSMutexCreate()), _active_lock(TSMutexCreate()) {} + + ~RateLimiter() + { + if (_action) { + TSActionCancel(_action); + } + if (_queue_cont) { + TSContDestroy(_queue_cont); + } + TSMutexDestroy(_queue_lock); + TSMutexDestroy(_active_lock); + } + + // Reserve / release a slot from the active connect limits. Reserve will return + // false if we are unable to reserve a slot. + bool + reserve() + { + TSReleaseAssert(_active <= limit); + TSMutexLock(_active_lock); + if (_active < limit) { + ++_active; + TSMutexUnlock(_active_lock); // Reduce the critical section, release early + TSDebug(PLUGIN_NAME, "Reserving a slot, active txns == %u", active()); + return true; + } else { + TSMutexUnlock(_active_lock); + return false; + } + } + + void + release() + { + TSMutexLock(_active_lock); + --_active; + TSMutexUnlock(_active_lock); + TSDebug(PLUGIN_NAME, "Releasing a slot, active txns == %u", active()); + } + + // Current size of the active_in connections + unsigned + active() const + { + return _active.load(); + } + + // Current size of the queue + unsigned + size() const + { + return _size.load(); + } + + // Is the queue full (at it's max size)? 
+ bool + full() const + { + return (_size == max_queue); + } + + void + push(TSHttpTxn txnp, TSCont cont) + { + QueueTime now = std::chrono::system_clock::now(); + + TSMutexLock(_queue_lock); + _queue.push_front(std::make_tuple(txnp, cont, now)); + ++_size; + TSMutexUnlock(_queue_lock); + } + + QueueItem + pop() + { + QueueItem item; + + TSMutexLock(_queue_lock); + if (!_queue.empty()) { + item = std::move(_queue.back()); + _queue.pop_back(); + --_size; + } + TSMutexUnlock(_queue_lock); + + return item; + } + + bool + hasOldTxn(QueueTime now) const + { + TSMutexLock(_queue_lock); + if (!_queue.empty()) { + QueueItem item = _queue.back(); + TSMutexUnlock(_queue_lock); // A little ugly but this reduces the critical section for the lock a little bit. + + std::chrono::milliseconds age = std::chrono::duration_cast(now - std::get<2>(item)); + + return (age >= max_age); + } else { + TSMutexUnlock(_queue_lock); + return false; + } + } + + void delayHeader(TSHttpTxn txpn, std::chrono::microseconds delay) const; + void retryAfter(TSHttpTxn txpn, unsigned after) const; + + // Continuation creation and scheduling + void setupQueueCont(); + + void + setupTxnCont(void *ih, TSHttpTxn txnp, TSHttpHookID hook) + { + TSCont cont = TSContCreate(rate_limit_cont, nullptr); + TSReleaseAssert(cont); + + TSContDataSet(cont, ih); + TSHttpTxnHookAdd(txnp, hook, cont); + } + + // These are the configurable portions of this limiter, public so sue me. + unsigned limit = 100; // Arbitrary default, probably should be a required config + unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max + unsigned error = 429; // Error code when we decide not to allow a txn to be processed (e.g. 
queue full) + unsigned retry = 0; // If > 0, we will also send a Retry-After: header with this retry value + std::chrono::milliseconds max_age = std::chrono::milliseconds::zero(); // Max age (ms) in the queue + std::string header; // Header to put the latency metrics in, e.g. @RateLimit-Delay + +private: + static int queue_process_cont(TSCont cont, TSEvent event, void *edata); + static int rate_limit_cont(TSCont cont, TSEvent event, void *edata); + + std::atomic _active = 0; // Current active number of txns. This has to always stay <= limit above + std::atomic _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue + + TSMutex _queue_lock, _active_lock; // Resource locks + std::deque _queue; // Queue for the pending TXN's + TSCont _queue_cont = nullptr; // Continuation processing the queue periodically + TSAction _action = nullptr; // The action associated with the queue continuation, needed to shut it down +}; diff --git a/plugins/experimental/rate_limit/rate_limit.cc b/plugins/experimental/rate_limit/rate_limit.cc new file mode 100644 index 00000000000..37d8c65b5b1 --- /dev/null +++ b/plugins/experimental/rate_limit/rate_limit.cc @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include +#include +#include +#include +#include + +#include "limiter.h" + +/////////////////////////////////////////////////////////////////////////////// +// Setup stuff for the remap plugin +// +TSReturnCode +TSRemapInit(TSRemapInterface *api_info, char *errbuf, int errbuf_size) +{ + if (!api_info) { + strncpy(errbuf, "[tsremap_init] - Invalid TSRemapInterface argument", errbuf_size - 1); + return TS_ERROR; + } + + if (api_info->tsremap_version < TSREMAP_VERSION) { + snprintf(errbuf, errbuf_size - 1, "[TSRemapInit] - Incorrect API version %ld.%ld", api_info->tsremap_version >> 16, + (api_info->tsremap_version & 0xffff)); + return TS_ERROR; + } + + TSDebug(PLUGIN_NAME, "plugin is successfully initialized"); + return TS_SUCCESS; +} + +void +TSRemapDeleteInstance(void *ih) +{ + delete static_cast(ih); +} + +TSReturnCode +TSRemapNewInstance(int argc, char *argv[], void **ih, char * /* errbuf ATS_UNUSED */, int /* errbuf_size ATS_UNUSED */) +{ + static const struct option longopt[] = { + {const_cast("limit"), required_argument, nullptr, 'l'}, + {const_cast("queue"), required_argument, nullptr, 'q'}, + {const_cast("error"), required_argument, nullptr, 'e'}, + {const_cast("retry"), required_argument, nullptr, 'r'}, + {const_cast("header"), required_argument, nullptr, 'h'}, + {const_cast("maxage"), required_argument, nullptr, 'm'}, + // EOF + {nullptr, no_argument, nullptr, '\0'}, + }; + + RateLimiter *limiter = new RateLimiter(); + TSReleaseAssert(limiter); + // argv contains the "to" and "from" URLs. Skip the first so that the + // second one poses as the program name. 
+ --argc; + ++argv; + + while (true) { + int opt = getopt_long(argc, (char *const *)argv, "", longopt, nullptr); + + switch (opt) { + case 'l': + limiter->limit = strtol(optarg, nullptr, 10); + break; + case 'q': + limiter->max_queue = strtol(optarg, nullptr, 10); + break; + case 'e': + limiter->error = strtol(optarg, nullptr, 10); + break; + case 'r': + limiter->retry = strtol(optarg, nullptr, 10); + break; + case 'm': + limiter->max_age = std::chrono::milliseconds(strtol(optarg, nullptr, 10)); + break; + case 'h': + limiter->header = optarg; + break; + } + if (opt == -1) { + break; + } + } + + limiter->setupQueueCont(); + + TSDebug(PLUGIN_NAME, "Added active_in limiter rule (limit=%u, queue=%u, error=%u", limiter->limit, limiter->max_queue, + limiter->error); + *ih = static_cast(limiter); + + return TS_SUCCESS; +} + +/////////////////////////////////////////////////////////////////////////////// +// This is the main "entry" point for the plugin, called for every request. +// +TSRemapStatus +TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri) +{ + RateLimiter *limiter = static_cast(ih); + + if (limiter) { + if (!limiter->reserve()) { + if (!limiter->max_queue || limiter->full()) { + // We are running at limit, and the queue has reached max capacity, give back an error and be done. + TSHttpTxnStatusSet(txnp, static_cast(limiter->error)); + limiter->setupTxnCont(ih, txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK); + TSDebug(PLUGIN_NAME, "Rejecting request, we're at capacity and queue is full"); + } else { + limiter->setupTxnCont(ih, txnp, TS_HTTP_POST_REMAP_HOOK); + TSDebug(PLUGIN_NAME, "Adding rate limiting hook, we are at capacity"); + } + } else { + limiter->setupTxnCont(ih, txnp, TS_HTTP_TXN_CLOSE_HOOK); + TSDebug(PLUGIN_NAME, "Adding txn-close hook, we're not at capacity"); + } + } + + return TSREMAP_NO_REMAP; +}