From cb3f15e1f5d42fc629040c442b0942d5305b35ab Mon Sep 17 00:00:00 2001 From: Brian Olsen Date: Wed, 23 Sep 2020 23:47:29 +0000 Subject: [PATCH] default to throttling and subsequently simplify the transfer code fix throttle check bug for the push side --- plugins/experimental/slice/Config.cc | 5 -- plugins/experimental/slice/Config.h | 3 +- plugins/experimental/slice/client.cc | 30 ++++--- plugins/experimental/slice/server.cc | 39 +++++---- plugins/experimental/slice/slice.h | 4 +- plugins/experimental/slice/transfer.cc | 106 +++++++++---------------- plugins/experimental/slice/util.cc | 1 + 7 files changed, 80 insertions(+), 108 deletions(-) diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc index 54df2f1428b..0408389b417 100644 --- a/plugins/experimental/slice/Config.cc +++ b/plugins/experimental/slice/Config.cc @@ -112,7 +112,6 @@ Config::fromArgs(int const argc, char const *const argv[]) {const_cast("exclude-regex"), required_argument, nullptr, 'e'}, {const_cast("include-regex"), required_argument, nullptr, 'i'}, {const_cast("ref-relative"), no_argument, nullptr, 'l'}, - {const_cast("throttle"), no_argument, nullptr, 'o'}, {const_cast("pace-errorlog"), required_argument, nullptr, 'p'}, {const_cast("remap-host"), required_argument, nullptr, 'r'}, {const_cast("blockbytes-test"), required_argument, nullptr, 't'}, @@ -182,10 +181,6 @@ Config::fromArgs(int const argc, char const *const argv[]) m_reftype = RefType::Relative; DEBUG_LOG("Reference slice relative to request (not slice block 0)"); } break; - case 'o': { - m_throttle = true; - DEBUG_LOG("Enabling internal block throttling"); - } break; case 'p': { int const secsread = atoi(optarg); if (0 < secsread) { diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h index 1f59511e8c0..4a5b3d6c0f3 100644 --- a/plugins/experimental/slice/Config.h +++ b/plugins/experimental/slice/Config.h @@ -41,8 +41,7 @@ struct Config { RegexType m_regex_type{None}; pcre *m_regex{nullptr}; pcre_extra *m_regex_extra{nullptr}; - bool m_throttle{false}; // internal block throttling - int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s + int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s enum RefType { First, Relative }; RefType m_reftype{First}; // reference slice is relative to request diff --git a/plugins/experimental/slice/client.cc b/plugins/experimental/slice/client.cc index d5cbd52f132..c1a9b59ef94 100644 --- a/plugins/experimental/slice/client.cc +++ b/plugins/experimental/slice/client.cc @@ -140,22 +140,20 @@ handle_client_resp(TSCont contp, TSEvent event, Data *const data) } break; case BlockState::Pending: { - bool start_next_block = true; - - if (data->m_config->m_throttle) { - TSVIO const output_vio = data->m_dnstream.m_write.m_vio; - int64_t const output_done = TSVIONDoneGet(output_vio); - int64_t const output_sent = data->m_bytessent; - int64_t const threshout = data->m_config->m_blockbytes; - - if (threshout < (output_sent - output_done)) { - start_next_block = false; - DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, (output_sent - output_done)); - } - - if (start_next_block) { - DEBUG_LOG("Starting next block request"); - request_block(contp, data); + // throttle + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + int64_t const output_done = TSVIONDoneGet(output_vio); + int64_t const output_sent = data->m_bytessent; + int64_t const threshout = data->m_config->m_blockbytes; + int64_t const buffered = output_sent - output_done; + + if (threshout < buffered) { + DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, buffered); + } else { + DEBUG_LOG("Starting next block request"); + if (!request_block(contp, data)) { + data->m_blockstate = BlockState::Fail; + return; } } } break; diff --git a/plugins/experimental/slice/server.cc b/plugins/experimental/slice/server.cc index bdc37daf049..089b1fcf454 100644 --- a/plugins/experimental/slice/server.cc +++ b/plugins/experimental/slice/server.cc @@ -549,13 +549,16 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) // header may have been successfully parsed but with caveats switch (data->m_blockstate) { // request new version of current internal slice - case BlockState::PendingInt: { - request_block(contp, data); - return; - } break; - // request new version of reference slice + case BlockState::PendingInt: case BlockState::PendingRef: { - request_block(contp, data); + if (!request_block(contp, data)) { + data->m_blockstate = BlockState::Fail; + if (data->m_dnstream.m_write.isOpen()) { + TSVIOReenable(data->m_dnstream.m_write.m_vio); + } else { + shutdown(contp, data); + } + } return; } break; case BlockState::ActiveRef: { @@ -644,29 +647,33 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data) // Don't immediately request the next slice if the client // isn't keeping up - bool start_next_block = true; + if (data->m_dnstream.m_write.isOpen()) { + bool start_next_block = true; - // throttle condition - if (data->m_config->m_throttle && data->m_dnstream.m_read.isOpen()) { + // check throttle condition TSVIO const output_vio = data->m_dnstream.m_write.m_vio; int64_t const output_done = TSVIONDoneGet(output_vio); int64_t const output_sent = data->m_bytessent; int64_t const threshout = data->m_config->m_blockbytes; + int64_t const buffered = output_sent - output_done; - if (threshout < (output_sent - output_done)) { + if (threshout < buffered) { start_next_block = false; - DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, (output_sent - output_done)); + DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, buffered); } - } - if (start_next_block) { - request_block(contp, data); + if (start_next_block) { + if (!request_block(contp, data)) { + data->m_blockstate = BlockState::Fail; + abort(contp, data); + return; + } + } } - } else { data->m_upstream.close(); data->m_blockstate = BlockState::Done; - if (!data->m_dnstream.m_read.isOpen()) { + if (!data->m_dnstream.m_write.isOpen()) { shutdown(contp, data); } } diff --git a/plugins/experimental/slice/slice.h b/plugins/experimental/slice/slice.h index bd58818d033..ed86dd6771d 100644 --- a/plugins/experimental/slice/slice.h +++ b/plugins/experimental/slice/slice.h @@ -39,8 +39,8 @@ constexpr std::string_view X_CRR_IMS_HEADER = {"X-Crr-Ims"}; #define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__) #define DEBUG_LOG(fmt, ...) TSDebug(PLUGIN_NAME, "[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__) -#define ERROR_LOG(fmt, ...) \ - TSError("[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \ +#define ERROR_LOG(fmt, ...) \ + TSError("[%s/%s:% 4d] %s(): " fmt, PLUGIN_NAME, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \ TSDebug(PLUGIN_NAME, "[%s:%04d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__) #else diff --git a/plugins/experimental/slice/transfer.cc b/plugins/experimental/slice/transfer.cc index f83b0882c0b..ce31b754031 100644 --- a/plugins/experimental/slice/transfer.cc +++ b/plugins/experimental/slice/transfer.cc @@ -33,56 +33,40 @@ transfer_content_bytes(Data *const data) int64_t consumed = 0; // input vio bytes visited int64_t copied = 0; // output bytes transferred - bool const canWrite = data->m_dnstream.m_write.isOpen(); - bool done = false; - - TSIOBufferBlock block = TSIOBufferReaderStart(reader); - - while (!done && nullptr != block) { - int64_t bavail = TSIOBufferBlockReadAvail(block, reader); - - if (0 == bavail) { - block = TSIOBufferBlockNext(block); - } else { - int64_t toconsume = 0; - - if (canWrite) { - int64_t const toskip = std::min(data->m_blockskip, bavail); - if (0 < toskip) { // before bytes - toconsume = toskip; - data->m_blockskip -= toskip; - } else { - int64_t const bytesleft = data->m_bytestosend - data->m_bytessent; - if (0 < bytesleft) { // transfer bytes - int64_t const tocopy = std::min(bavail, bytesleft); - int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0); - - done = (nbytes < tocopy); // output buffer stuffed - - copied += nbytes; - data->m_bytessent += nbytes; - - toconsume = nbytes; - } else { // after bytes - toconsume = bavail; - } - } - } else { // drain - toconsume = bavail; - } + int64_t avail = TSIOBufferReaderAvail(reader); + if (0 < avail) { + int64_t const toskip = std::min(data->m_blockskip, avail); + if (0 < toskip) { + TSIOBufferReaderConsume(reader, toskip); + data->m_blockskip -= toskip; + avail -= toskip; + consumed += toskip; + } + } - if (0 < toconsume) { - if (bavail == toconsume) { - block = TSIOBufferBlockNext(block); - } - TSIOBufferReaderConsume(reader, toconsume); - consumed += toconsume; - } + // bool const canWrite = data->m_dnstream.m_write.isOpen(); + + if (0 < avail) { + int64_t const bytesleft = data->m_bytestosend - data->m_bytessent; + int64_t const tocopy = std::min(avail, bytesleft); + if (0 < tocopy) { + copied = TSIOBufferCopy(output_buf, reader, tocopy, 0); + + data->m_bytessent += copied; + TSIOBufferReaderConsume(reader, copied); + + avail -= copied; + consumed += copied; } } - // tell output more data is available - if (0 < copied) { + // if hit fulfillment start bulk consuming + if (0 < avail && data->m_bytestosend <= data->m_bytessent) { + TSIOBufferReaderConsume(reader, avail); + consumed += avail; + } + + if (0 < copied && nullptr != output_vio) { TSVIOReenable(output_vio); } @@ -112,35 +96,23 @@ transfer_all_bytes(Data *const data) TSIOBufferReader const reader = data->m_upstream.m_read.m_reader; TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf; - bool done = false; + int64_t const read_avail = TSIOBufferReaderAvail(reader); - TSIOBufferBlock block = TSIOBufferReaderStart(reader); + if (0 < read_avail) { + int64_t const copied = TSIOBufferCopy(output_buf, reader, read_avail, 0); - while (!done && nullptr != block) { - int64_t bavail = TSIOBufferBlockReadAvail(block, reader); + if (0 < copied) { + TSIOBufferReaderConsume(reader, copied); + consumed = copied; - if (0 == bavail) { - block = TSIOBufferBlockNext(block); - } else { - int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0); - done = nbytes < bavail; // output buffer is full - - if (0 < nbytes) { - if (bavail == nbytes) { - block = TSIOBufferBlockNext(block); - } - TSIOBufferReaderConsume(reader, nbytes); - consumed += nbytes; + TSVIO const output_vio = data->m_dnstream.m_write.m_vio; + if (nullptr != output_vio) { + TSVIOReenable(output_vio); } } } if (0 < consumed) { - TSVIO const output_vio = data->m_dnstream.m_write.m_vio; - if (nullptr != output_vio) { - TSVIOReenable(output_vio); - } - TSVIO const input_vio = data->m_upstream.m_read.m_vio; if (nullptr != input_vio) { TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed); diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc index 04516d4c12a..15c68b4f95f 100644 --- a/plugins/experimental/slice/util.cc +++ b/plugins/experimental/slice/util.cc @@ -128,6 +128,7 @@ request_block(TSCont contp, Data *const data) break; default: ERROR_LOG("Invalid blockstate"); + return false; break; }