Skip to content
Closed
4 changes: 2 additions & 2 deletions bazel/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ def envoy_api_deps(skip_targets):
native.git_repository(
name = "envoy_api",
remote = REPO_LOCATIONS["envoy_api"],
commit = "2ece56b705813e1a7176ceb73abf4fb67563f676",
commit = "1bed17f9de9da7346ce04c354ee491b98577aa60",
)
api_bind_targets = [
"address",
"ads",
"base",
"bootstrap",
"cds",
"discovery",
"eds",
"health_check",
"lds",
Expand Down
10 changes: 8 additions & 2 deletions include/envoy/config/ads.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,25 @@ class AdsApi {
public:
virtual ~AdsApi() {}

/**
* Start streaming ADS messages.
*/
virtual void start() PURE;

/**
* Start a configuration subscription asynchronously for some API type and resources.
* @param type_url type URL corresponding to xDS API, e.g.
* type.googleapis.com/envoy.api.v2.Cluster.
* @param resources vector of resource names to fetch.
* @param resources vector of resource names to watch for on ADS. If this is empty, then all
* resources for type_url will result in callbacks.
* @param callbacks the callbacks to be notified of configuration updates. These must be valid
* until AdsWatch::cancel() is invoked.
* @return AdsWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes away,
* its EDS updates should be cancelled. Ownership of the AdsWatch belongs to the AdsApi object.
*/
virtual AdsWatchPtr subscribe(const std::string& type_url,
const std::vector<std::string>& resources,
AdsCallbacks& calllbacks) PURE;
AdsCallbacks& callbacks) PURE;
};

} // namespace Config
Expand Down
21 changes: 19 additions & 2 deletions source/common/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,24 @@ envoy_cc_library(

envoy_cc_library(
name = "ads_api_lib",
srcs = ["ads_api_impl.cc"],
hdrs = ["ads_api_impl.h"],
external_deps = ["envoy_ads"],
external_deps = ["envoy_discovery"],
deps = [
":utility_lib",
"//include/envoy/config:ads_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:logger_lib",
"//source/common/grpc:async_client_lib",
"//source/common/protobuf",
],
)

envoy_cc_library(
name = "ads_subscription_lib",
hdrs = ["ads_subscription_impl.h"],
external_deps = ["envoy_ads"],
external_deps = ["envoy_discovery"],
deps = [
"//include/envoy/config:ads_interface",
"//include/envoy/config:subscription_interface",
Expand Down Expand Up @@ -209,6 +214,12 @@ envoy_cc_library(
],
)

envoy_cc_library(
name = "resources_lib",
hdrs = ["resources.h"],
deps = ["//source/common/common:singleton"],
)

envoy_cc_library(
name = "rds_json_lib",
srcs = ["rds_json.cc"],
Expand Down Expand Up @@ -258,10 +269,15 @@ envoy_cc_library(
hdrs = ["utility.h"],
external_deps = [
"envoy_base",
"envoy_cds",
"envoy_eds",
"envoy_lds",
"envoy_rds",
"envoy_filter_http_connection_manager",
],
deps = [
":json_utility_lib",
"//include/envoy/config:ads_interface",
"//include/envoy/config:subscription_interface",
"//include/envoy/local_info:local_info_interface",
"//include/envoy/registry",
Expand All @@ -270,6 +286,7 @@ envoy_cc_library(
"//source/common/common:hash_lib",
"//source/common/common:hex_lib",
"//source/common/common:singleton",
"//source/common/grpc:common_lib",
"//source/common/json:config_schemas_lib",
"//source/common/protobuf",
"//source/common/protobuf:utility_lib",
Expand Down
161 changes: 161 additions & 0 deletions source/common/config/ads_api_impl.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
#include "common/config/ads_api_impl.h"

#include "common/config/utility.h"
#include "common/protobuf/protobuf.h"

namespace Envoy {
namespace Config {

// TODO(htuch): describe the ADS protocol

AdsApiImpl::AdsApiImpl(const envoy::api::v2::Node& node,
const envoy::api::v2::ApiConfigSource& ads_config,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher)
: node_(node), service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
"envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources")) {
if (ads_config.cluster_name().empty()) {
ENVOY_LOG(debug, "No ADS clusters defined, ADS will not be initialized.");
return;
}
retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
if (ads_config.cluster_name().size() != 1) {
// TODO(htuch): Add support for multiple clusters, #1170.
throw EnvoyException(
"envoy::api::v2::ApiConfigSource must have a singleton cluster name specified");
}
async_client_.reset(new Grpc::AsyncClientImpl<envoy::api::v2::DiscoveryRequest,
envoy::api::v2::DiscoveryResponse>(
cluster_manager, ads_config.cluster_name()[0]));
}

AdsApiImpl::~AdsApiImpl() {
for (auto watches : watches_) {
for (auto watch : watches.second) {
watch->inserted_ = false;
}
}
}

void AdsApiImpl::start() {
if (async_client_) {
establishNewStream();
}
}

void AdsApiImpl::setRetryTimer() {
retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS));
}

void AdsApiImpl::establishNewStream() {
// TODO(htuch): stats
ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
stream_ = async_client_->start(service_method_, *this);
if (stream_ == nullptr) {
ENVOY_LOG(warn, "Unable to establish new stream");
handleFailure();
return;
}

for (const auto type_url : subscriptions_) {
sendDiscoveryRequest(requests_[type_url]);
}
}

void AdsApiImpl::sendDiscoveryRequest(const envoy::api::v2::DiscoveryRequest& request) {
if (stream_ == nullptr) {
return;
}
stream_->sendMessage(request, false);
}

void AdsApiImpl::handleFailure() {
for (auto watches : watches_) {
for (auto watch : watches.second) {
watch->callbacks_.onConfigUpdateFailed(nullptr);
}
}
setRetryTimer();
}

AdsWatchPtr AdsApiImpl::subscribe(const std::string& type_url,
const std::vector<std::string>& resources,
AdsCallbacks& callbacks) {
auto watch =
std::unique_ptr<AdsWatch>(new AdsWatchImpl(resources, callbacks, watches_[type_url]));
ENVOY_LOG(debug, "ADS subscribe for " + type_url);

// Lazily kick off the requests based on first subscription. This has the
// convenient side-effect that we order messages on the channel based on
// Envoy's internal dependency ordering.
if (requests_.count(type_url) == 0) {
requests_[type_url].set_type_url(type_url);
requests_[type_url].mutable_node()->CopyFrom(node_);
subscriptions_.push_front(type_url);
sendDiscoveryRequest(requests_[type_url]);
}

return watch;
}

void AdsApiImpl::onCreateInitialMetadata(Http::HeaderMap& metadata) {
UNREFERENCED_PARAMETER(metadata);
}

void AdsApiImpl::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) {
UNREFERENCED_PARAMETER(metadata);
}

void AdsApiImpl::onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResponse>&& message) {
const std::string& type_url = message->type_url();
ENVOY_LOG(debug, "Received ADS message for {}", type_url);
try {
// To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we
// build a map here from resource name to resource and then walk watches_.
// TODO(htuch): Reconsider implementation data structure for watches to make lookups of resource
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, why not just store internally as map?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We expect that most watches that name resources (as opposed to open CDS/LDS watches, which just wildcard via an empty resource list) will have a single resource, but the current APIs internally allows for more. We need to assemble the subset of resources from those received and pass them to onConfigUpdate() once per watch. This means we need to do some gather operation. This could be either done by building buckets for each watch and populating them as we walk the watch list (as done today), or building buckets and populating them as we walk the resource list.

I'm not a fan of either of these. If we change the API to allow zero (wildcard) or one resource per watch, we can easily use a map. Given that's how it's actually used by the xDS implementations in Envoy today, I think I'm going to modify the ADS internal API to switch to this. Let me know if that sounds good.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 sounds good simpler is better

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After playing around with this, we still need to do the map construction. This is because we need to deliver empty updates to watches (e.g. EDS) when a resource is not present, so we basically need to walk all the watches and have a O(1) map back to the delivered resource matching the watched name. So, I'll leave it as is.

// -> watches O(1), to avoid doing this crap.
std::unordered_map<std::string, ProtobufWkt::Any> resources;
for (const auto& resource : message->resources()) {
if (type_url != resource.type_url()) {
throw EnvoyException(fmt::format("{} does not match {} type URL is DiscoveryResponse {}",
resource.type_url(), type_url, message->DebugString()));
}
resources.emplace(Utility::resourceName(resource), resource);
}
for (auto watch : watches_[type_url]) {
if (watch->resources_.empty()) {
watch->callbacks_.onConfigUpdate(message->resources());
continue;
}
Protobuf::RepeatedPtrField<ProtobufWkt::Any> found_resources;
for (auto watched_resource_name : watch->resources_) {
auto it = resources.find(watched_resource_name);
if (it != resources.end()) {
found_resources.Add()->CopyFrom(it->second);
}
}
if (!found_resources.empty()) {
watch->callbacks_.onConfigUpdate(found_resources);
}
}
requests_[type_url].set_version_info(message->version_info());
} catch (const EnvoyException& e) {
ENVOY_LOG(warn, "ADS config for {} update rejected: {}", message->type_url(), e.what());
for (auto watch : watches_[type_url]) {
watch->callbacks_.onConfigUpdateFailed(&e);
}
}
sendDiscoveryRequest(requests_[type_url]);
}

void AdsApiImpl::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) {
UNREFERENCED_PARAMETER(metadata);
}

void AdsApiImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
ENVOY_LOG(warn, "ADS config stream closed: {}, {}", status, message);
stream_ = nullptr;
handleFailure();
}

} // namespace Config
} // namespace Envoy
73 changes: 58 additions & 15 deletions source/common/config/ads_api_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,78 @@

