Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/ts/parentselectdefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,5 @@ typedef struct {
bool responseIsRetryable;
bool goDirect;
bool parentIsProxy;
bool no_cache;
} TSResponseAction;
368 changes: 205 additions & 163 deletions plugins/experimental/parent_select/consistenthash.cc

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion plugins/experimental/parent_select/consistenthash.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,18 @@ struct PLNextHopConsistentHashTxn {
size_t hostname_len = 0;
in_port_t port = 0;
bool retry = false;
bool no_cache = false;
};

class PLNextHopConsistentHash : public PLNextHopSelectionStrategy
{
std::vector<std::shared_ptr<ATSConsistentHash>> rings;
uint64_t getHashKey(uint64_t sm_id, TSMBuffer reqp, TSMLoc url, TSMLoc parent_selection_url, ATSHash64 *h);

std::shared_ptr<PLHostRecord> chashLookup(const std::shared_ptr<ATSConsistentHash> &ring, uint32_t cur_ring,
PLNextHopConsistentHashTxn *state, bool *wrapped, uint64_t sm_id, TSMBuffer reqp,
TSMLoc url, TSMLoc parent_selection_url);

public:
const uint32_t LineNumberPlaceholder = 99999;

Expand All @@ -72,7 +77,8 @@ class PLNextHopConsistentHash : public PLNextHopSelectionStrategy
~PLNextHopConsistentHash();
bool Init(const YAML::Node &n);
void next(TSHttpTxn txnp, void *strategyTxn, const char *exclude_hostname, size_t exclude_hostname_len, in_port_t exclude_port,
const char **out_hostname, size_t *out_hostname_len, in_port_t *out_port, bool *out_retry, time_t now = 0) override;
const char **out_hostname, size_t *out_hostname_len, in_port_t *out_port, bool *out_retry, bool *out_no_cache,
time_t now = 0) override;
void mark(TSHttpTxn txnp, void *strategyTxn, const char *hostname, const size_t hostname_len, const in_port_t port,
const PLNHCmd status, const time_t now) override;
void *newTxn() override;
Expand Down
27 changes: 17 additions & 10 deletions plugins/experimental/parent_select/parent_select.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ struct StrategyTxn {
size_t prev_host_len;
in_port_t prev_port;
bool prev_is_retry;
bool prev_no_cache;
};

int
Expand Down Expand Up @@ -81,9 +82,10 @@ handle_send_request(TSHttpTxn txnp, StrategyTxn *strategyTxn)
strategyTxn->prev_host_len = ra.hostname_len;
strategyTxn->prev_port = ra.port;
strategyTxn->prev_is_retry = ra.is_retry;
strategyTxn->prev_no_cache = ra.no_cache;

strategy->next(txnp, strategyTxn->txn, ra.hostname, ra.hostname_len, ra.port, &ra.hostname, &ra.hostname_len, &ra.port,
&ra.is_retry);
&ra.is_retry, &ra.no_cache);

