Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions plugins/experimental/slice/Config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ Config::fromArgs(int const argc, char const *const argv[])
{const_cast<char *>("exclude-regex"), required_argument, nullptr, 'e'},
{const_cast<char *>("include-regex"), required_argument, nullptr, 'i'},
{const_cast<char *>("ref-relative"), no_argument, nullptr, 'l'},
{const_cast<char *>("throttle"), no_argument, nullptr, 'o'},
{const_cast<char *>("pace-errorlog"), required_argument, nullptr, 'p'},
{const_cast<char *>("remap-host"), required_argument, nullptr, 'r'},
{const_cast<char *>("blockbytes-test"), required_argument, nullptr, 't'},
Expand Down Expand Up @@ -182,10 +181,6 @@ Config::fromArgs(int const argc, char const *const argv[])
m_reftype = RefType::Relative;
DEBUG_LOG("Reference slice relative to request (not slice block 0)");
} break;
case 'o': {
m_throttle = true;
DEBUG_LOG("Enabling internal block throttling");
} break;
case 'p': {
int const secsread = atoi(optarg);
if (0 < secsread) {
Expand Down
3 changes: 1 addition & 2 deletions plugins/experimental/slice/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ struct Config {
RegexType m_regex_type{None};
pcre *m_regex{nullptr};
pcre_extra *m_regex_extra{nullptr};
bool m_throttle{false}; // internal block throttling
int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s
enum RefType { First, Relative };
RefType m_reftype{First}; // reference slice is relative to request

Expand Down
30 changes: 14 additions & 16 deletions plugins/experimental/slice/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -140,22 +140,20 @@ handle_client_resp(TSCont contp, TSEvent event, Data *const data)
} break;

case BlockState::Pending: {
bool start_next_block = true;

if (data->m_config->m_throttle) {
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
int64_t const output_done = TSVIONDoneGet(output_vio);
int64_t const output_sent = data->m_bytessent;
int64_t const threshout = data->m_config->m_blockbytes;

if (threshout < (output_sent - output_done)) {
start_next_block = false;
DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, (output_sent - output_done));
}

if (start_next_block) {
DEBUG_LOG("Starting next block request");
request_block(contp, data);
// throttle
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
int64_t const output_done = TSVIONDoneGet(output_vio);
int64_t const output_sent = data->m_bytessent;
int64_t const threshout = data->m_config->m_blockbytes;
int64_t const buffered = output_sent - output_done;

if (threshout < buffered) {
DEBUG_LOG("%p handle_client_resp: throttling %" PRId64, data, buffered);
} else {
DEBUG_LOG("Starting next block request");
if (!request_block(contp, data)) {
data->m_blockstate = BlockState::Fail;
return;
}
}
} break;
Expand Down
39 changes: 23 additions & 16 deletions plugins/experimental/slice/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,16 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
// header may have been successfully parsed but with caveats
switch (data->m_blockstate) {
// request new version of current internal slice
case BlockState::PendingInt: {
request_block(contp, data);
return;
} break;
// request new version of reference slice
case BlockState::PendingInt:
case BlockState::PendingRef: {
request_block(contp, data);
if (!request_block(contp, data)) {
data->m_blockstate = BlockState::Fail;
if (data->m_dnstream.m_write.isOpen()) {
TSVIOReenable(data->m_dnstream.m_write.m_vio);
} else {
shutdown(contp, data);
}
}
return;
} break;
case BlockState::ActiveRef: {
Expand Down Expand Up @@ -644,29 +647,33 @@ handle_server_resp(TSCont contp, TSEvent event, Data *const data)
// Don't immediately request the next slice if the client
// isn't keeping up

bool start_next_block = true;
if (data->m_dnstream.m_write.isOpen()) {
bool start_next_block = true;

// throttle condition
if (data->m_config->m_throttle && data->m_dnstream.m_read.isOpen()) {
// check throttle condition
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
int64_t const output_done = TSVIONDoneGet(output_vio);
int64_t const output_sent = data->m_bytessent;
int64_t const threshout = data->m_config->m_blockbytes;
int64_t const buffered = output_sent - output_done;

if (threshout < (output_sent - output_done)) {
if (threshout < buffered) {
start_next_block = false;
DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, (output_sent - output_done));
DEBUG_LOG("%p handle_server_resp: throttling %" PRId64, data, buffered);
}
}

if (start_next_block) {
request_block(contp, data);
if (start_next_block) {
if (!request_block(contp, data)) {
data->m_blockstate = BlockState::Fail;
abort(contp, data);
return;
}
}
}

} else {
data->m_upstream.close();
data->m_blockstate = BlockState::Done;
if (!data->m_dnstream.m_read.isOpen()) {
if (!data->m_dnstream.m_write.isOpen()) {
shutdown(contp, data);
}
}
Expand Down
4 changes: 2 additions & 2 deletions plugins/experimental/slice/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ constexpr std::string_view X_CRR_IMS_HEADER = {"X-Crr-Ims"};
#define __FILENAME__ (strrchr(__FILE__, '/') ? strrchr(__FILE__, '/') + 1 : __FILE__)
#define DEBUG_LOG(fmt, ...) TSDebug(PLUGIN_NAME, "[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)

#define ERROR_LOG(fmt, ...) \
TSError("[%s:% 4d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
#define ERROR_LOG(fmt, ...) \
TSError("[%s/%s:% 4d] %s(): " fmt, PLUGIN_NAME, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__); \
TSDebug(PLUGIN_NAME, "[%s:%04d] %s(): " fmt, __FILENAME__, __LINE__, __func__, ##__VA_ARGS__)

#else
Expand Down
106 changes: 39 additions & 67 deletions plugins/experimental/slice/transfer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,56 +33,40 @@ transfer_content_bytes(Data *const data)
int64_t consumed = 0; // input vio bytes visited
int64_t copied = 0; // output bytes transferred

bool const canWrite = data->m_dnstream.m_write.isOpen();
bool done = false;

TSIOBufferBlock block = TSIOBufferReaderStart(reader);

while (!done && nullptr != block) {
int64_t bavail = TSIOBufferBlockReadAvail(block, reader);

if (0 == bavail) {
block = TSIOBufferBlockNext(block);
} else {
int64_t toconsume = 0;

if (canWrite) {
int64_t const toskip = std::min(data->m_blockskip, bavail);
if (0 < toskip) { // before bytes
toconsume = toskip;
data->m_blockskip -= toskip;
} else {
int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
if (0 < bytesleft) { // transfer bytes
int64_t const tocopy = std::min(bavail, bytesleft);
int64_t const nbytes = TSIOBufferCopy(output_buf, reader, tocopy, 0);

done = (nbytes < tocopy); // output buffer stuffed

copied += nbytes;
data->m_bytessent += nbytes;

toconsume = nbytes;
} else { // after bytes
toconsume = bavail;
}
}
} else { // drain
toconsume = bavail;
}
int64_t avail = TSIOBufferReaderAvail(reader);
if (0 < avail) {
int64_t const toskip = std::min(data->m_blockskip, avail);
if (0 < toskip) {
TSIOBufferReaderConsume(reader, toskip);
data->m_blockskip -= toskip;
avail -= toskip;
consumed += toskip;
}
}

if (0 < toconsume) {
if (bavail == toconsume) {
block = TSIOBufferBlockNext(block);
}
TSIOBufferReaderConsume(reader, toconsume);
consumed += toconsume;
}
// bool const canWrite = data->m_dnstream.m_write.isOpen();

if (0 < avail) {
int64_t const bytesleft = data->m_bytestosend - data->m_bytessent;
int64_t const tocopy = std::min(avail, bytesleft);
if (0 < tocopy) {
copied = TSIOBufferCopy(output_buf, reader, tocopy, 0);

data->m_bytessent += copied;
TSIOBufferReaderConsume(reader, copied);

avail -= copied;
consumed += copied;
}
}

// tell output more data is available
if (0 < copied) {
// if hit fulfillment start bulk consuming
if (0 < avail && data->m_bytestosend <= data->m_bytessent) {
TSIOBufferReaderConsume(reader, avail);
consumed += avail;
}

if (0 < copied && nullptr != output_vio) {
TSVIOReenable(output_vio);
}

Expand Down Expand Up @@ -112,35 +96,23 @@ transfer_all_bytes(Data *const data)
TSIOBufferReader const reader = data->m_upstream.m_read.m_reader;
TSIOBuffer const output_buf = data->m_dnstream.m_write.m_iobuf;

bool done = false;
int64_t const read_avail = TSIOBufferReaderAvail(reader);

TSIOBufferBlock block = TSIOBufferReaderStart(reader);
if (0 < read_avail) {
int64_t const copied = TSIOBufferCopy(output_buf, reader, read_avail, 0);

while (!done && nullptr != block) {
int64_t bavail = TSIOBufferBlockReadAvail(block, reader);
if (0 < copied) {
TSIOBufferReaderConsume(reader, copied);
consumed = copied;

if (0 == bavail) {
block = TSIOBufferBlockNext(block);
} else {
int64_t const nbytes = TSIOBufferCopy(output_buf, reader, bavail, 0);
done = nbytes < bavail; // output buffer is full

if (0 < nbytes) {
if (bavail == nbytes) {
block = TSIOBufferBlockNext(block);
}
TSIOBufferReaderConsume(reader, nbytes);
consumed += nbytes;
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
if (nullptr != output_vio) {
TSVIOReenable(output_vio);
}
}
}

if (0 < consumed) {
TSVIO const output_vio = data->m_dnstream.m_write.m_vio;
if (nullptr != output_vio) {
TSVIOReenable(output_vio);
}

TSVIO const input_vio = data->m_upstream.m_read.m_vio;
if (nullptr != input_vio) {
TSVIONDoneSet(input_vio, TSVIONDoneGet(input_vio) + consumed);
Expand Down
1 change: 1 addition & 0 deletions plugins/experimental/slice/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ request_block(TSCont contp, Data *const data)
break;
default:
ERROR_LOG("Invalid blockstate");
return false;
break;
}

Expand Down