From a7c2b1e584e034629f950a40169945e183c0c9b6 Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Tue, 8 Mar 2022 11:12:51 -0800 Subject: [PATCH 1/2] Add prefetch feature to slice plugin --- plugins/experimental/slice/Config.cc | 4 + plugins/experimental/slice/Config.h | 3 +- plugins/experimental/slice/Data.h | 3 + plugins/experimental/slice/Makefile.inc | 2 + plugins/experimental/slice/prefetch.cc | 134 ++++++++++++++++++++++++ plugins/experimental/slice/prefetch.h | 56 ++++++++++ plugins/experimental/slice/util.cc | 14 +++ 7 files changed, 215 insertions(+), 1 deletion(-) create mode 100644 plugins/experimental/slice/prefetch.cc create mode 100644 plugins/experimental/slice/prefetch.h diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc index 2c810c69347..8f15e500250 100644 --- a/plugins/experimental/slice/Config.cc +++ b/plugins/experimental/slice/Config.cc @@ -123,6 +123,7 @@ Config::fromArgs(int const argc, char const *const argv[]) {const_cast("remap-host"), required_argument, nullptr, 'r'}, {const_cast("skip-header"), required_argument, nullptr, 's'}, {const_cast("blockbytes-test"), required_argument, nullptr, 't'}, + {const_cast("prefetch-count"), required_argument, nullptr, 'c'}, {nullptr, 0, nullptr, 0}, }; @@ -222,6 +223,9 @@ Config::fromArgs(int const argc, char const *const argv[]) DEBUG_LOG("Skipping blockbytes-test in favor of blockbytes"); } } break; + case 'c': { + m_prefetchcount = atoi(optarg); + } break; default: break; } diff --git a/plugins/experimental/slice/Config.h b/plugins/experimental/slice/Config.h index 7188c6ea934..8408ffc2076 100644 --- a/plugins/experimental/slice/Config.h +++ b/plugins/experimental/slice/Config.h @@ -41,7 +41,8 @@ struct Config { RegexType m_regex_type{None}; pcre *m_regex{nullptr}; pcre_extra *m_regex_extra{nullptr}; - int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s + int m_paceerrsecs{0}; // -1 disable logging, 0 no pacing, max 60s + int m_prefetchcount{0}; // 0 disables prefetching enum RefType { First, Relative }; RefType m_reftype{First}; // reference slice is relative to request diff --git a/plugins/experimental/slice/Data.h b/plugins/experimental/slice/Data.h index 496ebdc3964..b0564667f41 100644 --- a/plugins/experimental/slice/Data.h +++ b/plugins/experimental/slice/Data.h @@ -25,6 +25,7 @@ #include "Stage.h" #include +#include struct Config; @@ -92,6 +93,8 @@ struct Data { Stage m_upstream; Stage m_dnstream; + std::map m_fetchstates; + HdrMgr m_req_hdrmgr; // manager for server request HdrMgr m_resp_hdrmgr; // manager for client response diff --git a/plugins/experimental/slice/Makefile.inc b/plugins/experimental/slice/Makefile.inc index be376ca1c64..f02631e204d 100644 --- a/plugins/experimental/slice/Makefile.inc +++ b/plugins/experimental/slice/Makefile.inc @@ -28,6 +28,8 @@ experimental_slice_slice_la_SOURCES = \ experimental/slice/HttpHeader.h \ experimental/slice/intercept.cc \ experimental/slice/intercept.h \ + experimental/slice/prefetch.cc \ + experimental/slice/prefetch.h \ experimental/slice/Range.cc \ experimental/slice/Range.h \ experimental/slice/response.cc \ diff --git a/plugins/experimental/slice/prefetch.cc b/plugins/experimental/slice/prefetch.cc new file mode 100644 index 00000000000..b3ab43e1f42 --- /dev/null +++ b/plugins/experimental/slice/prefetch.cc @@ -0,0 +1,134 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file prefetch.cpp + * @brief Background fetch related classes (header file). + */ + +#include "ts/ts.h" /* ATS API */ +#include "prefetch.h" + +bool +BgBlockFetch::schedule(Data *const data, int blocknum) +{ + bool ret = false; + BgBlockFetch *bg = new BgBlockFetch(blocknum); + if (bg->fetch(data)) { + ret = true; + } else { + delete bg; + } + return ret; +} + +/** + * Initialize and schedule the background fetch + */ +bool +BgBlockFetch::fetch(Data *const data) +{ + if (_bg_stream.m_read.isOpen()) { + // should never happen since the connection was just initialized + ERROR_LOG("Background block request already in flight!"); + return false; + } + + int64_t const blockbeg = (data->m_config->m_blockbytes * _blocknum); + Range blockbe(blockbeg, blockbeg + data->m_config->m_blockbytes); + + char rangestr[1024]; + int rangelen = sizeof(rangestr); + bool const rpstat = blockbe.toStringClosed(rangestr, &rangelen); + TSAssert(rpstat); + + DEBUG_LOG("Request background block: %s", rangestr); + + // reuse the incoming client header, just change the range + HttpHeader header(data->m_req_hdrmgr.m_buffer, data->m_req_hdrmgr.m_lochdr); + + // add/set sub range key and add slicer tag + bool const rangestat = header.setKeyVal(TS_MIME_FIELD_RANGE, TS_MIME_LEN_RANGE, rangestr, rangelen); + + if (!rangestat) { + ERROR_LOG("Error trying to set range request header %s", rangestr); + return false; + } + TSAssert(nullptr == _cont); + + // Setup the continuation + _cont = TSContCreate(handler, TSMutexCreate()); + TSContDataSet(_cont, static_cast(this)); + + // create virtual connection back into ATS + TSHttpConnectOptions options = TSHttpConnectOptionsGet(TS_CONNECT_PLUGIN); + options.addr = reinterpret_cast(&data->m_client_ip); + options.tag = PLUGIN_NAME; + options.id = 0; + options.buffer_index = data->m_buffer_index; + options.buffer_water_mark = data->m_buffer_water_mark; + + TSVConn const upvc = TSHttpConnectPlugin(&options); + + int const hlen = TSHttpHdrLengthGet(header.m_buffer, header.m_lochdr); + + // set up connection with the HttpConnect server + _bg_stream.setupConnection(upvc); + _bg_stream.setupVioWrite(_cont, hlen); + TSHttpHdrPrint(header.m_buffer, header.m_lochdr, _bg_stream.m_write.m_iobuf); + TSVIOReenable(_bg_stream.m_write.m_vio); + + if (TSIsDebugTagSet(PLUGIN_NAME)) { + std::string const headerstr(header.toString()); + DEBUG_LOG("Headers\n%s", headerstr.c_str()); + } + + data->m_fetchstates[_blocknum] = true; + return true; +} + +/** + * @brief Continuation to close background fetch after + * writing to cache is complete or error + * + */ +int +BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */) +{ + BgBlockFetch *bg = static_cast(TSContDataGet(contp)); + + switch (event) { + case TS_EVENT_VCONN_WRITE_COMPLETE: + TSVConnShutdown(bg->_bg_stream.m_vc, 0, 1); + delete bg; + break; + default: + if (event == TS_EVENT_VCONN_INACTIVITY_TIMEOUT) { + DEBUG_LOG("encountered Inactivity Timeout"); + TSVConnAbort(bg->_bg_stream.m_vc, TS_VC_CLOSE_ABORT); + } else { + TSVConnClose(bg->_bg_stream.m_vc); + } + bg->_bg_stream.abort(); + TSContDataSet(contp, nullptr); + delete bg; + TSContDestroy(contp); + break; + } + return 0; +} diff --git a/plugins/experimental/slice/prefetch.h b/plugins/experimental/slice/prefetch.h new file mode 100644 index 00000000000..a20207423b9 --- /dev/null +++ b/plugins/experimental/slice/prefetch.h @@ -0,0 +1,56 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +/** + * @file prefetch.h + * @brief Background fetch classes for slice plugin. + */ + +#pragma once + +#include + +#include "ts/ts.h" +#include "Data.h" +#include "Config.h" + +/** + * @brief Represents a single background fetch. + */ +struct BgBlockFetch { + static bool schedule(Data *const data, int blocknum); + + explicit BgBlockFetch(int blocknum) : _blocknum(blocknum) {} + + bool fetch(Data *const data); + static int handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */); + + /* This is for the actual background fetch / NetVC */ + Stage _bg_stream; + + int _blocknum; + TSCont _cont; + + ~BgBlockFetch() + { + if (nullptr != _cont) { + TSContDestroy(_cont); + _cont = nullptr; + } + } +}; \ No newline at end of file diff --git a/plugins/experimental/slice/util.cc b/plugins/experimental/slice/util.cc index 90ef9d8b525..7e4bc1148ec 100644 --- a/plugins/experimental/slice/util.cc +++ b/plugins/experimental/slice/util.cc @@ -17,6 +17,7 @@ */ #include "util.h" +#include "prefetch.h" #include "Config.h" #include "Data.h" @@ -110,6 +111,19 @@ request_block(TSCont contp, Data *const data) DEBUG_LOG("Headers\n%s", headerstr.c_str()); } + // if prefetch config set, schedule next block requests in background + for (int i = 0; i < data->m_config->m_prefetchcount; i++) { + int nextblocknum = data->m_blocknum + i + 1; + if (data->m_req_range.blockIsInside(data->m_config->m_blockbytes, nextblocknum) && !data->m_fetchstates[nextblocknum]) { + if (BgBlockFetch::schedule(data, nextblocknum)) { + DEBUG_LOG("Background fetch requested"); + } else { + DEBUG_LOG("Background fetch not requested"); + } + } else { + break; + } + } // get ready for data back from the server data->m_upstream.setupVioRead(contp, INT64_MAX); From c996cae062fdfb83130cd126544cde50cd4164f7 Mon Sep 17 00:00:00 2001 From: Serris Lew Date: Fri, 11 Mar 2022 15:06:45 -0800 Subject: [PATCH 2/2] Updated doc and added autest Removed reenabling write for bg continuation that only sends 1 req Standardize autest f format --- doc/admin-guide/plugins/slice.en.rst | 7 + plugins/experimental/slice/Config.cc | 4 +- plugins/experimental/slice/Data.h | 4 +- plugins/experimental/slice/prefetch.cc | 2 +- plugins/experimental/slice/prefetch.h | 2 +- .../pluginTest/slice/slice_prefetch.test.py | 178 ++++++++++++++++++ 6 files changed, 191 insertions(+), 6 deletions(-) create mode 100644 tests/gold_tests/pluginTest/slice/slice_prefetch.test.py diff --git a/doc/admin-guide/plugins/slice.en.rst b/doc/admin-guide/plugins/slice.en.rst index 0246fa78f49..8b56bbd362e 100644 --- a/doc/admin-guide/plugins/slice.en.rst +++ b/doc/admin-guide/plugins/slice.en.rst @@ -126,6 +126,13 @@ The slice plugin supports the following options:: `cache_range_requests` plugin. -i for short + --prefetch-count= (optional) + Default is 0 + Prefetches successive 'n' slice block requests in the background + and cached. Especially for large objects, prefetching can improve + cache miss latency. + -f for short + Examples:: @plugin=slice.so @pparam=--blockbytes=1000000 @plugin=cache_range_requests.so diff --git a/plugins/experimental/slice/Config.cc b/plugins/experimental/slice/Config.cc index 8f15e500250..5cd71d47339 100644 --- a/plugins/experimental/slice/Config.cc +++ b/plugins/experimental/slice/Config.cc @@ -123,7 +123,7 @@ Config::fromArgs(int const argc, char const *const argv[]) {const_cast("remap-host"), required_argument, nullptr, 'r'}, {const_cast("skip-header"), required_argument, nullptr, 's'}, {const_cast("blockbytes-test"), required_argument, nullptr, 't'}, - {const_cast("prefetch-count"), required_argument, nullptr, 'c'}, + {const_cast("prefetch-count"), required_argument, nullptr, 'f'}, {nullptr, 0, nullptr, 0}, }; @@ -223,7 +223,7 @@ Config::fromArgs(int const argc, char const *const argv[]) DEBUG_LOG("Skipping blockbytes-test in favor of blockbytes"); } } break; - case 'c': { + case 'f': { m_prefetchcount = atoi(optarg); } break; default: diff --git a/plugins/experimental/slice/Data.h b/plugins/experimental/slice/Data.h index b0564667f41..9cfbb2f7ead 100644 --- a/plugins/experimental/slice/Data.h +++ b/plugins/experimental/slice/Data.h @@ -25,7 +25,7 @@ #include "Stage.h" #include -#include +#include struct Config; @@ -93,7 +93,7 @@ struct Data { Stage m_upstream; Stage m_dnstream; - std::map m_fetchstates; + std::unordered_map m_fetchstates; HdrMgr m_req_hdrmgr; // manager for server request HdrMgr m_resp_hdrmgr; // manager for client response diff --git a/plugins/experimental/slice/prefetch.cc b/plugins/experimental/slice/prefetch.cc index b3ab43e1f42..2325db77f4b 100644 --- a/plugins/experimental/slice/prefetch.cc +++ b/plugins/experimental/slice/prefetch.cc @@ -91,7 +91,6 @@ BgBlockFetch::fetch(Data *const data) _bg_stream.setupConnection(upvc); _bg_stream.setupVioWrite(_cont, hlen); TSHttpHdrPrint(header.m_buffer, header.m_lochdr, _bg_stream.m_write.m_iobuf); - TSVIOReenable(_bg_stream.m_write.m_vio); if (TSIsDebugTagSet(PLUGIN_NAME)) { std::string const headerstr(header.toString()); @@ -115,6 +114,7 @@ BgBlockFetch::handler(TSCont contp, TSEvent event, void * /* edata ATS_UNUSED */ switch (event) { case TS_EVENT_VCONN_WRITE_COMPLETE: TSVConnShutdown(bg->_bg_stream.m_vc, 0, 1); + bg->_bg_stream.close(); delete bg; break; default: diff --git a/plugins/experimental/slice/prefetch.h b/plugins/experimental/slice/prefetch.h index a20207423b9..e04376bacf6 100644 --- a/plugins/experimental/slice/prefetch.h +++ b/plugins/experimental/slice/prefetch.h @@ -44,7 +44,7 @@ struct BgBlockFetch { Stage _bg_stream; int _blocknum; - TSCont _cont; + TSCont _cont = nullptr; ~BgBlockFetch() { diff --git a/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py b/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py new file mode 100644 index 00000000000..5dc6aed134c --- /dev/null +++ b/tests/gold_tests/pluginTest/slice/slice_prefetch.test.py @@ -0,0 +1,178 @@ +''' +''' +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Test.Summary = ''' +slice plugin prefetch feature test +''' + +# Test description: +# Fill origin server with range requests +# Request content through slice plugin with varied prefetch counts + +Test.SkipUnless( + Condition.PluginExists('slice.so'), + Condition.PluginExists('cache_range_requests.so'), +) +Test.ContinueOnFail = False + +# configure origin server, lookup by Range header +server = Test.MakeOriginServer("server", lookup_key="{%Range}") + +# Define ATS and configure +ts = Test.MakeATSProcess("ts", command="traffic_server") + +block_bytes_1 = 7 +block_bytes_2 = 5 +body = "lets go surfin now" +bodylen = len(body) + +request_header = {"headers": + "GET /path HTTP/1.1\r\n" + + "Host: origin\r\n" + + "\r\n", + "timestamp": "1469733493.993", + "body": "", + } + +response_header = {"headers": + "HTTP/1.1 200 OK\r\n" + + "Connection: close\r\n" + + "Cache-Control: public, max-age=500\r\n" + + "\r\n", + "timestamp": "1469733493.993", + "body": body, + } + +server.addResponse("sessionlog.json", request_header, response_header) + +# Autest OS doesn't support range request, must manually add requests/responses +for block_bytes in [block_bytes_1, block_bytes_2]: + for i in range(bodylen // block_bytes + 1): + b0 = i * block_bytes + b1 = b0 + block_bytes - 1 + req_header = {"headers": + "GET /path HTTP/1.1\r\n" + + "Host: *\r\n" + + "Accept: */*\r\n" + + f"Range: bytes={b0}-{b1}\r\n" + + "\r\n", + "timestamp": "1469733493.993", + "body": "" + } + if (b1 > bodylen - 1): + b1 = bodylen - 1 + resp_header = {"headers": + "HTTP/1.1 206 Partial Content\r\n" + + "Accept-Ranges: bytes\r\n" + + "Cache-Control: public, max-age=500\r\n" + + f"Content-Range: bytes {b0}-{b1}/{bodylen}\r\n" + + "Connection: close\r\n" + + "\r\n", + "timestamp": "1469733493.993", + "body": body[b0:b1 + 1] + } + server.addResponse("sessionlog.json", req_header, resp_header) + +curl_and_args = 'curl -s -D /dev/stdout -o /dev/stderr -x http://127.0.0.1:{}'.format(ts.Variables.port) + +ts.Disk.remap_config.AddLines([ + f'map http://sliceprefetch1bytes1/ http://127.0.0.1:{server.Variables.Port}' + + f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_1} @pparam=--prefetch-count=1' + + ' @plugin=cache_range_requests.so', + f'map http://sliceprefetch2bytes1/ http://127.0.0.1:{server.Variables.Port}' + + f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_1} @pparam=--prefetch-count=2' + + ' @plugin=cache_range_requests.so', + f'map http://sliceprefetch1bytes2/ http://127.0.0.1:{server.Variables.Port}' + + f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_2} @pparam=--prefetch-count=1' + + ' @plugin=cache_range_requests.so', + f'map http://sliceprefetch2bytes2/ http://127.0.0.1:{server.Variables.Port}' + + f' @plugin=slice.so @pparam=--blockbytes-test={block_bytes_2} @pparam=--prefetch-count=2' + + ' @plugin=cache_range_requests.so', +]) + +ts.Disk.records_config.update({ + 'proxy.config.diags.debug.enabled': 1, + 'proxy.config.diags.debug.tags': 'slice|cache_range_requests|pvc', +}) + +# 0 Test - Full object slice with only next block prefetched in background, block bytes= 7 +tr = Test.AddTestRun("Full object slice with only next block prefetched in background, block bytes= 7") +ps = tr.Processes.Default +ps.StartBefore(server, ready=When.PortOpen(server.Variables.Port)) +ps.StartBefore(Test.Processes.ts) +ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_200.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response") +tr.StillRunningAfter = ts + +# 1 Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 7 +tr = Test.AddTestRun("Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 7") +ps = tr.Processes.Default +ps.Command = curl_and_args + ' http://sliceprefetch2bytes1/path' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_200.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response") +tr.StillRunningAfter = ts + +# 2 Test - Full object slice with only next block prefetched in background, block bytes= 5 +tr = Test.AddTestRun("Full object slice with only next block prefetched in background, block bytes= 5") +ps = tr.Processes.Default +ps.Command = curl_and_args + ' http://sliceprefetch1bytes2/path' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_200.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response") +tr.StillRunningAfter = ts + +# 3 Test - Full object slice with nest 2 blocks prefetched in background, block bytes= 5 +tr = Test.AddTestRun("Full object slice with nest 2 blocks prefetched in background, block bytes= 5") +ps = tr.Processes.Default +ps.Command = curl_and_args + ' http://sliceprefetch2bytes2/path' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_200.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("200 OK", "expected 200 OK response") +tr.StillRunningAfter = ts + +# 4 Test - Whole asset via range +tr = Test.AddTestRun("Whole asset via range") +ps = tr.Processes.Default +ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path' + ' -r 0-' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_206.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("206 Partial Content", "expected 206 response") +ps.Streams.stdout.Content += Testers.ContainsExpression("Content-Range: bytes 0-17/18", "mismatch byte content response") +tr.StillRunningAfter = ts + +# 5 Test - Non aligned slice request +tr = Test.AddTestRun("Non aligned slice request") +ps = tr.Processes.Default +ps.Command = curl_and_args + ' http://sliceprefetch1bytes1/path' + ' -r 5-16' +ps.ReturnCode = 0 +ps.Streams.stderr = "gold/slice_mid.stderr.gold" +ps.Streams.stdout.Content = Testers.ContainsExpression("206 Partial Content", "expected 206 response") +ps.Streams.stdout.Content += Testers.ContainsExpression("Content-Range: bytes 5-16/18", "mismatch byte content response") +tr.StillRunningAfter = ts + +# 6 Test - special case, begin inside last slice block but outside asset len +tr = Test.AddTestRun("Invalid end range request, 416") +beg = len(body) + 1 +end = beg + block_bytes +ps = tr.Processes.Default +ps.Command = curl_and_args + f' http://sliceprefetch1bytes1/path -r {beg}-{end}' +ps.Streams.stdout.Content = Testers.ContainsExpression("416 Requested Range Not Satisfiable", "expected 416 response") +tr.StillRunningAfter = ts