ra.nextHopExists = strategy->nextHopExists(txnp);
ra.fail = !ra.nextHopExists; // failed is whether to fail and return to the client. failed=false means to retry the parent we set
Expand Down Expand Up @@ -121,6 +123,7 @@ mark_response(TSHttpTxn txnp, StrategyTxn *strategyTxn, TSHttpStatus status)
ra.hostname_len = strategyTxn->prev_host_len;
ra.port = strategyTxn->prev_port;
ra.is_retry = strategyTxn->prev_is_retry;
ra.no_cache = strategyTxn->prev_no_cache;
TSDebug(PLUGIN_NAME, "mark_response using prev %.*s:%d", int(ra.hostname_len), ra.hostname, ra.port);
} else {
TSHttpTxnResponseActionGet(txnp, &ra);
Expand Down Expand Up @@ -201,6 +204,7 @@ handle_read_response(TSHttpTxn txnp, StrategyTxn *strategyTxn)
strategyTxn->prev_host_len = 0;
strategyTxn->prev_port = 0;
strategyTxn->prev_is_retry = false;
strategyTxn->prev_no_cache = false;

TSHandleMLocRelease(resp, TS_NULL_MLOC, resp_hdr);
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
Expand Down Expand Up @@ -235,6 +239,7 @@ handle_os_dns(TSHttpTxn txnp, StrategyTxn *strategyTxn)
strategyTxn->prev_host = nullptr;
strategyTxn->prev_port = 0;
strategyTxn->prev_is_retry = false;
strategyTxn->prev_no_cache = false;
TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE);
return TS_SUCCESS;
}
Expand All @@ -247,7 +252,7 @@ handle_os_dns(TSHttpTxn txnp, StrategyTxn *strategyTxn)
const size_t exclude_host_len = 0;
const in_port_t exclude_port = 0;
strategy->next(txnp, strategyTxn->txn, exclude_host, exclude_host_len, exclude_port, &ra.hostname, &ra.hostname_len, &ra.port,
&ra.is_retry);
&ra.is_retry, &ra.no_cache);

ra.fail = ra.hostname == nullptr; // failed is whether to immediately fail and return the client a 502. In this case: whether or
// not we found another parent.
Expand Down Expand Up @@ -419,6 +424,7 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri)
strategyTxn->prev_host = nullptr;
strategyTxn->prev_port = 0;
strategyTxn->prev_is_retry = false;
strategyTxn->prev_no_cache = false;
TSContDataSet(cont, (void *)strategyTxn);

// TSHttpTxnHookAdd(txnp, TS_HTTP_READ_REQUEST_HDR_HOOK, cont);
Expand All @@ -434,22 +440,23 @@ TSRemapDoRemap(void *ih, TSHttpTxn txnp, TSRemapRequestInfo *rri)
constexpr const size_t exclude_host_len = 0;
constexpr const in_port_t exclude_port = 0;
strategy->next(txnp, strategyTxn->txn, exclude_host, exclude_host_len, exclude_port, &ra.hostname, &ra.hostname_len, &ra.port,
&ra.is_retry);
&ra.is_retry, &ra.no_cache);

