From 0654349220e812b43fcf26c29b8c70c06499288b Mon Sep 17 00:00:00 2001 From: Brian Geffon Date: Fri, 20 Nov 2015 09:37:12 -0800 Subject: [PATCH 1/4] TS-4042: Add feature to buffer request body before making downstream requests --- configure.ac | 1 + lib/atscppapi/src/Transaction.cc | 34 +++++ lib/atscppapi/src/include/atscppapi/Plugin.h | 34 ++++- .../src/include/atscppapi/Transaction.h | 17 +++ lib/atscppapi/src/utils_internal.cc | 7 +- lib/ts/apidefs.h.in | 5 + mgmt/RecordsConfig.cc | 1 - plugins/Makefile.am | 1 + plugins/request_buffer/Makefile.am | 21 +++ plugins/request_buffer/request_buffer.cc | 119 +++++++++++++++ proxy/InkAPI.cc | 50 +++++++ proxy/PluginVC.cc | 3 +- proxy/api/ts/ts.h | 31 ++++ proxy/hdrs/HTTP.h | 1 + proxy/http/HttpConfig.cc | 7 + proxy/http/HttpConfig.h | 5 + proxy/http/HttpDebugNames.cc | 6 + proxy/http/HttpSM.cc | 138 +++++++++++++++++- proxy/http/HttpSM.h | 7 +- proxy/http/HttpTransact.cc | 97 +++++++++--- proxy/http/HttpTransact.h | 4 + proxy/logstats.cc | 2 + 22 files changed, 553 insertions(+), 38 deletions(-) create mode 100644 plugins/request_buffer/Makefile.am create mode 100644 plugins/request_buffer/request_buffer.cc diff --git a/configure.ac b/configure.ac index 22f8a59b9e0..989ad91e0f4 100644 --- a/configure.ac +++ b/configure.ac @@ -1867,6 +1867,7 @@ AC_CONFIG_FILES([ plugins/healthchecks/Makefile plugins/libloader/Makefile plugins/regex_remap/Makefile + plugins/request_buffer/Makefile plugins/stats_over_http/Makefile plugins/tcpinfo/Makefile proxy/Makefile diff --git a/lib/atscppapi/src/Transaction.cc b/lib/atscppapi/src/Transaction.cc index b9396efce70..bf9761fe3a0 100644 --- a/lib/atscppapi/src/Transaction.cc +++ b/lib/atscppapi/src/Transaction.cc @@ -349,6 +349,12 @@ Transaction::getServerResponseHeaderSize() return static_cast(TSHttpTxnServerRespHdrBytesGet(state_->txn_)); } +size_t +Transaction::getClientRequestBodySize() +{ + return static_cast(TSHttpTxnClientReqBodyBytesGet(state_->txn_)); +} + size_t Transaction::getClientResponseBodySize() { @@ -414,6 +420,34 @@ Transaction::redirectTo(std::string const &url) TSHttpTxnRedirectUrlSet(state_->txn_, s, url.length()); } +std::string +Transaction::getClientRequestBody() +{ + std::string ret; + TSIOBufferReader buffer_reader = TSHttpTxnGetClientRequestBufferReader(state_->txn_); + int64_t read_avail = TSIOBufferReaderAvail(buffer_reader); + if (read_avail == 0) + return ret; + + int64_t consumed = 0; + int64_t data_len = 0; + const char *char_data = NULL; + TSIOBufferBlock block = TSIOBufferReaderStart(buffer_reader); + while (block != NULL) { + char_data = TSIOBufferBlockReadStart(block, buffer_reader, &data_len); + ret.append(char_data, data_len); + block = TSIOBufferBlockNext(block); + } + + return ret; +} + +int64_t +Transaction::getClientRequestContentLength() +{ + return TSHttpTxnGetClientRequestContentLength(state_->txn_); +} + namespace { /** diff --git a/lib/atscppapi/src/include/atscppapi/Plugin.h b/lib/atscppapi/src/include/atscppapi/Plugin.h index eb72625dc1f..b06eefc22c9 100644 --- a/lib/atscppapi/src/include/atscppapi/Plugin.h +++ b/lib/atscppapi/src/include/atscppapi/Plugin.h @@ -54,13 +54,15 @@ class Plugin : noncopyable HOOK_READ_REQUEST_HEADERS_PRE_REMAP = 0, /**< This hook will be fired before remap has occured. */ HOOK_READ_REQUEST_HEADERS_POST_REMAP, /**< This hook will be fired directly after remap has occured. */ HOOK_SEND_REQUEST_HEADERS, /**< This hook will be fired right before request headers are sent to the origin */ - HOOK_READ_RESPONSE_HEADERS, /**< This hook will be fired right after response headers have been read from the origin */ - HOOK_SEND_RESPONSE_HEADERS, /**< This hook will be fired right before the response headers are sent to the client */ - HOOK_OS_DNS, /**< This hook will be fired right after the OS DNS lookup */ - HOOK_READ_REQUEST_HEADERS, /**< This hook will be fired after the request is read. */ - HOOK_READ_CACHE_HEADERS, /**< This hook will be fired after the CACHE hdrs. */ - HOOK_CACHE_LOOKUP_COMPLETE, /**< This hook will be fired after caceh lookup complete. */ - HOOK_SELECT_ALT /**< This hook will be fired after select alt. */ + HOOK_READ_RESPONSE_HEADERS, /**< This hook will be fired right after response headers have been read from the origin */ + HOOK_SEND_RESPONSE_HEADERS, /**< This hook will be fired right before the response headers are sent to the client */ + HOOK_OS_DNS, /**< This hook will be fired right after the OS DNS lookup */ + HOOK_READ_REQUEST_HEADERS, /**< This hook will be fired after the request is read. */ + HOOK_READ_CACHE_HEADERS, /**< This hook will be fired after the CACHE hdrs. */ + HOOK_CACHE_LOOKUP_COMPLETE, /**< This hook will be fired after caceh lookup complete. */ + HOOK_SELECT_ALT, /**< This hook will be fired after select alt. */ + HOOK_HTTP_REQUEST_BUFFER_READ, /**< This hook will be fired after reading data during request buffering */ + HOOK_HTTP_REQUEST_BUFFER_READ_COMPLETE /**< This hook will be fired after reading data complete during request buffering */ }; /** @@ -155,6 +157,24 @@ class Plugin : noncopyable virtual ~Plugin(){}; + /** + * This method must be implemented when you hook HOOK_HTTP_REQUEST_BUFFER_READ + */ + virtual void + handleHttpRequestBufferRead(Transaction &transaction) + { + transaction.resume(); + }; + + /** + * This method must be implemented when you hook HOOK_HTTP_REQUEST_BUFFER_READ_COMPLETE + */ + virtual void + handleHttpRequestBufferReadComplete(Transaction &transaction) + { + transaction.resume(); + }; + protected: /** * \note This interface can never be implemented directly, it should be implemented diff --git a/lib/atscppapi/src/include/atscppapi/Transaction.h b/lib/atscppapi/src/include/atscppapi/Transaction.h index 02b33972fe3..6b3b681e677 100644 --- a/lib/atscppapi/src/include/atscppapi/Transaction.h +++ b/lib/atscppapi/src/include/atscppapi/Transaction.h @@ -323,6 +323,12 @@ class Transaction : noncopyable * @return server response header size */ size_t getServerResponseHeaderSize(); + /** + * Get the nubmber of bytes for the client reques + * + * @return client request body size */ + size_t getClientRequestBodySize(); + /** * Get the number of bytes for the client response. * This can differ from the server response size because of transformations. @@ -342,6 +348,17 @@ class Transaction : noncopyable */ void redirectTo(std::string const &url); + + /** + * Get the body (if applicable) from a client request. + */ + std::string getClientRequestBody(); + + /** + * Get the content length from a client request. + */ + int64_t getClientRequestContentLength(); + bool configIntSet(TSOverridableConfigKey conf, int value); bool configIntGet(TSOverridableConfigKey conf, int *value); bool configFloatSet(TSOverridableConfigKey conf, float value); diff --git a/lib/atscppapi/src/utils_internal.cc b/lib/atscppapi/src/utils_internal.cc index ff0012e1367..0f9955f1188 100644 --- a/lib/atscppapi/src/utils_internal.cc +++ b/lib/atscppapi/src/utils_internal.cc @@ -141,7 +141,12 @@ void inline invokePluginForEvent(Plugin *plugin, TSHttpTxn ats_txn_handle, TSEve case TS_EVENT_HTTP_SELECT_ALT: plugin->handleSelectAlt(transaction); break; - + case TS_EVENT_HTTP_REQUEST_BUFFER_READ: + plugin->handleHttpRequestBufferRead(transaction); + break; + case TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE: + plugin->handleHttpRequestBufferReadComplete(transaction); + break; default: assert(false); /* we should never get here */ break; diff --git a/lib/ts/apidefs.h.in b/lib/ts/apidefs.h.in index e8acced00f1..a2ce9ed3bd4 100644 --- a/lib/ts/apidefs.h.in +++ b/lib/ts/apidefs.h.in @@ -274,6 +274,8 @@ typedef enum { TS_HTTP_PRE_REMAP_HOOK, TS_HTTP_POST_REMAP_HOOK, TS_HTTP_RESPONSE_CLIENT_HOOK, + TS_HTTP_REQUEST_BUFFER_READ_HOOK, + TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK, // Putting the SSL hooks in the same enum space // So both sets of hooks can be set by the same Hook function TS_SSL_FIRST_HOOK, @@ -440,6 +442,8 @@ typedef enum { TS_EVENT_LIFECYCLE_SERVER_SSL_CTX_INITIALIZED = 60021, TS_EVENT_LIFECYCLE_CLIENT_SSL_CTX_INITIALIZED = 60022, TS_EVENT_VCONN_PRE_ACCEPT = 60023, + TS_EVENT_HTTP_REQUEST_BUFFER_READ = 60024, + TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE = 60025, TS_EVENT_MGMT_UPDATE = 60100, /* EVENTS 60200 - 60202 for internal use */ @@ -693,6 +697,7 @@ typedef enum { TS_CONFIG_HTTP_NUMBER_OF_REDIRECTIONS, TS_CONFIG_HTTP_CACHE_MAX_OPEN_WRITE_RETRIES, TS_CONFIG_HTTP_REDIRECT_USE_ORIG_CACHE_KEY, + TS_CONFIG_HTTP_REQUEST_BUFFER_ENABLED, TS_CONFIG_LAST_ENTRY } TSOverridableConfigKey; diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index f3a2566d591..6ecce2a4d74 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -2024,7 +2024,6 @@ static const RecordElement RecordsConfig[] = //# //########### {RECT_CONFIG, "proxy.config.cache.http.compatibility.4-2-0-fixup", RECD_INT, "1", RECU_DYNAMIC, RR_NULL, RECC_NULL, NULL, RECA_NULL}, - }; // clang-format on diff --git a/plugins/Makefile.am b/plugins/Makefile.am index 0c34f82f475..07009a37f69 100644 --- a/plugins/Makefile.am +++ b/plugins/Makefile.am @@ -24,6 +24,7 @@ SUBDIRS = \ healthchecks \ libloader \ regex_remap \ + request_buffer \ stats_over_http \ tcpinfo diff --git a/plugins/request_buffer/Makefile.am b/plugins/request_buffer/Makefile.am new file mode 100644 index 00000000000..0dc8f0d576e --- /dev/null +++ b/plugins/request_buffer/Makefile.am @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include $(top_srcdir)/build/plugins.mk + +pkglib_LTLIBRARIES = request_buffer.la +request_buffer_la_SOURCES = request_buffer.cc +request_buffer_la_LDFLAGS = $(TS_PLUGIN_LDFLAGS) diff --git a/plugins/request_buffer/request_buffer.cc b/plugins/request_buffer/request_buffer.cc new file mode 100644 index 00000000000..e4c70093d12 --- /dev/null +++ b/plugins/request_buffer/request_buffer.cc @@ -0,0 +1,119 @@ +/* request_buffer.cc - Plugin to enable request buffer for the given transaction. + */ + +#include +#include +#include +#include +#include + +#include "ts/ts.h" +#include "ts/ink_defs.h" + +#define PLUGIN_NAME "request_buffer" + +static const int MIN_BYTE_PER_SEC = 1000; +static int TXN_INDEX_ARG_TIME; +struct TimeRecord { + timespec start_time; + TimeRecord() { clock_gettime(CLOCK_MONOTONIC, &start_time); } +}; +bool +is_post_request(TSHttpTxn txnp) +{ + const char *method; + int method_len; + TSMLoc req_loc; + TSMBuffer req_bufp; + if (TSHttpTxnClientReqGet(txnp, &req_bufp, &req_loc) == TS_ERROR) { + TSError("Error while retrieving client request header\n"); + return false; + } + method = TSHttpHdrMethodGet(req_bufp, req_loc, &method_len); + if (static_cast(method_len) != strlen(TS_HTTP_METHOD_POST) || strncasecmp(method, TS_HTTP_METHOD_POST, method_len) != 0) { + TSHandleMLocRelease(req_bufp, TS_NULL_MLOC, req_loc); + return false; + } + TSHandleMLocRelease(req_bufp, TS_NULL_MLOC, req_loc); + return true; +} +bool +reached_min_speed(TSHttpTxn txnp, int body_len) +{ + TimeRecord *timeRecord = (TimeRecord *)TSHttpTxnArgGet(txnp, TXN_INDEX_ARG_TIME); + timespec now_time; + clock_gettime(CLOCK_MONOTONIC, &now_time); + double time_diff_in_sec = + (now_time.tv_sec - timeRecord->start_time.tv_sec) + 1e-9 * (now_time.tv_nsec - timeRecord->start_time.tv_nsec); + TSDebug("http", "time_diff_in_sec = %f, body_len = %d, date_rate = %f\n", time_diff_in_sec, body_len, + body_len / time_diff_in_sec); + return body_len / time_diff_in_sec >= MIN_BYTE_PER_SEC; +} +static int +hook_handler(TSCont contp, TSEvent event, void *edata) +{ + TSHttpTxn txnp = (TSHttpTxn)(edata); + if (event == TS_EVENT_HTTP_READ_REQUEST_HDR && is_post_request(txnp)) { + // enable the request body buffering + TSHttpTxnConfigIntSet(txnp, TS_CONFIG_HTTP_REQUEST_BUFFER_ENABLED, 1); + + // save the start time for calculating the data rate + TimeRecord *timeRecord = new TimeRecord(); + TSHttpTxnArgSet(txnp, TXN_INDEX_ARG_TIME, static_cast(timeRecord)); + + TSHttpTxnHookAdd(txnp, TS_HTTP_REQUEST_BUFFER_READ_HOOK, TSContCreate(hook_handler, TSMutexCreate())); + TSHttpTxnHookAdd(txnp, TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK, TSContCreate(hook_handler, TSMutexCreate())); + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, TSContCreate(hook_handler, TSMutexCreate())); + } else if (event == TS_EVENT_HTTP_REQUEST_BUFFER_READ || event == TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE) { + int64_t ret_len = TSHttpTxnClientReqBodyBytesGet(txnp); + if (!reached_min_speed(txnp, ret_len)) { + TSError("[hook_handler] Error : reached_min_speed checking failed\n"); + TSHttpTxnReenable(txnp, TS_EVENT_ERROR); + return 0; + } + + // get the received request body + TSIOBufferReader buffer_reader = TSHttpTxnGetClientRequestBufferReader(txnp); + int64_t read_avail = TSIOBufferReaderAvail(buffer_reader); + if (read_avail) { + char *body = (char *)TSmalloc(sizeof(char) * read_avail); + int64_t consumed = 0; + int64_t data_len = 0; + const char *char_data = NULL; + TSIOBufferBlock block = TSIOBufferReaderStart(buffer_reader); + while (block != NULL) { + char_data = TSIOBufferBlockReadStart(block, buffer_reader, &data_len); + memcpy(body + consumed, char_data, data_len); + consumed += data_len; + block = TSIOBufferBlockNext(block); + } + // play with the body + // ... + TSfree(body); + } + } else if (event == TS_EVENT_HTTP_TXN_CLOSE) { + TimeRecord *timeRecord = (TimeRecord *)TSHttpTxnArgGet(txnp, TXN_INDEX_ARG_TIME); + delete timeRecord; + } + TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); + return 0; +} + +void +TSPluginInit(int argc, const char *argv[]) +{ + TSPluginRegistrationInfo info; + + info.plugin_name = (char *)PLUGIN_NAME; + ; + info.vendor_name = (char *)"Apache Software Foundation"; + info.support_email = (char *)"dev@trafficserver.apache.org"; + if (TSPluginRegister(&info) != TS_SUCCESS) { + TSError("[PluginInit] Plugin registration failed.\n"); + } else { + if (TSHttpArgIndexReserve(PLUGIN_NAME, "Stores the transaction context", &TXN_INDEX_ARG_TIME) != TS_SUCCESS) { + TSError("[PluginInit] failed to reserve an argument index"); + } + TSHttpHookAdd(TS_HTTP_READ_REQUEST_HDR_HOOK, TSContCreate(hook_handler, TSMutexCreate())); + } +} diff --git a/proxy/InkAPI.cc b/proxy/InkAPI.cc index 8373e1677ac..08ded642901 100644 --- a/proxy/InkAPI.cc +++ b/proxy/InkAPI.cc @@ -7964,6 +7964,9 @@ _conf_to_memberp(TSOverridableConfigKey conf, OverridableHttpConfigParams *overr typ = OVERRIDABLE_TYPE_INT; ret = &overridableHttpConfig->slow_log_threshold; break; + case TS_CONFIG_HTTP_REQUEST_BUFFER_ENABLED: + ret = &overridableHttpConfig->request_buffer_enabled; + break; case TS_CONFIG_BODY_FACTORY_TEMPLATE_BASE: typ = OVERRIDABLE_TYPE_STRING; ret = &overridableHttpConfig->body_factory_template_base; @@ -8319,6 +8322,10 @@ TSHttpTxnConfigFind(const char *name, int length, TSOverridableConfigKey *conf, case 40: switch (name[length - 1]) { + case 'd': + if (!strncmp(name, "proxy.config.http.request_buffer_enabled", length)) + cnf = TS_CONFIG_HTTP_REQUEST_BUFFER_ENABLED; + break; case 'e': if (!strncmp(name, "proxy.config.http.down_server.cache_time", length)) cnf = TS_CONFIG_HTTP_DOWN_SERVER_CACHE_TIME; @@ -8903,3 +8910,46 @@ TSVConnReenable(TSVConn vconn) } } } + +tsapi char * +TSHttpTxnGetClientRequestBody(TSHttpTxn txnp, int *len) +{ + char *ret = NULL; + + sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS); + HttpSM *sm = (HttpSM *)txnp; + int64_t read_avail = sm->ua_buffer_reader->read_avail(); + if (read_avail == 0 || sm->t_state.hdr_info.request_content_length <= 0) + return NULL; + + ret = (char *)TSmalloc(sizeof(char) * read_avail); + + int64_t consumed = 0; + int64_t data_len = 0; + const char *char_data = NULL; + TSIOBufferBlock block = TSIOBufferReaderStart((TSIOBufferReader)sm->ua_buffer_reader); + while (block != NULL) { + char_data = TSIOBufferBlockReadStart(block, (TSIOBufferReader)sm->ua_buffer_reader, &data_len); + memcpy(ret + consumed, char_data, data_len); + consumed += data_len; + block = TSIOBufferBlockNext(block); + } + + *len = (int)consumed; + + return ret; +} + +tsapi TSIOBufferReader +TSHttpTxnGetClientRequestBufferReader(TSHttpTxn txnp) +{ + sdk_assert(sdk_sanity_check_txn(txnp) == TS_SUCCESS); + HttpSM *sm = (HttpSM *)txnp; + return (TSIOBufferReader)sm->ua_buffer_reader; +} + +tsapi int64_t +TSHttpTxnGetClientRequestContentLength(TSHttpTxn txnp) +{ + return ((HttpSM *)txnp)->t_state.hdr_info.request_content_length; +} diff --git a/proxy/PluginVC.cc b/proxy/PluginVC.cc index fb296964d2d..522e7a8f625 100644 --- a/proxy/PluginVC.cc +++ b/proxy/PluginVC.cc @@ -639,7 +639,8 @@ PluginVC::process_read_side(bool other_side_call) water_mark = MAX(water_mark, PVC_DEFAULT_MAX_BYTES); int64_t buf_space = water_mark - output_buffer->max_read_avail(); if (buf_space <= 0) { - Debug("pvc", "[%u] %s: process_read_side no buffer space", core_obj->id, PVC_TYPE); + Debug("pvc", "[%u] %s: process_read_side no buffer space, output_buffer=%p, water_mark=%lld, max_read_available=%lld", + core_obj->id, PVC_TYPE, output_buffer, water_mark, output_buffer->max_read_avail()); return; } act_on = MIN(act_on, buf_space); diff --git a/proxy/api/ts/ts.h b/proxy/api/ts/ts.h index 7ef9914fc60..375a455357c 100644 --- a/proxy/api/ts/ts.h +++ b/proxy/api/ts/ts.h @@ -2387,6 +2387,37 @@ tsapi const char *TSHttpHookNameLookup(TSHttpHookID hook); */ tsapi const char *TSHttpEventNameLookup(TSEvent event); +/** + Get the body (if applicable) from a client request. YOU MUST + free the pointer returned from this call. This call will return + NULL if no body exists. Changes to this string will not affect + the request body. The second parameter will contain the length + of the string returned. + + @return char * that contains the entire buffered request body, this + must be freed by the caller using TSFree! +*/ +tsapi char *TSHttpTxnGetClientPequestBody(TSHttpTxn txnp, int *length); + +/** + Get the IOBufferReader (if applicable) from a client request. It's easy to + get the request body from the returned IOBufferReader. + + @param txnp the transaction pointer + + @return TSIOBufferReader the IOBufferReader +*/ +tsapi TSIOBufferReader TSHttpTxnGetClientRequestBufferReader(TSHttpTxn txnp); + + +/** + Get the content length of a client request. + + @param txnp the transaction pointer + + @return int64_t the content length +*/ +tsapi int64_t TSHttpTxnGetClientRequestContentLength(TSHttpTxn txnp); #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/proxy/hdrs/HTTP.h b/proxy/hdrs/HTTP.h index 7d91687ded7..b6896b7c3c4 100644 --- a/proxy/hdrs/HTTP.h +++ b/proxy/hdrs/HTTP.h @@ -135,6 +135,7 @@ enum SquidLogCode { SQUID_LOG_UDP_FUTURE_1 = 'n', SQUID_LOG_UDP_FUTURE_2 = 'o', SQUID_LOG_ERR_READ_TIMEOUT = 'p', + SQUID_LOG_ERR_REQUESTBUFFER_TIMEOUT = 'P', SQUID_LOG_ERR_LIFETIME_EXP = 'q', SQUID_LOG_ERR_POST_ENTITY_TOO_LARGE = 'L', SQUID_LOG_ERR_NO_CLIENTS_BIG_OBJ = 'r', diff --git a/proxy/http/HttpConfig.cc b/proxy/http/HttpConfig.cc index 5a94fcba5eb..9f3273826b9 100644 --- a/proxy/http/HttpConfig.cc +++ b/proxy/http/HttpConfig.cc @@ -815,6 +815,10 @@ register_stat_callbacks() (int)https_incoming_requests_stat, RecRawStatSyncCount); RecRegisterRawStat(http_rsb, RECT_PROCESS, "proxy.process.https.total_client_connections", RECD_COUNTER, RECP_PERSISTENT, (int)https_total_client_connections_stat, RecRawStatSyncCount); + RecRegisterRawStat(http_rsb, RECT_PROCESS, "proxy.process.http.request_body_receive_timeout", RECD_COUNTER, RECP_PERSISTENT, + (int)http_request_body_receive_timeout_stat, RecRawStatSyncCount); + RecRegisterRawStat(http_rsb, RECT_PROCESS, "proxy.process.http.total_request_buffer_memory", RECD_INT, RECP_NON_PERSISTENT, + (int)http_total_request_buffer_memory, RecRawStatSyncSum); RecRegisterRawStat(http_rsb, RECT_PROCESS, "proxy.process.http.post_body_too_large", RECD_COUNTER, RECP_PERSISTENT, (int)http_post_body_too_large, RecRawStatSyncCount); // milestones @@ -1092,6 +1096,9 @@ HttpConfig::startup() HttpEstablishStaticConfigLongLong(c.oride.default_buffer_size_index, "proxy.config.http.default_buffer_size"); HttpEstablishStaticConfigLongLong(c.oride.default_buffer_water_mark, "proxy.config.http.default_buffer_water_mark"); + // Request buffering + HttpEstablishStaticConfigByte(c.oride.request_buffer_enabled, "proxy.config.http.request_buffer_enabled"); + // Stat Page Info HttpEstablishStaticConfigByte(c.enable_http_info, "proxy.config.http.enable_http_info"); diff --git a/proxy/http/HttpConfig.h b/proxy/http/HttpConfig.h index 5f99cbdeaf2..6d898ce6fab 100644 --- a/proxy/http/HttpConfig.h +++ b/proxy/http/HttpConfig.h @@ -233,6 +233,8 @@ enum { http_total_x_redirect_stat, + http_request_body_receive_timeout_stat, + http_total_request_buffer_memory, // Times http_total_transactions_time_stat, http_parent_proxy_transaction_time_stat, @@ -384,6 +386,7 @@ struct OverridableHttpConfigParams { freshness_fuzz_min_time(0), max_cache_open_read_retries(-1), cache_open_read_retry_time(10), cache_generation_number(-1), max_cache_open_write_retries(1), background_fill_active_timeout(60), http_chunking_size(4096), flow_high_water_mark(0), flow_low_water_mark(0), default_buffer_size_index(8), default_buffer_water_mark(32768), slow_log_threshold(0), + request_buffer_enabled(0), // Strings / floats must come last body_factory_template_base(NULL), body_factory_template_base_len(0), proxy_response_server_string(NULL), @@ -561,6 +564,8 @@ struct OverridableHttpConfigParams { MgmtInt default_buffer_size_index; MgmtInt default_buffer_water_mark; MgmtInt slow_log_threshold; + + MgmtByte request_buffer_enabled; // IMPORTANT: Here comes all strings / floats configs. /////////////////////////////////////////////////////////////////// diff --git a/proxy/http/HttpDebugNames.cc b/proxy/http/HttpDebugNames.cc index 7db357d8b29..768dd2c8db9 100644 --- a/proxy/http/HttpDebugNames.cc +++ b/proxy/http/HttpDebugNames.cc @@ -386,6 +386,8 @@ HttpDebugNames::get_action_name(HttpTransact::StateMachineAction_t e) return ("SM_ACTION_API_POST_REMAP"); case HttpTransact::SM_ACTION_POST_REMAP_SKIP: return ("SM_ACTION_POST_REMAP_SKIP"); + case HttpTransact::SM_ACTION_WAIT_FOR_FULL_BODY: + return ("SM_ACTION_WAIT_FOR_FULL_BODY"); } return ("unknown state name"); @@ -483,6 +485,10 @@ HttpDebugNames::get_api_hook_name(TSHttpHookID t) return "TS_VCONN_PRE_ACCEPT_HOOK"; case TS_SSL_CERT_HOOK: return "TS_SSL_CERT_HOOK"; + case TS_HTTP_REQUEST_BUFFER_READ_HOOK: + return "TS_HTTP_REQUEST_BUFFER_READ_HOOK"; + case TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK: + return "TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK"; } return "unknown hook"; diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index 188aef5589b..c93e864b6b2 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -268,11 +268,12 @@ HttpSM::HttpSM() : Continuation(NULL), sm_id(-1), magic(HTTP_SM_MAGIC_DEAD), // YTS Team, yamsat Plugin enable_redirection(false), redirect_url(NULL), redirect_url_len(0), redirection_tries(0), transfered_bytes(0), - post_failed(false), debug_on(false), plugin_tunnel_type(HTTP_NO_PLUGIN_TUNNEL), plugin_tunnel(NULL), reentrancy_count(0), - history_pos(0), tunnel(), ua_entry(NULL), ua_session(NULL), background_fill(BACKGROUND_FILL_NONE), ua_raw_buffer_reader(NULL), - server_entry(NULL), server_session(NULL), will_be_private_ss(false), shared_session_retries(0), server_buffer_reader(NULL), - transform_info(), post_transform_info(), has_active_plugin_agents(false), second_cache_sm(NULL), default_handler(NULL), - pending_action(NULL), historical_action(NULL), last_action(HttpTransact::SM_ACTION_UNDEFINED), + post_failed(false), debug_on(false), request_fully_received(false), plugin_tunnel_type(HTTP_NO_PLUGIN_TUNNEL), + plugin_tunnel(NULL), reentrancy_count(0), history_pos(0), tunnel(), ua_entry(NULL), ua_session(NULL), + background_fill(BACKGROUND_FILL_NONE), ua_raw_buffer_reader(NULL), server_entry(NULL), server_session(NULL), + will_be_private_ss(false), shared_session_retries(0), server_buffer_reader(NULL), transform_info(), post_transform_info(), + has_active_plugin_agents(false), second_cache_sm(NULL), default_handler(NULL), pending_action(NULL), historical_action(NULL), + last_action(HttpTransact::SM_ACTION_UNDEFINED), // TODO: Now that bodies can be empty, should the body counters be set to -1 ? TS-2213 client_request_hdr_bytes(0), client_request_body_bytes(0), server_request_hdr_bytes(0), server_request_body_bytes(0), server_response_hdr_bytes(0), server_response_body_bytes(0), client_response_hdr_bytes(0), client_response_body_bytes(0), @@ -308,6 +309,7 @@ HttpSM::cleanup() } magic = HTTP_SM_MAGIC_DEAD; debug_on = false; + request_fully_received = false; } void @@ -831,6 +833,127 @@ HttpSM::state_drain_client_request_body(int event, void *data) } #endif /* PROXY_DRAIN */ +void +HttpSM::wait_for_full_body() +{ + int64_t request_bytes = t_state.hdr_info.request_content_length; // handy save + int64_t avail = ua_buffer_reader->read_avail(); + + client_request_body_bytes = (avail < request_bytes) ? avail : request_bytes; + + ua_buffer_reader->mbuf->size_index = buffer_size_to_index(request_bytes, MAX_BUFFER_SIZE_INDEX); + + // we shouldn't pre-allocate large blocks just because the client sent a large content-length (basic DoS) + if (ua_buffer_reader->mbuf->size_index > BUFFER_SIZE_INDEX_32K) { + // If you need more than 32kb, just use a 256kb buffer, anything below 32kb will use the best fit. + // Since this would likely be a file post (such as a profile image) and not a form submission. + // We do this to prevent fragmentation for larger buckets. + ua_buffer_reader->mbuf->size_index = BUFFER_SIZE_INDEX_256K; + } + + DebugSM("http_request_wait", "[%" PRId64 "] buffer size to index changed: %" PRId64, sm_id, ua_buffer_reader->mbuf->size_index); + + // keep track of the total memory we will use for requests (even if it's never actually allocated). + // Tis is just the total memory that we could possibly use for request buffer. + HTTP_SUM_DYN_STAT(http_total_request_buffer_memory, request_bytes); + + ua_buffer_reader->mbuf->water_mark = request_bytes; + int64_t avail_for_write = ua_buffer_reader->mbuf->block_write_avail(); + if (avail_for_write < (request_bytes - client_request_body_bytes)) { + ua_buffer_reader->mbuf->add_block(); + } + + ua_entry->vc_handler = &HttpSM::state_wait_for_full_body; + ua_entry->read_vio = ua_entry->vc->do_io_read(this, request_bytes - client_request_body_bytes, ua_buffer_reader->mbuf); +} + +int +HttpSM::state_wait_for_full_body(int event, void *data) +{ + STATE_ENTER(&HttpSM::state_wait_for_full_body, event); + + ink_assert(ua_entry->read_vio == (VIO *)data); + ink_assert(ua_entry->vc == ua_session); + + int64_t avail = ua_buffer_reader->read_avail(); + int64_t left = t_state.hdr_info.request_content_length - client_request_body_bytes; + int64_t avail_for_write = ua_buffer_reader->mbuf->block_write_avail(); + + DebugSM("http_request_wait", "[%" PRId64 "] event %d, data %p, current buffer avail: %" PRId64 ", remaining: %" PRId64 + ", block available for write: %" PRId64 ", block count: %d", + sm_id, event, data, avail, left, avail_for_write, ua_buffer_reader->block_count()); + if (event == VC_EVENT_READ_READY && avail_for_write <= left) { + ua_buffer_reader->mbuf->add_block(); + DebugSM("http_request_wait", "[%" PRId64 "] adding block for request body", sm_id); + } + + switch (event) { + case VC_EVENT_EOS: + case VC_EVENT_ERROR: { + // Nothing we can do + HTTP_SUM_DYN_STAT(http_total_request_buffer_memory, -t_state.hdr_info.request_content_length); + set_ua_abort(HttpTransact::ABORTED, event); + terminate_sm = true; + break; + } + case VC_EVENT_ACTIVE_TIMEOUT: + case VC_EVENT_INACTIVITY_TIMEOUT: { + // Handle timeout case. + HTTP_SUM_DYN_STAT(http_total_request_buffer_memory, -t_state.hdr_info.request_content_length); + ua_entry->vc->do_io_shutdown(IO_SHUTDOWN_READ); + set_ua_abort(HttpTransact::ABORTED, event); + call_transact_and_set_next_state(HttpTransact::ClientRequestTimeout); + break; + } + case VC_EVENT_READ_READY: { + client_request_body_bytes = avail; // since we're never consuming. + DebugSM("http_request_wait", "[%" PRId64 "] VC_EVENT_READ_READY: Post wait for full body: received: %" PRId64 + " bytes expecting %" PRId64 " dest buffer=%p", + sm_id, avail, t_state.hdr_info.request_content_length, ua_buffer_reader->mbuf); + APIHook *hook = api_hooks.get(TS_HTTP_REQUEST_BUFFER_READ_HOOK); + while (hook) { + hook->invoke(TS_EVENT_HTTP_REQUEST_BUFFER_READ, this); + hook = hook->m_link.next; + } + ua_entry->read_vio->reenable(); + break; + } + case VC_EVENT_READ_COMPLETE: { + // We've finished draing the REQUEST body + HTTP_SUM_DYN_STAT(http_total_request_buffer_memory, -t_state.hdr_info.request_content_length); + int64_t avail = ua_buffer_reader->read_avail(); + + DebugSM("http_request_wait", "[%" PRId64 "] VC_EVENT_READ_COMPLETE: Request wait for full body is complete, received: %" PRId64 + " bytes expecting %" PRId64, + sm_id, avail, t_state.hdr_info.request_content_length); + + client_request_body_bytes = avail; // since we're never consuming. + + ink_assert(client_request_body_bytes == t_state.hdr_info.request_content_length); + + // At this point if an expect: 100-continue header existed it can be removed + // as we are going to send the full request body to the origin now + int len = 0; + const char *expect = t_state.hdr_info.client_request.value_get(MIME_FIELD_EXPECT, MIME_LEN_EXPECT, &len); + + if ((len == HTTP_LEN_100_CONTINUE) && (strncasecmp(expect, HTTP_VALUE_100_CONTINUE, HTTP_LEN_100_CONTINUE) == 0)) { + t_state.hdr_info.client_request.field_delete(MIME_FIELD_EXPECT, MIME_LEN_EXPECT); + } + + request_fully_received = true; + ua_buffer_reader->mbuf->size_index = HTTP_HEADER_BUFFER_SIZE_INDEX; + ua_entry->vc_handler = &HttpSM::state_watch_for_client_abort; + ua_entry->read_vio = ua_entry->vc->do_io_read(this, INT64_MAX, ua_buffer_reader->mbuf); + call_transact_and_set_next_state(HttpTransact::HandleRequest); + break; + } + default: + ink_release_assert(0); + } + + return EVENT_DONE; +} + int HttpSM::state_watch_for_client_abort(int event, void *data) @@ -7334,6 +7457,11 @@ HttpSM::set_next_state() } #endif /* PROXY_DRAIN */ + case HttpTransact::SM_ACTION_WAIT_FOR_FULL_BODY: { + wait_for_full_body(); + break; + } + case HttpTransact::SM_ACTION_CONTINUE: { ink_release_assert(!"Not implemented"); } diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h index 976489902fd..5398a30a712 100644 --- a/proxy/http/HttpSM.h +++ b/proxy/http/HttpSM.h @@ -277,6 +277,9 @@ class HttpSM : public Continuation bool post_failed; // Added to identify post failure bool debug_on; // Transaction specific debug flag + bool request_fully_received; + bool request_fully_received_invoked; + // Tunneling request to plugin HttpPluginTunnel_t plugin_tunnel_type; PluginVCCore *plugin_tunnel; @@ -308,10 +311,10 @@ class HttpSM : public Continuation void set_http_schedule(Continuation *); int get_http_schedule(int event, void *data); -protected: IOBufferReader *ua_buffer_reader; IOBufferReader *ua_raw_buffer_reader; +protected: HttpVCTableEntry *server_entry; HttpServerSession *server_session; @@ -358,6 +361,7 @@ class HttpSM : public Continuation #ifdef PROXY_DRAIN int state_drain_client_request_body(int event, void *data); #endif /* PROXY_DRAIN */ + int state_wait_for_full_body(int event, void *data); int state_read_client_request_header(int event, void *data); int state_watch_for_client_abort(int event, void *data); int state_read_push_response_header(int event, void *data); @@ -426,6 +430,7 @@ class HttpSM : public Continuation #ifdef PROXY_DRAIN void do_drain_request_body(); #endif + void wait_for_full_body(); bool do_congestion_control_lookup(); diff --git a/proxy/http/HttpTransact.cc b/proxy/http/HttpTransact.cc index 343a0e66a85..ad332c0e0dd 100644 --- a/proxy/http/HttpTransact.cc +++ b/proxy/http/HttpTransact.cc @@ -513,6 +513,18 @@ HttpTransact::BadRequest(State *s) TRANSACT_RETURN(SM_ACTION_SEND_ERROR_CACHE_NOOP, NULL); } +void +HttpTransact::ClientRequestTimeout(State *s) +{ + DebugTxn("http_trans", "[ClientRequestTimeout]" + "client timeout while requesting."); + HTTP_INCREMENT_TRANS_STAT(http_request_body_receive_timeout_stat); + bootstrap_state_variables_from_request(s, &s->hdr_info.client_request); + build_error_response(s, HTTP_STATUS_REQUEST_TIMEOUT, "Request Timeout", "timeout#activity", NULL); + s->squid_codes.log_code = SQUID_LOG_ERR_REQUESTBUFFER_TIMEOUT; + TRANSACT_RETURN(SM_ACTION_SEND_ERROR_CACHE_NOOP, NULL); +} + void HttpTransact::HandleBlindTunnel(State *s) { @@ -1197,42 +1209,83 @@ HttpTransact::handleIfRedirect(State *s) return false; } + +void +HttpTransact::HandleRequestNoOp(State *s) +{ +} + void HttpTransact::HandleRequest(State *s) { DebugTxn("http_trans", "START HttpTransact::HandleRequest"); + if (!s->request_data.hdr) { // we haven't initialized + ink_assert(!s->hdr_info.server_request.valid()); - ink_assert(!s->hdr_info.server_request.valid()); + HTTP_INCREMENT_TRANS_STAT(http_incoming_requests_stat); - HTTP_INCREMENT_TRANS_STAT(http_incoming_requests_stat); + if (s->client_info.port_attribute == HttpProxyPort::TRANSPORT_SSL) { + HTTP_INCREMENT_TRANS_STAT(https_incoming_requests_stat); + } - if (s->client_info.port_attribute == HttpProxyPort::TRANSPORT_SSL) { - HTTP_INCREMENT_TRANS_STAT(https_incoming_requests_stat); - } + /////////////////////////////////////////////// + // if request is bad, return error response // + /////////////////////////////////////////////// - /////////////////////////////////////////////// - // if request is bad, return error response // - /////////////////////////////////////////////// + if (!(is_request_valid(s, &s->hdr_info.client_request))) { + HTTP_INCREMENT_TRANS_STAT(http_invalid_client_requests_stat); + DebugTxn("http_seq", "[HttpTransact::HandleRequest] request invalid."); + s->next_action = SM_ACTION_SEND_ERROR_CACHE_NOOP; + // s->next_action = HttpTransact::PROXY_INTERNAL_CACHE_NOOP; + return; + } + DebugTxn("http_seq", "[HttpTransact::HandleRequest] request valid."); - if (!(is_request_valid(s, &s->hdr_info.client_request))) { - HTTP_INCREMENT_TRANS_STAT(http_invalid_client_requests_stat); - DebugTxn("http_seq", "[HttpTransact::HandleRequest] request invalid."); - s->next_action = SM_ACTION_SEND_ERROR_CACHE_NOOP; - // s->next_action = HttpTransact::PROXY_INTERNAL_CACHE_NOOP; - return; - } - DebugTxn("http_seq", "[HttpTransact::HandleRequest] request valid."); + if (is_debug_tag_set("http_chdr_describe")) { + obj_describe(s->hdr_info.client_request.m_http, 1); + } - if (is_debug_tag_set("http_chdr_describe")) { - obj_describe(s->hdr_info.client_request.m_http, 1); + // at this point we are guaranteed that the request is good and acceptable. + // initialize some state variables from the request (client version, + // client keep-alive, cache action, etc. + initialize_state_variables_from_request(s, &s->hdr_info.client_request); } - // at this point we are guaranteed that the request is good and acceptable. - // initialize some state variables from the request (client version, - // client keep-alive, cache action, etc. - initialize_state_variables_from_request(s, &s->hdr_info.client_request); + if (s->txn_conf->request_buffer_enabled && s->hdr_info.request_content_length > 0) { + // Let's check if we've already fully received the request. + if (s->hdr_info.request_content_length <= s->state_machine->ua_buffer_reader->read_avail()) { + s->state_machine->request_fully_received = true; + } + + int len = 0; + const char *expect = s->hdr_info.client_request.value_get(MIME_FIELD_EXPECT, MIME_LEN_EXPECT, &len); + if ((len == HTTP_LEN_100_CONTINUE) && (strncasecmp(expect, HTTP_VALUE_100_CONTINUE, HTTP_LEN_100_CONTINUE) == 0)) { + s->hdr_info.client_request.field_delete(MIME_FIELD_EXPECT, MIME_LEN_EXPECT); + } + if (!s->state_machine->request_fully_received) { + DebugTxn("http_trans", "Waiting for full request body of length: %" PRId64, s->hdr_info.request_content_length); + // We set the transact return point to HttpTransaction::handlerequestnoop because + // we only want to advance the state machine when the body is fully received and + // dispatching plugins need a place to reenable to, so we’ll reenable to a noop and + // activity on the VConn will ensure the transaction doesn’t get lost. + TRANSACT_RETURN(SM_ACTION_WAIT_FOR_FULL_BODY, HttpTransact::HandleRequestNoOp); + return; + } else { + // We set the return point to NoOp so we don’t recursively call into HandleRequest and + // once all plugins have been fully dispatched we’ll continue exectuing through handle request (which + // at some point later sets the trasact return point). So if a plugin decides not to reenable, when it finally + // does reenable it will continue executing at the next state which will be determined later in this method. + s->transact_return_point = HttpTransact::HandleRequestNoOp; + APIHook *hook = s->state_machine->txn_hook_get(TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK); + while (hook) { + hook->invoke(TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE, s->state_machine); + hook = hook->m_link.next; + } + DebugTxn("http_trans", "Finished receiving request body of length %" PRId64, s->hdr_info.request_content_length); + } + } // The following chunk of code will limit the maximum number of websocket connections (TS-3659) if (s->is_upgrade_request && s->is_websocket && s->http_config_param->max_websocket_connections >= 0) { int64_t val = 0; diff --git a/proxy/http/HttpTransact.h b/proxy/http/HttpTransact.h index c5a91000ba4..438d794d33e 100644 --- a/proxy/http/HttpTransact.h +++ b/proxy/http/HttpTransact.h @@ -476,6 +476,8 @@ class HttpTransact SM_ACTION_DRAIN_REQUEST_BODY, #endif /* PROXY_DRAIN */ + SM_ACTION_WAIT_FOR_FULL_BODY, + SM_ACTION_SERVE_FROM_CACHE, SM_ACTION_SERVER_READ, SM_ACTION_SERVER_PARSE_NEXT_HDR, @@ -1115,12 +1117,14 @@ class HttpTransact static void PerformRemap(State *s); static void ModifyRequest(State *s); static void HandleRequest(State *s); + static void HandleRequestNoOp(State *s); static bool handleIfRedirect(State *s); static void StartAccessControl(State *s); static void StartAuth(State *s); static void HandleRequestAuthorized(State *s); static void BadRequest(State *s); + static void ClientRequestTimeout(State *s); static void HandleFiltering(State *s); static void DecideCacheLookup(State *s); static void LookupSkipOpenServer(State *s); diff --git a/proxy/logstats.cc b/proxy/logstats.cc index 4922e152c00..c72ef277ad8 100644 --- a/proxy/logstats.cc +++ b/proxy/logstats.cc @@ -422,6 +422,7 @@ class UrlLru case SQUID_LOG_ERR_INVALID_REQ: case SQUID_LOG_ERR_UNKNOWN: case SQUID_LOG_ERR_READ_TIMEOUT: + case SQUID_LOG_ERR_REQUESTBUFFER_TIMEOUT: ++(l->errors); break; } @@ -491,6 +492,7 @@ class UrlLru case SQUID_LOG_ERR_INVALID_REQ: case SQUID_LOG_ERR_UNKNOWN: case SQUID_LOG_ERR_READ_TIMEOUT: + case SQUID_LOG_ERR_REQUESTBUFFER_TIMEOUT: l->errors = 1; break; } From e5fc86ea4755f9d3c2ce263fb3507f0fcad69261 Mon Sep 17 00:00:00 2001 From: Brian Geffon Date: Fri, 4 Dec 2015 13:21:37 -0800 Subject: [PATCH 2/4] TS-4042: adding example c++ plugin --- configure.ac | 1 + lib/atscppapi/examples/Makefile.am | 1 + .../examples/request_buffer/Makefile.am | 25 ++++ .../request_buffer/RequestBufferPlugin.cc | 109 ++++++++++++++++++ lib/atscppapi/src/Plugin.cc | 3 +- lib/atscppapi/src/utils_internal.cc | 4 + 6 files changed, 142 insertions(+), 1 deletion(-) create mode 100644 lib/atscppapi/examples/request_buffer/Makefile.am create mode 100644 lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc diff --git a/configure.ac b/configure.ac index 989ad91e0f4..801668410ff 100644 --- a/configure.ac +++ b/configure.ac @@ -1960,6 +1960,7 @@ AS_IF([test "x$enable_cppapi" = "xyes"], [ lib/atscppapi/examples/stat_example/Makefile lib/atscppapi/examples/timeout_example/Makefile lib/atscppapi/examples/transactionhook/Makefile + lib/atscppapi/examples/request_buffer/Makefile lib/atscppapi/examples/async_http_fetch_streaming/Makefile lib/atscppapi/src/Makefile ])]) diff --git a/lib/atscppapi/examples/Makefile.am b/lib/atscppapi/examples/Makefile.am index 9f56838cf5d..7273e5288d2 100644 --- a/lib/atscppapi/examples/Makefile.am +++ b/lib/atscppapi/examples/Makefile.am @@ -19,6 +19,7 @@ SUBDIRS = \ helloworld \ globalhook \ transactionhook \ + request_buffer \ multiple_transaction_hooks \ clientrequest \ serverresponse \ diff --git a/lib/atscppapi/examples/request_buffer/Makefile.am b/lib/atscppapi/examples/request_buffer/Makefile.am new file mode 100644 index 00000000000..d2fb445a896 --- /dev/null +++ b/lib/atscppapi/examples/request_buffer/Makefile.am @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +include $(top_srcdir)/build/plugins.mk + +AM_CPPFLAGS += -I$(top_srcdir)/lib/atscppapi/src/include -Wno-unused-variable + +target=RequestBufferPlugin.so +pkglib_LTLIBRARIES = RequestBufferPlugin.la +RequestBufferPlugin_la_SOURCES = RequestBufferPlugin.cc +RequestBufferPlugin_la_LDFLAGS = -module -avoid-version -shared -L$(top_builddir)/lib/atscppapi/src/ -latscppapi $(TS_PLUGIN_LDFLAGS) diff --git a/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc b/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc new file mode 100644 index 00000000000..053c66a21ed --- /dev/null +++ b/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc @@ -0,0 +1,109 @@ +/** + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + + +#include +#include +#include +#include + +using namespace atscppapi; +static const int MIN_BYTE_PER_SEC = 1000; + +class TimeRecord +{ +public: + TimeRecord() { clock_gettime(CLOCK_MONOTONIC, &start_time); } + const timespec + getStartTime() const + { + return start_time; + } + +private: + timespec start_time; +}; +class RequestBufferPlugin : public atscppapi::TransactionPlugin +{ +public: + RequestBufferPlugin(Transaction &transaction) : TransactionPlugin(transaction) + { + // enable the request body buffering + transaction.configIntSet(TS_CONFIG_HTTP_REQUEST_BUFFER_ENABLED, 1); + // save the start time for calculating the data rate + timeRecord = new TimeRecord(); + + TransactionPlugin::registerHook(HOOK_HTTP_REQUEST_BUFFER_READ); + TransactionPlugin::registerHook(HOOK_HTTP_REQUEST_BUFFER_READ_COMPLETE); + std::cout << "Constructed!" << std::endl; + } + virtual ~RequestBufferPlugin() + { + delete timeRecord; // cleanup + std::cout << "Destroyed!" << std::endl; + } + void + handleHttpRequestBufferRead(Transaction &transaction) + { + std::cout << "request buffer read" << transaction.getClientRequestBody().size() << std::endl; + reached_min_speed(transaction) ? transaction.resume() : transaction.error(); + } + void + handleHttpRequestBufferReadComplete(Transaction &transaction) + { + std::cout << "request buffer complete!" << transaction.getClientRequestBody().size() << std::endl; + reached_min_speed(transaction) ? transaction.resume() : transaction.error(); + } + +private: + TimeRecord *timeRecord; + bool + reached_min_speed(Transaction &transaction) + { + int64_t body_len = transaction.getClientRequestBodySize(); + timespec now_time; + clock_gettime(CLOCK_MONOTONIC, &now_time); + double time_diff_in_sec = + (now_time.tv_sec - timeRecord->getStartTime().tv_sec) + 1e-9 * (now_time.tv_nsec - timeRecord->getStartTime().tv_nsec); + std::cout << "time_diff_in_sec = " << time_diff_in_sec << ", body_len = " << body_len + << ", date_rate = " << body_len / time_diff_in_sec << std::endl; + return body_len / time_diff_in_sec >= MIN_BYTE_PER_SEC; + } +}; + +class GlobalHookPlugin : public atscppapi::GlobalPlugin +{ +public: + GlobalHookPlugin() { GlobalPlugin::registerHook(HOOK_READ_REQUEST_HEADERS); } + virtual void + handleReadRequestHeaders(Transaction &transaction) + { + std::cout << "Hello from handleReadRequestHeaders!" << std::endl; + if (transaction.getClientRequest().getMethod() == HTTP_METHOD_POST) { + transaction.addPlugin(new RequestBufferPlugin(transaction)); + } + transaction.resume(); + } +}; + +void +TSPluginInit(int argc ATSCPPAPI_UNUSED, const char *argv[] ATSCPPAPI_UNUSED) +{ + RegisterGlobalPlugin("CPP_Example_RequestBuffer", "apache", "dev@trafficserver.apache.org"); + new GlobalHookPlugin(); +} diff --git a/lib/atscppapi/src/Plugin.cc b/lib/atscppapi/src/Plugin.cc index 176a030d813..5f14ddc38b4 100644 --- a/lib/atscppapi/src/Plugin.cc +++ b/lib/atscppapi/src/Plugin.cc @@ -25,7 +25,8 @@ const std::string atscppapi::HOOK_TYPE_STRINGS[] = { std::string("HOOK_READ_REQUEST_HEADERS_PRE_REMAP"), std::string("HOOK_READ_REQUEST_HEADERS_POST_REMAP"), std::string("HOOK_SEND_REQUEST_HEADERS"), std::string("HOOK_READ_RESPONSE_HEADERS"), std::string("HOOK_SEND_RESPONSE_HEADERS"), std::string("HOOK_OS_DNS"), std::string("HOOK_READ_REQUEST_HEADERS"), std::string("HOOK_READ_CACHE_HEADERS"), - std::string("HOOK_CACHE_LOOKUP_COMPLETE"), std::string("HOOK_SELECT_ALT")}; + std::string("HOOK_CACHE_LOOKUP_COMPLETE"), std::string("HOOK_SELECT_ALT"), std::string("HOOK_HTTP_REQUEST_BUFFER_READ"), + std::string("TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK")}; void atscppapi::RegisterGlobalPlugin(std::string name, std::string vendor, std::string email) diff --git a/lib/atscppapi/src/utils_internal.cc b/lib/atscppapi/src/utils_internal.cc index 0f9955f1188..2f6dd4d9cb0 100644 --- a/lib/atscppapi/src/utils_internal.cc +++ b/lib/atscppapi/src/utils_internal.cc @@ -197,6 +197,10 @@ utils::internal::convertInternalHookToTsHook(Plugin::HookType hooktype) return TS_HTTP_CACHE_LOOKUP_COMPLETE_HOOK; case Plugin::HOOK_SELECT_ALT: return TS_HTTP_SELECT_ALT_HOOK; + case Plugin::HOOK_HTTP_REQUEST_BUFFER_READ: + return TS_HTTP_REQUEST_BUFFER_READ_HOOK; + case Plugin::HOOK_HTTP_REQUEST_BUFFER_READ_COMPLETE: + return TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK; default: assert(false); // shouldn't happen, let's catch it early break; From ba890314cde41778cc446dde0862c70a46dd05d6 Mon Sep 17 00:00:00 2001 From: Brian Geffon Date: Fri, 4 Dec 2015 13:29:41 -0800 Subject: [PATCH 3/4] TS-4042: Fix typo --- proxy/api/ts/ts.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/api/ts/ts.h b/proxy/api/ts/ts.h index 375a455357c..9b482b3f8db 100644 --- a/proxy/api/ts/ts.h +++ b/proxy/api/ts/ts.h @@ -2397,7 +2397,7 @@ tsapi const char *TSHttpEventNameLookup(TSEvent event); @return char * that contains the entire buffered request body, this must be freed by the caller using TSFree! */ -tsapi char *TSHttpTxnGetClientPequestBody(TSHttpTxn txnp, int *length); +tsapi char *TSHttpTxnGetClientRequestBody(TSHttpTxn txnp, int *length); /** Get the IOBufferReader (if applicable) from a client request. It's easy to From 295ef61ef5094da2fc222d87fadf4f93b3a134b4 Mon Sep 17 00:00:00 2001 From: Zizhong Zhang Date: Sun, 6 Dec 2015 22:17:44 -0800 Subject: [PATCH 4/4] TS-4042: Fixing support for chunked encoding and adding tests --- ci/tsqa/tests/helpers.py | 1 + ci/tsqa/tests/test_request_buffer.py | 222 ++++++++++++++++++ .../request_buffer/RequestBufferPlugin.cc | 2 +- plugins/request_buffer/request_buffer.cc | 2 +- proxy/http/HttpSM.cc | 49 +++- proxy/http/HttpSM.h | 2 +- proxy/http/HttpTransact.cc | 19 +- 7 files changed, 278 insertions(+), 19 deletions(-) create mode 100644 ci/tsqa/tests/test_request_buffer.py diff --git a/ci/tsqa/tests/helpers.py b/ci/tsqa/tests/helpers.py index af1104b34e2..cb029f456ba 100644 --- a/ci/tsqa/tests/helpers.py +++ b/ci/tsqa/tests/helpers.py @@ -46,6 +46,7 @@ def getEnv(cls): ef = tsqa.environment.EnvironmentFactory(SOURCE_DIR, os.path.join(TMP_DIR, 'base_envs'), default_configure={'enable-experimental-plugins': None, + 'enable-cppapi' : None, 'enable-example-plugins': None, 'enable-test-tools': None, 'disable-dependency-tracking': None, diff --git a/ci/tsqa/tests/test_request_buffer.py b/ci/tsqa/tests/test_request_buffer.py new file mode 100644 index 00000000000..a2abb2e14e3 --- /dev/null +++ b/ci/tsqa/tests/test_request_buffer.py @@ -0,0 +1,222 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import socket +import tsqa +import tsqa.test_cases +import tsqa.utils +import thread +import SocketServer +import time +import requests +unittest = tsqa.utils.import_unittest() +import helpers +import threading +import logging + +log = logging.getLogger(__name__) + +class AtomicCounter: + def __init__(self): + self.lock = threading.Lock() + self.counter = 0 + + def inc(self, increment = 1): + self.lock.acquire() + self.counter += increment + self.lock.release() + + def get(self): + value = 0 + self.lock.acquire() + value = self.counter + self.lock.release() + return value + + def reset(self): + self.lock.acquire() + self.counter = 0 + self.lock.release() + +data_receive_times = AtomicCounter() + +class EchoReceivedLengthHandler(SocketServer.BaseRequestHandler): + """ + A subclass of RequestHandler that sends back how many bytes it has received + """ + + def handle(self): + chunked = False + global data_receive_times + + while True: + data = self.request.recv(65536) + log.info("server receive data length: %d" % (len(data))) + if len(data) > 0 and len(data) < 200: + log.info("server receive data: %s" % (data)) + if not data: + log.info('Client disconnected') + break + if 'Transfer-Encoding: chunked' in data: + chunked = True + + data_receive_times.inc(); + if not chunked or '0\r\n\r\n' in data: + log.info('Sending data back to the client') + resp_str = str(len(data)) + resp = ('HTTP/1.1 200 OK\r\n' + 'Content-Length: %d\r\n' + 'Content-Type: text/html; charset=UTF-8\r\n' + 'Connection: keep-alive\r\n' + '\r\n%s' %( + len(resp_str), + resp_str + )) + self.request.sendall(resp) + + if chunked and '0\r\n\r\n' in data: + log.info('Client disconnected') + break + + chunked = False + +class TestRequestBuffer(helpers.EnvironmentCase): + @classmethod + def setUpEnv(cls, env): + cls.traffic_server_host = '127.0.0.1' + cls.traffic_server_port = int(cls.configs['records.config']['CONFIG']['proxy.config.http.server_ports']) + cls.configs['records.config']['CONFIG']['proxy.config.http.chunking_enabled'] = 1 + cls.configs['records.config']['CONFIG']['proxy.config.diags.debug.enabled'] = 1 + cls.configs['records.config']['CONFIG']['proxy.config.diags.debug.tags'] = 'http.*' + # create a socket server + cls.port = tsqa.utils.bind_unused_port()[1] + cls.socket_server = tsqa.endpoint.SocketServerDaemon(EchoReceivedLengthHandler, port=cls.port) + cls.socket_server.start() + cls.socket_server.ready.wait() + log.info(cls.environment.layout.logdir) + log.info("socket_server_port = %d, cls.traffic_server_port= %d" % (cls.socket_server.port, cls.traffic_server_port)) + cls.configs['remap.config'].add_line('map / http://127.0.0.1:{0}'.format(cls.socket_server.port)) + cls.configs['plugin.config'].add_line('%s/RequestBufferPlugin.so' %(cls.environment.layout.plugindir)) + + def test_request_buffer_content_length_0(self): + """ + test for sending post header with content length 0 + """ + global data_receive_times + data_receive_times.reset() + small_post_headers = {'Content-Length' : 0} + ret = requests.post( + 'http://127.0.0.1:%d' % (self.traffic_server_port), + headers = small_post_headers + ) + + self.assertEqual(data_receive_times.get(), 1) + self.assertEqual(ret.status_code, 200) + + + def test_request_buffer_content_length_small(self): + """ + test for sending post request all in once + """ + global data_receive_times + data_receive_times.reset() + + req = 'POST / HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length:2\r\nHost: 127.0.0.1\r\n\r\nab' + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.connect((self.traffic_server_host, self.traffic_server_port)) + conn.setblocking(1) + conn.send(req) + resp = conn.recv(4096) + + # data_receive_times will be 1 or 2. Request header and body may send separately + self.assertEqual(data_receive_times.get() == 1 or data_receive_times.get() == 2, True) + self.assertIn('HTTP/1.1 200 OK', resp) + + def test_request_buffer_content_length_large(self): + """ + test for sending large post + """ + global data_receive_times + data_receive_times.reset() + + str_length = 2000 + send_times = 30 + content_str = '' + for i in xrange(0, str_length): + content_str = content_str + 'a' + + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.connect((self.traffic_server_host, self.traffic_server_port)) + + hdr = 'POST / HTTP/1.1\r\nConnection: keep-alive\r\nContent-Length:%d\r\nHost: 127.0.0.1\r\n\r\n' % (str_length * send_times) + conn.setblocking(1) + conn.send(hdr) + for i in xrange(0, send_times): + time.sleep(0.1) + self.assertEqual(data_receive_times.get(), 0) + conn.send(content_str) + + log.info("recv_times = %d, send_times = %d" % (data_receive_times.get(), send_times)) + resp = conn.recv(4096) + self.assertIn('HTTP/1.1 200 OK', resp) + + def test_request_buffer_chunked_small(self): + """ + test for sending post request with chunked encoding all in once + """ + global data_receive_times + data_receive_times.reset() + + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.connect((self.traffic_server_host, self.traffic_server_port)) + req = 'POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\nConnection: keep-alive\r\n\r\n2\r\n12\r\n0\r\n\r\n' + conn.setblocking(1) + conn.send(req) + + resp = conn.recv(4096) + + log.info("recv_times = %d, send_times = 1" % (data_receive_times.get())) + # data_receive_times will be 1 or 2. Request header and body may send separately + self.assertEqual(data_receive_times.get() == 1 or data_receive_times.get() == 2, True) + self.assertIn('HTTP/1.1 200 OK', resp) + + def test_request_buffer_chunked_large(self): + """ + test for sending large post data with chunked encoding + """ + global data_receive_times + data_receive_times.reset() + str_length = 2000 + chunked_size = hex(str_length)[2:] + send_times = 30 + content_str = '' + for i in xrange(0, str_length): + content_str = content_str + 'a' + + conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + conn.connect((self.traffic_server_host, self.traffic_server_port)) + hdr = 'POST / HTTP/1.1\r\nTransfer-Encoding: chunked\r\nConnection: keep-alive\r\n\r\n' + conn.setblocking(1) + conn.send(hdr) + + for i in xrange(0, send_times): + time.sleep(0.1) + self.assertEqual(data_receive_times.get(), 0) + conn.send('%s\r\n%s\r\n' %(chunked_size, content_str)) + + conn.send('0\r\n\r\n') + log.info("recv_times = %d, send_times = %d" % (data_receive_times.get(), send_times)) + resp = conn.recv(4096) + self.assertIn('HTTP/1.1 200 OK', resp) diff --git a/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc b/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc index 053c66a21ed..d1b513b2140 100644 --- a/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc +++ b/lib/atscppapi/examples/request_buffer/RequestBufferPlugin.cc @@ -67,7 +67,7 @@ class RequestBufferPlugin : public atscppapi::TransactionPlugin handleHttpRequestBufferReadComplete(Transaction &transaction) { std::cout << "request buffer complete!" << transaction.getClientRequestBody().size() << std::endl; - reached_min_speed(transaction) ? transaction.resume() : transaction.error(); + transaction.resume(); } private: diff --git a/plugins/request_buffer/request_buffer.cc b/plugins/request_buffer/request_buffer.cc index e4c70093d12..3ca44e415e0 100644 --- a/plugins/request_buffer/request_buffer.cc +++ b/plugins/request_buffer/request_buffer.cc @@ -66,7 +66,7 @@ hook_handler(TSCont contp, TSEvent event, void *edata) TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, TSContCreate(hook_handler, TSMutexCreate())); } else if (event == TS_EVENT_HTTP_REQUEST_BUFFER_READ || event == TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE) { int64_t ret_len = TSHttpTxnClientReqBodyBytesGet(txnp); - if (!reached_min_speed(txnp, ret_len)) { + if (event == TS_EVENT_HTTP_REQUEST_BUFFER_READ && !reached_min_speed(txnp, ret_len)) { TSError("[hook_handler] Error : reached_min_speed checking failed\n"); TSHttpTxnReenable(txnp, TS_EVENT_ERROR); return 0; diff --git a/proxy/http/HttpSM.cc b/proxy/http/HttpSM.cc index c93e864b6b2..7c14ca6f881 100644 --- a/proxy/http/HttpSM.cc +++ b/proxy/http/HttpSM.cc @@ -310,6 +310,8 @@ HttpSM::cleanup() magic = HTTP_SM_MAGIC_DEAD; debug_on = false; request_fully_received = false; + + // need not to call chunked_handler.clear() because the ACTION_PASSTHRU mode is using } void @@ -836,6 +838,17 @@ HttpSM::state_drain_client_request_body(int event, void *data) void HttpSM::wait_for_full_body() { + if (t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING) { + ua_buffer_reader->mbuf->size_index = BUFFER_SIZE_INDEX_32K; + // if you have less than 4k remaining in your write avail add a new buffer block that's 32kb + if (ua_buffer_reader->mbuf->block_write_avail() <= BUFFER_SIZE_FOR_INDEX(BUFFER_SIZE_INDEX_4K)) { + ua_buffer_reader->mbuf->add_block(); + } + ua_entry->vc_handler = &HttpSM::state_wait_for_full_body; + ua_entry->read_vio = ua_entry->vc->do_io_read(this, INT64_MAX, ua_buffer_reader->mbuf); + + return; + } int64_t request_bytes = t_state.hdr_info.request_content_length; // handy save int64_t avail = ua_buffer_reader->read_avail(); @@ -875,16 +888,21 @@ HttpSM::state_wait_for_full_body(int event, void *data) ink_assert(ua_entry->read_vio == (VIO *)data); ink_assert(ua_entry->vc == ua_session); + int64_t avail_for_write = ua_buffer_reader->mbuf->block_write_avail(); int64_t avail = ua_buffer_reader->read_avail(); int64_t left = t_state.hdr_info.request_content_length - client_request_body_bytes; - int64_t avail_for_write = ua_buffer_reader->mbuf->block_write_avail(); - - DebugSM("http_request_wait", "[%" PRId64 "] event %d, data %p, current buffer avail: %" PRId64 ", remaining: %" PRId64 - ", block available for write: %" PRId64 ", block count: %d", - sm_id, event, data, avail, left, avail_for_write, ua_buffer_reader->block_count()); - if (event == VC_EVENT_READ_READY && avail_for_write <= left) { - ua_buffer_reader->mbuf->add_block(); - DebugSM("http_request_wait", "[%" PRId64 "] adding block for request body", sm_id); + if (t_state.client_info.transfer_encoding != HttpTransact::CHUNKED_ENCODING) { + DebugSM("http_request_wait", "[%" PRId64 "] event %d, data %p, current buffer avail: %" PRId64 ", remaining: %" PRId64 + ", block available for write: %" PRId64 ", block count: %d", + sm_id, event, data, avail, left, avail_for_write, ua_buffer_reader->block_count()); + if (event == VC_EVENT_READ_READY && avail_for_write <= left) { + ua_buffer_reader->mbuf->add_block(); + DebugSM("http_request_wait", "[%" PRId64 "] adding block for request body", sm_id); + } + } else { + if (event == VC_EVENT_READ_READY && avail_for_write <= BUFFER_SIZE_FOR_INDEX(BUFFER_SIZE_INDEX_4K)) { + ua_buffer_reader->mbuf->add_block(); + } } switch (event) { @@ -906,6 +924,16 @@ HttpSM::state_wait_for_full_body(int event, void *data) break; } case VC_EVENT_READ_READY: { + if (t_state.client_info.transfer_encoding == HttpTransact::CHUNKED_ENCODING) { + bool done = chunked_handler.process_chunked_content(); + DebugSM("http_request_wait", "[%" PRId64 "] VC_EVENT_READ_READY: chunked_handler.state = [%d]", sm_id, chunked_handler.state); + if (done) { + ua_entry->read_vio->done(); + state_wait_for_full_body(VC_EVENT_READ_COMPLETE, data); + break; + } + } + client_request_body_bytes = avail; // since we're never consuming. DebugSM("http_request_wait", "[%" PRId64 "] VC_EVENT_READ_READY: Post wait for full body: received: %" PRId64 " bytes expecting %" PRId64 " dest buffer=%p", @@ -929,8 +957,9 @@ HttpSM::state_wait_for_full_body(int event, void *data) client_request_body_bytes = avail; // since we're never consuming. - ink_assert(client_request_body_bytes == t_state.hdr_info.request_content_length); - + if (t_state.client_info.transfer_encoding != HttpTransact::CHUNKED_ENCODING) { + ink_assert(client_request_body_bytes == t_state.hdr_info.request_content_length); + } // At this point if an expect: 100-continue header existed it can be removed // as we are going to send the full request body to the origin now int len = 0; diff --git a/proxy/http/HttpSM.h b/proxy/http/HttpSM.h index 5398a30a712..a4f1e40b7a1 100644 --- a/proxy/http/HttpSM.h +++ b/proxy/http/HttpSM.h @@ -278,7 +278,7 @@ class HttpSM : public Continuation bool debug_on; // Transaction specific debug flag bool request_fully_received; - bool request_fully_received_invoked; + ChunkedHandler chunked_handler; // Tunneling request to plugin HttpPluginTunnel_t plugin_tunnel_type; diff --git a/proxy/http/HttpTransact.cc b/proxy/http/HttpTransact.cc index ad332c0e0dd..d92dc7d5bbe 100644 --- a/proxy/http/HttpTransact.cc +++ b/proxy/http/HttpTransact.cc @@ -1250,12 +1250,20 @@ HttpTransact::HandleRequest(State *s) // client keep-alive, cache action, etc. initialize_state_variables_from_request(s, &s->hdr_info.client_request); } - - if (s->txn_conf->request_buffer_enabled && s->hdr_info.request_content_length > 0) { + if (s->txn_conf->request_buffer_enabled && + (s->hdr_info.request_content_length > 0 || s->client_info.transfer_encoding == CHUNKED_ENCODING)) { // Let's check if we've already fully received the request. - if (s->hdr_info.request_content_length <= s->state_machine->ua_buffer_reader->read_avail()) { - s->state_machine->request_fully_received = true; + if (s->client_info.transfer_encoding == CHUNKED_ENCODING) { + if (s->state_machine->chunked_handler.chunked_reader == NULL) { + s->state_machine->chunked_handler.init_by_action(s->state_machine->ua_buffer_reader, ChunkedHandler::ACTION_PASSTHRU); + s->state_machine->chunked_handler.state = s->state_machine->chunked_handler.CHUNK_READ_SIZE; + } + s->state_machine->request_fully_received = s->state_machine->chunked_handler.process_chunked_content(); + } else { + s->state_machine->request_fully_received = + s->hdr_info.request_content_length <= s->state_machine->ua_buffer_reader->read_avail(); } + s->state_machine->client_request_body_bytes = s->state_machine->ua_buffer_reader->read_avail(); int len = 0; const char *expect = s->hdr_info.client_request.value_get(MIME_FIELD_EXPECT, MIME_LEN_EXPECT, &len); @@ -1271,13 +1279,12 @@ HttpTransact::HandleRequest(State *s) // dispatching plugins need a place to reenable to, so we’ll reenable to a noop and // activity on the VConn will ensure the transaction doesn’t get lost. TRANSACT_RETURN(SM_ACTION_WAIT_FOR_FULL_BODY, HttpTransact::HandleRequestNoOp); - return; } else { // We set the return point to NoOp so we don’t recursively call into HandleRequest and // once all plugins have been fully dispatched we’ll continue exectuing through handle request (which // at some point later sets the trasact return point). So if a plugin decides not to reenable, when it finally // does reenable it will continue executing at the next state which will be determined later in this method. - s->transact_return_point = HttpTransact::HandleRequestNoOp; + TRANSACT_SETUP_RETURN(SM_ACTION_WAIT_FOR_FULL_BODY, HttpTransact::HandleRequestNoOp); APIHook *hook = s->state_machine->txn_hook_get(TS_HTTP_REQUEST_BUFFER_READ_COMPLETE_HOOK); while (hook) { hook->invoke(TS_EVENT_HTTP_REQUEST_BUFFER_COMPLETE, s->state_machine);