#include "envoy/config/ads.h"
#include "envoy/config/subscription.h"
#include "envoy/event/dispatcher.h"
#include "envoy/upstream/cluster_manager.h"

#include "common/common/logger.h"
#include "common/grpc/async_client_impl.h"

#include "api/ads.pb.h"
#include "api/discovery.pb.h"

namespace Envoy {
namespace Config {

/**
* ADS API implementation that fetches via gRPC.
* TODO(htuch): Implement ADS. This should look similar to GrpcSubscriptionImpl, except it manages
* multiple in-flight DiscoveryRequests, one per type URL.
*/
class AdsApiImpl : public AdsApi, Logger::Loggable<Logger::Id::upstream> {
class AdsApiImpl : public AdsApi,
Grpc::AsyncStreamCallbacks<envoy::api::v2::DiscoveryResponse>,
Logger::Loggable<Logger::Id::upstream> {
public:
AdsApiImpl(const envoy::api::v2::ApiConfigSource& ads_config,
Upstream::ClusterManager& cluster_manager) {
UNREFERENCED_PARAMETER(ads_config);
UNREFERENCED_PARAMETER(cluster_manager);
}
AdsApiImpl(const envoy::api::v2::Node& node, const envoy::api::v2::ApiConfigSource& ads_config,
Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher);
~AdsApiImpl();

void start() override;
AdsWatchPtr subscribe(const std::string& type_url, const std::vector<std::string>& resources,
AdsCallbacks& callbacks) override {
UNREFERENCED_PARAMETER(type_url);
UNREFERENCED_PARAMETER(resources);
UNREFERENCED_PARAMETER(callbacks);
return nullptr;
}
AdsCallbacks& callbacks) override;

// Grpc::AsyncStreamCallbacks
void onCreateInitialMetadata(Http::HeaderMap& metadata) override;
void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) override;
void onReceiveMessage(std::unique_ptr<envoy::api::v2::DiscoveryResponse>&& message) override;
void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) override;
void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;

// TODO(htuch): Make this configurable or some static.
const uint32_t RETRY_DELAY_MS = 5000;

private:
void setRetryTimer();
void establishNewStream();
void sendDiscoveryRequest(const envoy::api::v2::DiscoveryRequest& request);
void handleFailure();

struct AdsWatchImpl : public AdsWatch {
AdsWatchImpl(const std::vector<std::string>& resources, AdsCallbacks& callbacks,
std::list<AdsWatchImpl*>& type_url_list)
: resources_(resources), callbacks_(callbacks), type_url_list_(type_url_list),
inserted_(true) {
entry_ = type_url_list_.emplace(type_url_list_.begin(), this);
}
~AdsWatchImpl() override {
if (inserted_) {
type_url_list_.erase(entry_);
}
}
std::vector<std::string> resources_;
AdsCallbacks& callbacks_;
std::list<AdsWatchImpl*>& type_url_list_;
std::list<AdsWatchImpl*>::iterator entry_;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels a lot like LinkedObject, but it doesn't store unique_ptr internally. Would be nice to somehow reuse, but I probably not worth it.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack, I know, I as looking at LinkedObject but it just wasn't the right thing for this.

bool inserted_;
};

envoy::api::v2::Node node_;
std::unique_ptr<
Grpc::AsyncClient<envoy::api::v2::DiscoveryRequest, envoy::api::v2::DiscoveryResponse>>
async_client_;
Grpc::AsyncStream<envoy::api::v2::DiscoveryRequest>* stream_{};
const Protobuf::MethodDescriptor& service_method_;
std::unordered_map<std::string, std::list<AdsWatchImpl*>> watches_;
std::unordered_map<std::string, envoy::api::v2::DiscoveryRequest> requests_;
// Envoy's dependendency ordering.
std::list<std::string> subscriptions_;
Event::TimerPtr retry_timer_;
};

} // namespace Config
Expand Down
Loading