if (ra.hostname == nullptr) {
ra.nextHopExists = ra.hostname != nullptr;
ra.fail = !ra.nextHopExists;
// The action here is used for the very first connection, not any retry. So of course we should try it.
ra.responseIsRetryable = true;
ra.goDirect = strategy->goDirect();
ra.parentIsProxy = strategy->parentIsProxy();

if (ra.fail && !ra.goDirect) {
// TODO make configurable
TSDebug(PLUGIN_NAME, "TSRemapDoRemap strategy '%s' next returned nil, returning 502!", strategy->name());
TSHttpTxnStatusSet(txnp, TS_HTTP_STATUS_BAD_GATEWAY);
// TODO verify TS_EVENT_HTTP_TXN_CLOSE fires, and if not, free the cont here.
return TSREMAP_DID_REMAP;
}

ra.fail = false;
ra.nextHopExists = true;
ra.responseIsRetryable =
true; // The action here is used for the very first connection, not any retry. So of course we should try it.
ra.goDirect = strategy->goDirect();
ra.parentIsProxy = strategy->parentIsProxy();
TSDebug(PLUGIN_NAME, "TSRemapDoRemap setting response_action hostname '%.*s' port %d direct %d proxy %d", int(ra.hostname_len),
ra.hostname, ra.port, ra.goDirect, ra.parentIsProxy);
TSHttpTxnResponseActionSet(txnp, &ra);
Expand Down
52 changes: 47 additions & 5 deletions plugins/experimental/parent_select/strategy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,13 @@
//

// ring mode strings
constexpr std::string_view alternate_rings = "alternate_ring";
constexpr std::string_view exhaust_rings = "exhaust_ring";
const std::string_view alternate_rings = "alternate_ring";
const std::string_view exhaust_rings = "exhaust_ring";
const std::string_view peering_rings = "peering_ring";

// health check strings
constexpr std::string_view active_health_check = "active";
constexpr std::string_view passive_health_check = "passive";
const std::string_view active_health_check = "active";
const std::string_view passive_health_check = "passive";

PLNextHopSelectionStrategy::PLNextHopSelectionStrategy(const std::string_view &name)
{
Expand All @@ -66,6 +67,8 @@ bool
PLNextHopSelectionStrategy::Init(const YAML::Node &n)
{
PL_NH_Debug(PL_NH_DEBUG_TAG, "calling Init()");
std::string self_host;
bool self_host_used = false;

try {
if (n["scheme"]) {
Expand Down Expand Up @@ -96,6 +99,10 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
ignore_self_detect = n["ignore_self_detect"].as<bool>();
}

if (n["cache_peer_result"]) {
cache_peer_result = n["cache_peer_result"].as<bool>();
}

// failover node.
YAML::Node failover_node;
if (n["failover"]) {
Expand All @@ -106,6 +113,13 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
ring_mode = PL_NH_ALTERNATE_RING;
} else if (ring_mode_val == exhaust_rings) {
ring_mode = PL_NH_EXHAUST_RING;
} else if (ring_mode_val == peering_rings) {
ring_mode = PL_NH_PEERING_RING;
YAML::Node self_node = failover_node["self"];
if (self_node) {
self_host = self_node.Scalar();
PL_NH_Debug(PL_NH_DEBUG_TAG, "%s is self", self_host.c_str());
}
} else {
ring_mode = PL_NH_ALTERNATE_RING;
PL_NH_Note("Invalid 'ring_mode' value, '%s', for the strategy named '%s', using default '%s'.", ring_mode_val.c_str(),
Expand Down Expand Up @@ -212,9 +226,16 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
std::shared_ptr<PLHostRecord> host_rec = std::make_shared<PLHostRecord>(hosts_list[hst].as<PLHostRecord>());
host_rec->group_index = grp;
host_rec->host_index = hst;
if (TSHostnameIsSelf(host_rec->hostname.c_str(), host_rec->hostname.size()) == TS_SUCCESS) {
if (self_host == host_rec->hostname ||
TSHostnameIsSelf(host_rec->hostname.c_str(), host_rec->hostname.size()) == TS_SUCCESS) {
if (ring_mode == PL_NH_PEERING_RING && grp != 0) {
throw std::invalid_argument("self host (" + self_host +
") can only appear in first host group for peering ring mode");
}
TSHostStatusSet(host_rec->hostname.c_str(), host_rec->hostname.size(), TSHostStatus::TS_HOST_STATUS_DOWN, 0,
static_cast<unsigned int>(TS_HOST_STATUS_SELF_DETECT));
host_rec->self = true;
self_host_used = true;
}
hosts_inner.push_back(std::move(host_rec));
num_parents++;
Expand All @@ -225,12 +246,33 @@ PLNextHopSelectionStrategy::Init(const YAML::Node &n)
}
}
}
if (!self_host.empty() && !self_host_used) {
throw std::invalid_argument("self host (" + self_host + ") does not appear in the first (peer) group");
}
} catch (std::exception &ex) {
PL_NH_Note("Error parsing the strategy named '%s' due to '%s', this strategy will be ignored.", strategy_name.c_str(),
ex.what());
return false;
}

if (ring_mode == PL_NH_PEERING_RING) {
if (groups == 1) {
if (!go_direct) {
PL_NH_Error("when ring mode is '%s', go_direct must be true when there is only one host group.", peering_rings.data());
return false;
}
} else if (groups != 2) {
PL_NH_Error("when ring mode is '%s', requires two host groups (peering group and an upstream group),"
" or just a single peering group with go_direct.",
peering_rings.data());
return false;
}
// if (policy_type != PL_NH_CONSISTENT_HASH) {
// PL_NH_Error("ring mode '%s', is only implemented for a 'consistent_hash' policy.", peering_rings.data());
// return false;
// }
}

return true;
}

Expand Down
44 changes: 29 additions & 15 deletions plugins/experimental/parent_select/strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,15 @@

constexpr const char *PL_NH_DEBUG_TAG = "plugin_nexthop";

// ring mode strings
extern const std::string_view alternate_rings;
extern const std::string_view exhaust_rings;
extern const std::string_view peering_rings;

// health check strings
extern const std::string_view active_health_check;
extern const std::string_view passive_health_check;

// Logging helpers for the plugin. Each one prepends the call site
// (__FILE__ and __LINE__) to the caller's printf-style format string.
//
// PL_NH_Debug: forwards to TSDebug under the caller-supplied debug tag.
#define PL_NH_Debug(tag, fmt, ...) TSDebug(tag, "[%s:%d]: " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
// PL_NH_Error: forwards to TSError, additionally prefixing PLUGIN_NAME.
#define PL_NH_Error(fmt, ...) TSError("(%s) [%s:%d]: " fmt, PLUGIN_NAME, __FILE__, __LINE__, ##__VA_ARGS__)
// PL_NH_Note: despite the name, this also forwards to TSDebug, always under
// PL_NH_DEBUG_TAG, so its output goes through the same TSDebug call as PL_NH_Debug.
#define PL_NH_Note(fmt, ...) TSDebug(PL_NH_DEBUG_TAG, "[%s:%d]: " fmt, __FILE__, __LINE__, ##__VA_ARGS__)
Expand All @@ -59,7 +68,7 @@ enum PLNHPolicyType {

enum PLNHSchemeType { PL_NH_SCHEME_NONE = 0, PL_NH_SCHEME_HTTP, PL_NH_SCHEME_HTTPS };

enum PLNHRingMode { PL_NH_ALTERNATE_RING = 0, PL_NH_EXHAUST_RING };
enum PLNHRingMode { PL_NH_ALTERNATE_RING = 0, PL_NH_EXHAUST_RING, PL_NH_PEERING_RING };

// response codes container
struct PLResponseCodes {
Expand Down Expand Up @@ -96,13 +105,14 @@ struct PLNHProtocol {
struct PLHostRecord : ATSConsistentHashNode {
std::mutex _mutex;
std::string hostname;
time_t failedAt;
uint32_t failCount;
time_t upAt;
std::atomic<time_t> failedAt;
std::atomic<uint32_t> failCount;
std::atomic<time_t> upAt;
float weight;
std::string hash_string;
int host_index;
int group_index;
bool self = false;
std::vector<std::shared_ptr<PLNHProtocol>> protocols;

// construct without locking the _mutex.
Expand All @@ -123,30 +133,33 @@ struct PLHostRecord : ATSConsistentHashNode {
// Copy constructor: duplicates every data member of `o` except _mutex, which
// is left default-constructed in the new object.
//
// failedAt, failCount and upAt are std::atomic members; std::atomic has a
// deleted copy assignment, so each value is read with load() and stored as a
// plain value. The copy is member-by-member and is not an atomic snapshot of
// `o` as a whole.
PLHostRecord(const PLHostRecord &o)
{
  hostname    = o.hostname;
  failedAt    = o.failedAt.load();
  failCount   = o.failCount.load();
  upAt        = o.upAt.load();
  weight      = o.weight;
  hash_string = o.hash_string;
  // Preserve the source record's position and availability rather than
  // resetting them, so a copied record still identifies the same host slot.
  host_index  = o.host_index;
  group_index = o.group_index;
  available   = o.available.load();
  protocols   = o.protocols;
  self        = o.self;
}

// Copy assignment: overwrites every data member with the value from `o`
// without touching or copying _mutex.
//
// The std::atomic members (failedAt, failCount, upAt, available) cannot be
// assigned from another atomic directly because std::atomic's copy assignment
// is deleted; each is read with load() and stored as a plain value. The
// assignment is member-by-member and is not an atomic snapshot of `o`.
//
// Returns a reference to *this.
PLHostRecord &
operator=(const PLHostRecord &o)
{
  hostname    = o.hostname;
  failedAt    = o.failedAt.load();
  failCount   = o.failCount.load();
  upAt        = o.upAt.load();
  weight      = o.weight;
  hash_string = o.hash_string;
  host_index  = o.host_index;
  group_index = o.group_index;
  available   = o.available.load();
  protocols   = o.protocols;
  self        = o.self;
  return *this;
}

Expand Down Expand Up @@ -209,7 +222,7 @@ class TSNextHopSelectionStrategy
virtual const char *name() = 0;
virtual void next(TSHttpTxn txnp, void *strategyTxn, const char *exclude_hostname, size_t exclude_hostname_len,
in_port_t exclude_port, const char **out_hostname, size_t *out_hostname_len, in_port_t *out_port,
bool *out_retry, time_t now = 0) = 0;
bool *out_retry, bool *out_no_cache, time_t now = 0) = 0;
virtual void mark(TSHttpTxn txnp, void *strategyTxn, const char *hostname, const size_t hostname_len, const in_port_t port,
const PLNHCmd status, const time_t now = 0) = 0;
virtual bool nextHopExists(TSHttpTxn txnp) = 0;
Expand All @@ -234,9 +247,9 @@ class PLNextHopSelectionStrategy : public TSNextHopSelectionStrategy

virtual void next(TSHttpTxn txnp, void *strategyTxn, const char *exclude_hostname, size_t exclude_hostname_len,
in_port_t exclude_port, const char **out_hostname, size_t *out_hostname_len, in_port_t *out_port,
bool *out_retry, time_t now = 0) = 0;
bool *out_retry, bool *out_no_cache, time_t now = 0) = 0;
virtual void mark(TSHttpTxn txnp, void *strategyTxn, const char *hostname, const size_t hostname_len, const in_port_t port,
const PLNHCmd status, const time_t now = 0) = 0;
const PLNHCmd status, const time_t now = 0) = 0;
virtual bool nextHopExists(TSHttpTxn txnp);
virtual bool codeIsFailure(TSHttpStatus response_code);
virtual bool responseIsRetryable(unsigned int current_retry_attempts, TSHttpStatus response_code);
Expand All @@ -256,6 +269,7 @@ class PLNextHopSelectionStrategy : public TSNextHopSelectionStrategy
bool go_direct = true;
bool parent_is_proxy = true;
bool ignore_self_detect = false;
bool cache_peer_result = true;
PLNHSchemeType scheme = PL_NH_SCHEME_NONE;
PLNHRingMode ring_mode = PL_NH_ALTERNATE_RING;
PLResponseCodes resp_codes; // simple retry codes
Expand Down
13 changes: 10 additions & 3 deletions proxy/http/HttpTransact.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3642,9 +3642,16 @@ HttpTransact::handle_response_from_parent(State *s)
}
// the next hop strategy is configured not
// to cache a response from a next hop peer.
if (s->parent_result.do_not_cache_response) {
TxnDebug("http_trans", "response is from a next hop peer, do not cache.");
s->cache_info.action = CACHE_DO_NO_ACTION;
if (s->response_action.handled) {
if (s->response_action.action.no_cache) {
TxnDebug("http_trans", "plugin set response_action.no_cache, do not cache.");
s->cache_info.action = CACHE_DO_NO_ACTION;
}
} else {
if (s->parent_result.do_not_cache_response) {
TxnDebug("http_trans", "response is from a next hop peer, do not cache.");
s->cache_info.action = CACHE_DO_NO_ACTION;
}
}
handle_forward_server_connection_open(s);
break;
Expand Down
1 change: 1 addition & 0 deletions tests/gold_tests/pluginTest/parent_select/body.gold
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This is the body.
Loading