diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index e1581c2937537..8d0a1137a75bb 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -112,14 +112,14 @@ def envoy_api_deps(skip_targets): native.git_repository( name = "envoy_api", remote = REPO_LOCATIONS["envoy_api"], - commit = "2ece56b705813e1a7176ceb73abf4fb67563f676", + commit = "1bed17f9de9da7346ce04c354ee491b98577aa60", ) api_bind_targets = [ "address", - "ads", "base", "bootstrap", "cds", + "discovery", "eds", "health_check", "lds", diff --git a/include/envoy/config/ads.h b/include/envoy/config/ads.h index ca6c0cba0fe70..b537ebae617a9 100644 --- a/include/envoy/config/ads.h +++ b/include/envoy/config/ads.h @@ -46,11 +46,17 @@ class AdsApi { public: virtual ~AdsApi() {} + /** + * Start streaming ADS messages. + */ + virtual void start() PURE; + /** * Start a configuration subscription asynchronously for some API type and resources. * @param type_url type URL corresponding to xDS API, e.g. * type.googleapis.com/envoy.api.v2.Cluster. - * @param resources vector of resource names to fetch. + * @param resources vector of resource names to watch for on ADS. If this is empty, then all + * resources for type_url will result in callbacks. * @param callbacks the callbacks to be notified of configuration updates. These must be valid * until AdsWatch::cancel() is invoked. * @return AdsWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes away, @@ -58,7 +64,7 @@ class AdsApi { */ virtual AdsWatchPtr subscribe(const std::string& type_url, const std::vector& resources, - AdsCallbacks& calllbacks) PURE; + AdsCallbacks& callbacks) PURE; }; } // namespace Config diff --git a/source/common/config/BUILD b/source/common/config/BUILD index f880c75e07407..59a070007134c 100644 --- a/source/common/config/BUILD +++ b/source/common/config/BUILD @@ -22,19 +22,24 @@ envoy_cc_library( envoy_cc_library( name = "ads_api_lib", + srcs = ["ads_api_impl.cc"], hdrs = ["ads_api_impl.h"], - external_deps = ["envoy_ads"], + external_deps = ["envoy_discovery"], deps = [ + ":utility_lib", "//include/envoy/config:ads_interface", "//include/envoy/config:subscription_interface", "//include/envoy/upstream:cluster_manager_interface", + "//source/common/common:logger_lib", + "//source/common/grpc:async_client_lib", + "//source/common/protobuf", ], ) envoy_cc_library( name = "ads_subscription_lib", hdrs = ["ads_subscription_impl.h"], - external_deps = ["envoy_ads"], + external_deps = ["envoy_discovery"], deps = [ "//include/envoy/config:ads_interface", "//include/envoy/config:subscription_interface", @@ -209,6 +214,12 @@ envoy_cc_library( ], ) +envoy_cc_library( + name = "resources_lib", + hdrs = ["resources.h"], + deps = ["//source/common/common:singleton"], +) + envoy_cc_library( name = "rds_json_lib", srcs = ["rds_json.cc"], @@ -258,10 +269,15 @@ envoy_cc_library( hdrs = ["utility.h"], external_deps = [ "envoy_base", + "envoy_cds", + "envoy_eds", + "envoy_lds", + "envoy_rds", "envoy_filter_http_connection_manager", ], deps = [ ":json_utility_lib", + "//include/envoy/config:ads_interface", "//include/envoy/config:subscription_interface", "//include/envoy/local_info:local_info_interface", "//include/envoy/registry", @@ -270,6 +286,7 @@ envoy_cc_library( "//source/common/common:hash_lib", "//source/common/common:hex_lib", "//source/common/common:singleton", + "//source/common/grpc:common_lib", "//source/common/json:config_schemas_lib", "//source/common/protobuf", "//source/common/protobuf:utility_lib", diff --git a/source/common/config/ads_api_impl.cc b/source/common/config/ads_api_impl.cc new file mode 100644 index 0000000000000..0c25bbb798f20 --- /dev/null +++ b/source/common/config/ads_api_impl.cc @@ -0,0 +1,161 @@ +#include "common/config/ads_api_impl.h" + +#include "common/config/utility.h" +#include "common/protobuf/protobuf.h" + +namespace Envoy { +namespace Config { + +// TODO(htuch): describe the ADS protocol + +AdsApiImpl::AdsApiImpl(const envoy::api::v2::Node& node, + const envoy::api::v2::ApiConfigSource& ads_config, + Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher) + : node_(node), service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName( + "envoy.api.v2.AggregatedDiscoveryService.StreamAggregatedResources")) { + if (ads_config.cluster_name().empty()) { + ENVOY_LOG(debug, "No ADS clusters defined, ADS will not be initialized."); + return; + } + retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); }); + if (ads_config.cluster_name().size() != 1) { + // TODO(htuch): Add support for multiple clusters, #1170. + throw EnvoyException( + "envoy::api::v2::ApiConfigSource must have a singleton cluster name specified"); + } + async_client_.reset(new Grpc::AsyncClientImpl( + cluster_manager, ads_config.cluster_name()[0])); +} + +AdsApiImpl::~AdsApiImpl() { + for (auto watches : watches_) { + for (auto watch : watches.second) { + watch->inserted_ = false; + } + } +} + +void AdsApiImpl::start() { + if (async_client_) { + establishNewStream(); + } +} + +void AdsApiImpl::setRetryTimer() { + retry_timer_->enableTimer(std::chrono::milliseconds(RETRY_DELAY_MS)); +} + +void AdsApiImpl::establishNewStream() { + // TODO(htuch): stats + ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString()); + stream_ = async_client_->start(service_method_, *this); + if (stream_ == nullptr) { + ENVOY_LOG(warn, "Unable to establish new stream"); + handleFailure(); + return; + } + + for (const auto type_url : subscriptions_) { + sendDiscoveryRequest(requests_[type_url]); + } +} + +void AdsApiImpl::sendDiscoveryRequest(const envoy::api::v2::DiscoveryRequest& request) { + if (stream_ == nullptr) { + return; + } + stream_->sendMessage(request, false); +} + +void AdsApiImpl::handleFailure() { + for (auto watches : watches_) { + for (auto watch : watches.second) { + watch->callbacks_.onConfigUpdateFailed(nullptr); + } + } + setRetryTimer(); +} + +AdsWatchPtr AdsApiImpl::subscribe(const std::string& type_url, + const std::vector& resources, + AdsCallbacks& callbacks) { + auto watch = + std::unique_ptr(new AdsWatchImpl(resources, callbacks, watches_[type_url])); + ENVOY_LOG(debug, "ADS subscribe for " + type_url); + + // Lazily kick off the requests based on first subscription. This has the + // convenient side-effect that we order messages on the channel based on + // Envoy's internal dependency ordering. + if (requests_.count(type_url) == 0) { + requests_[type_url].set_type_url(type_url); + requests_[type_url].mutable_node()->CopyFrom(node_); + subscriptions_.push_front(type_url); + sendDiscoveryRequest(requests_[type_url]); + } + + return watch; +} + +void AdsApiImpl::onCreateInitialMetadata(Http::HeaderMap& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void AdsApiImpl::onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void AdsApiImpl::onReceiveMessage(std::unique_ptr&& message) { + const std::string& type_url = message->type_url(); + ENVOY_LOG(debug, "Received ADS message for {}", type_url); + try { + // To avoid O(n^2) explosion (e.g. when we have 1000s of EDS watches), we + // build a map here from resource name to resource and then walk watches_. + // TODO(htuch): Reconsider implementation data structure for watches to make lookups of resource + // -> watches O(1), to avoid doing this crap. + std::unordered_map resources; + for (const auto& resource : message->resources()) { + if (type_url != resource.type_url()) { + throw EnvoyException(fmt::format("{} does not match {} type URL is DiscoveryResponse {}", + resource.type_url(), type_url, message->DebugString())); + } + resources.emplace(Utility::resourceName(resource), resource); + } + for (auto watch : watches_[type_url]) { + if (watch->resources_.empty()) { + watch->callbacks_.onConfigUpdate(message->resources()); + continue; + } + Protobuf::RepeatedPtrField found_resources; + for (auto watched_resource_name : watch->resources_) { + auto it = resources.find(watched_resource_name); + if (it != resources.end()) { + found_resources.Add()->CopyFrom(it->second); + } + } + if (!found_resources.empty()) { + watch->callbacks_.onConfigUpdate(found_resources); + } + } + requests_[type_url].set_version_info(message->version_info()); + } catch (const EnvoyException& e) { + ENVOY_LOG(warn, "ADS config for {} update rejected: {}", message->type_url(), e.what()); + for (auto watch : watches_[type_url]) { + watch->callbacks_.onConfigUpdateFailed(&e); + } + } + sendDiscoveryRequest(requests_[type_url]); +} + +void AdsApiImpl::onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) { + UNREFERENCED_PARAMETER(metadata); +} + +void AdsApiImpl::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) { + ENVOY_LOG(warn, "ADS config stream closed: {}, {}", status, message); + stream_ = nullptr; + handleFailure(); +} + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/ads_api_impl.h b/source/common/config/ads_api_impl.h index 01f6bc85d8cf2..6e0e30592e3ac 100644 --- a/source/common/config/ads_api_impl.h +++ b/source/common/config/ads_api_impl.h @@ -2,35 +2,78 @@ #include "envoy/config/ads.h" #include "envoy/config/subscription.h" +#include "envoy/event/dispatcher.h" #include "envoy/upstream/cluster_manager.h" #include "common/common/logger.h" +#include "common/grpc/async_client_impl.h" -#include "api/ads.pb.h" +#include "api/discovery.pb.h" namespace Envoy { namespace Config { /** * ADS API implementation that fetches via gRPC. - * TODO(htuch): Implement ADS. This should look similar to GrpcSubscriptionImpl, except it manages - * multiple in-flight DiscoveryRequests, one per type URL. */ -class AdsApiImpl : public AdsApi, Logger::Loggable { +class AdsApiImpl : public AdsApi, + Grpc::AsyncStreamCallbacks, + Logger::Loggable { public: - AdsApiImpl(const envoy::api::v2::ApiConfigSource& ads_config, - Upstream::ClusterManager& cluster_manager) { - UNREFERENCED_PARAMETER(ads_config); - UNREFERENCED_PARAMETER(cluster_manager); - } + AdsApiImpl(const envoy::api::v2::Node& node, const envoy::api::v2::ApiConfigSource& ads_config, + Upstream::ClusterManager& cluster_manager, Event::Dispatcher& dispatcher); + ~AdsApiImpl(); + void start() override; AdsWatchPtr subscribe(const std::string& type_url, const std::vector& resources, - AdsCallbacks& callbacks) override { - UNREFERENCED_PARAMETER(type_url); - UNREFERENCED_PARAMETER(resources); - UNREFERENCED_PARAMETER(callbacks); - return nullptr; - } + AdsCallbacks& callbacks) override; + + // Grpc::AsyncStreamCallbacks + void onCreateInitialMetadata(Http::HeaderMap& metadata) override; + void onReceiveInitialMetadata(Http::HeaderMapPtr&& metadata) override; + void onReceiveMessage(std::unique_ptr&& message) override; + void onReceiveTrailingMetadata(Http::HeaderMapPtr&& metadata) override; + void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; + + // TODO(htuch): Make this configurable or some static. + const uint32_t RETRY_DELAY_MS = 5000; + +private: + void setRetryTimer(); + void establishNewStream(); + void sendDiscoveryRequest(const envoy::api::v2::DiscoveryRequest& request); + void handleFailure(); + + struct AdsWatchImpl : public AdsWatch { + AdsWatchImpl(const std::vector& resources, AdsCallbacks& callbacks, + std::list& type_url_list) + : resources_(resources), callbacks_(callbacks), type_url_list_(type_url_list), + inserted_(true) { + entry_ = type_url_list_.emplace(type_url_list_.begin(), this); + } + ~AdsWatchImpl() override { + if (inserted_) { + type_url_list_.erase(entry_); + } + } + std::vector resources_; + AdsCallbacks& callbacks_; + std::list& type_url_list_; + std::list::iterator entry_; + bool inserted_; + }; + + envoy::api::v2::Node node_; + std::unique_ptr< + Grpc::AsyncClient> + async_client_; + Grpc::AsyncStream* stream_{}; + const Protobuf::MethodDescriptor& service_method_; + std::unordered_map> watches_; + std::unordered_map requests_; + // Envoy's dependendency ordering. + std::list subscriptions_; + Event::TimerPtr retry_timer_; }; } // namespace Config diff --git a/source/common/config/ads_subscription_impl.h b/source/common/config/ads_subscription_impl.h index 195978991100d..f6d8dddd22171 100644 --- a/source/common/config/ads_subscription_impl.h +++ b/source/common/config/ads_subscription_impl.h @@ -7,8 +7,9 @@ #include "common/common/logger.h" #include "common/grpc/common.h" #include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" -#include "api/ads.pb.h" +#include "api/discovery.pb.h" namespace Envoy { namespace Config { @@ -47,21 +48,20 @@ class AdsSubscriptionImpl : public Subscription, Protobuf::RepeatedPtrField typed_resources; std::transform(resources.cbegin(), resources.cend(), Protobuf::RepeatedPtrFieldBackInserter(&typed_resources), - [](const ProtobufWkt::Any& resource) { - ResourceType typed_resource; - resource.UnpackTo(&typed_resource); - return typed_resource; - }); + MessageUtil::anyConvert); callbacks_->onConfigUpdate(typed_resources); stats_.update_success_.inc(); stats_.update_attempt_.inc(); - ENVOY_LOG(debug, "ADS config for {} accepted", type_url_); + ENVOY_LOG(debug, "ADS config for {} accepted with {} resources", type_url_, resources.size()); + for (const auto resource : typed_resources) { + ENVOY_LOG(debug, "- {}", resource.DebugString()); + } } void onConfigUpdateFailed(const EnvoyException* e) override { stats_.update_rejected_.inc(); stats_.update_attempt_.inc(); - ENVOY_LOG(warn, "ADS config for {} rejected: {}", type_url_, e->what()); + ENVOY_LOG(warn, "ADS config for {} rejected: {}", type_url_, e == nullptr ? "" : e->what()); callbacks_->onConfigUpdateFailed(e); } diff --git a/source/common/config/resources.h b/source/common/config/resources.h new file mode 100644 index 0000000000000..ae8fff8d1715c --- /dev/null +++ b/source/common/config/resources.h @@ -0,0 +1,24 @@ +#pragma once + +#include + +#include "common/common/singleton.h" + +namespace Envoy { +namespace Config { + +/** + * Constant Type URLs. + */ +class TypeUrlValues { +public: + const std::string Listener{"type.googleapis.com/envoy.api.v2.Listener"}; + const std::string Cluster{"type.googleapis.com/envoy.api.v2.Cluster"}; + const std::string ClusterLoadAssignment{"type.googleapis.com/envoy.api.v2.ClusterLoadAssignment"}; + const std::string RouteConfiguration{"type.googleapis.com/envoy.api.v2.RouteConfiguration"}; +}; + +typedef ConstSingleton TypeUrl; + +} // namespace Config +} // namespace Envoy diff --git a/source/common/config/utility.cc b/source/common/config/utility.cc index ce91b4b8f3802..13a0b1c22aed6 100644 --- a/source/common/config/utility.cc +++ b/source/common/config/utility.cc @@ -6,6 +6,7 @@ #include "common/config/json_utility.h" #include "common/json/config_schemas.h" #include "common/protobuf/protobuf.h" +#include "common/protobuf/utility.h" #include "fmt/format.h" @@ -100,5 +101,23 @@ void Utility::translateLdsConfig(const Json::Object& json_lds, *lds_config.mutable_api_config_source()); } +std::string Utility::resourceName(const ProtobufWkt::Any& resource) { + if (resource.type_url() == Grpc::Common::typeUrl("envoy.api.v2.Listener")) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Grpc::Common::typeUrl("envoy.api.v2.RouteConfiguration")) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Grpc::Common::typeUrl("envoy.api.v2.Cluster")) { + return MessageUtil::anyConvert(resource).name(); + } + if (resource.type_url() == Grpc::Common::typeUrl("envoy.api.v2.ClusterLoadAssignment")) { + return MessageUtil::anyConvert(resource).cluster_name(); + } + ASSERT(false); + // TODO(htuch): this is a protocol error. + return "unknown_name"; +} + } // namespace Config } // namespace Envoy diff --git a/source/common/config/utility.h b/source/common/config/utility.h index fb3f40a84fb00..3ac6f65040a7c 100644 --- a/source/common/config/utility.h +++ b/source/common/config/utility.h @@ -1,19 +1,26 @@ #pragma once +#include "envoy/config/ads.h" #include "envoy/config/subscription.h" #include "envoy/json/json_object.h" #include "envoy/local_info/local_info.h" #include "envoy/registry/registry.h" #include "envoy/upstream/cluster_manager.h" +#include "common/common/assert.h" #include "common/common/hash.h" #include "common/common/hex.h" #include "common/common/singleton.h" +#include "common/grpc/common.h" #include "common/protobuf/protobuf.h" #include "common/protobuf/utility.h" #include "api/base.pb.h" +#include "api/cds.pb.h" +#include "api/eds.pb.h" #include "api/filter/http_connection_manager.pb.h" +#include "api/lds.pb.h" +#include "api/rds.pb.h" namespace Envoy { namespace Config { @@ -44,7 +51,7 @@ class Utility { static Protobuf::RepeatedPtrField getTypedResources(const envoy::api::v2::DiscoveryResponse& response) { Protobuf::RepeatedPtrField typed_resources; - for (auto& resource : response.resources()) { + for (const auto& resource : response.resources()) { auto* typed_resource = typed_resources.Add(); resource.UnpackTo(typed_resource); } @@ -190,6 +197,16 @@ class Utility { return config; } + + /** + * Obtain the "name" of a v2 API resource in a google.protobuf.Any, e.g. the route config name for + * a Routeconfiguration, based on the underlying resource type. + * TODO(htuch): This is kind of a hack. If we had a better support for resource names as first + * class in the API, this would not be necessary. + * @param resource google.protobuf.Any v2 API resource. + * @return std::string resource name. + */ + static std::string resourceName(const ProtobufWkt::Any& resource); }; } // namespace Config diff --git a/source/common/grpc/codec.h b/source/common/grpc/codec.h index 6089d5798c45b..eaaac0a3d1d15 100644 --- a/source/common/grpc/codec.h +++ b/source/common/grpc/codec.h @@ -45,6 +45,11 @@ class Decoder { // @return bool whether the decoding succeeded or not. bool decode(Buffer::Instance& input, std::vector& output); + // Determine the length of the current frame being decoded. This is useful when supplying a + // partial frame to decode() and wanting to know how many more bytes need to be read to complete + // the frame. + uint32_t length() const { return frame_.length_; } + private: // Wire format (http://www.grpc.io/docs/guides/wire.html) of GRPC data frame // header: diff --git a/source/common/protobuf/utility.cc b/source/common/protobuf/utility.cc index 13e4eac68fb7e..19cc51a38742b 100644 --- a/source/common/protobuf/utility.cc +++ b/source/common/protobuf/utility.cc @@ -21,6 +21,11 @@ void MessageUtil::loadFromJson(const std::string& json, Protobuf::Message& messa } } +void MessageUtil::loadFromYaml(const std::string& yaml, Protobuf::Message& message) { + const std::string json = Json::Factory::loadFromYamlString(yaml)->asJsonString(); + loadFromJson(json, message); +} + void MessageUtil::loadFromFile(const std::string& path, Protobuf::Message& message) { const std::string contents = Filesystem::fileReadToEnd(path); // If the filename ends with .pb, attempt to parse it as a binary proto. @@ -41,8 +46,7 @@ void MessageUtil::loadFromFile(const std::string& path, Protobuf::Message& messa message.GetTypeName() + ")"); } if (StringUtil::endsWith(path, ".yaml")) { - const std::string json = Json::Factory::loadFromYamlString(contents)->asJsonString(); - loadFromJson(json, message); + loadFromYaml(contents, message); } else { loadFromJson(contents, message); } diff --git a/source/common/protobuf/utility.h b/source/common/protobuf/utility.h index 31d5f9491d802..f49305a44c5d7 100644 --- a/source/common/protobuf/utility.h +++ b/source/common/protobuf/utility.h @@ -66,8 +66,23 @@ class MessageUtil { } static void loadFromJson(const std::string& json, Protobuf::Message& message); + static void loadFromYaml(const std::string& yaml, Protobuf::Message& message); static void loadFromFile(const std::string& path, Protobuf::Message& message); + /** + * Convert from google.protobuf.Any to a typed message. + * @param message source google.protobuf.Any message. + * @return MessageType the typed message inside the Any. + */ + template + static inline MessageType anyConvert(const ProtobufWkt::Any& message) { + MessageType typed_message; + if (!message.UnpackTo(&typed_message)) { + throw EnvoyException("Unable to unpack " + message.DebugString()); + } + return typed_message; + }; + /** * Convert between two protobufs via a JSON round-trip. This is used to translate arbitrary * messages to/from google.protobuf.Struct. diff --git a/source/common/router/rds_subscription.cc b/source/common/router/rds_subscription.cc index fd42c1789401b..313b90d982c91 100644 --- a/source/common/router/rds_subscription.cc +++ b/source/common/router/rds_subscription.cc @@ -16,9 +16,10 @@ RdsSubscription::RdsSubscription(Envoy::Config::SubscriptionStats stats, Upstream::ClusterManager& cm, Event::Dispatcher& dispatcher, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info) - : RestApiFetcher( - cm, rds.config_source().api_config_source().cluster_name()[0], dispatcher, random, - Config::Utility::apiConfigSourceRefreshDelay(rds.config_source().api_config_source())), + : RestApiFetcher(cm, rds.config_source().api_config_source().cluster_name()[0], dispatcher, + random, + Envoy::Config::Utility::apiConfigSourceRefreshDelay( + rds.config_source().api_config_source())), local_info_(local_info), stats_(stats) { const auto& api_config_source = rds.config_source().api_config_source(); UNREFERENCED_PARAMETER(api_config_source); @@ -48,7 +49,7 @@ void RdsSubscription::parseResponse(const Http::Message& response) { Envoy::Config::RdsJson::translateRouteConfiguration(*response_json, *resources.Add()); resources[0].set_name(route_config_name_); callbacks_->onConfigUpdate(resources); - version_info_ = Config::Utility::computeHashedVersion(response_body); + version_info_ = Envoy::Config::Utility::computeHashedVersion(response_body); stats_.update_success_.inc(); } diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 059a71656d7f7..88a380a7db320 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -161,10 +161,12 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra ThreadLocal::SlotAllocator& tls, Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, - AccessLog::AccessLogManager& log_manager) + AccessLog::AccessLogManager& log_manager, + Event::Dispatcher& primary_dispatcher) : factory_(factory), runtime_(runtime), stats_(stats), tls_(tls.allocateSlot()), random_(random), local_info_(local_info), cm_stats_(generateStats(stats)), - ads_api_(bootstrap.dynamic_resources().ads_config(), *this) { + ads_api_(bootstrap.node(), bootstrap.dynamic_resources().ads_config(), *this, + primary_dispatcher) { const auto& cm_config = bootstrap.cluster_manager(); if (cm_config.has_outlier_detection()) { const std::string event_log_file_path = cm_config.outlier_detection().event_log_path(); @@ -220,6 +222,7 @@ ClusterManagerImpl::ClusterManagerImpl(const envoy::api::v2::Bootstrap& bootstra } init_helper_.onStaticLoadComplete(); + ads_api_.start(); } ClusterManagerStats ClusterManagerImpl::generateStats(Stats::Scope& scope) { @@ -483,6 +486,13 @@ void ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools( host_http_conn_pool_map_.erase(old_host); } }); + + // The above addDrainedCallback() drain completion callback might execute immediately. This can + // then effectively nuke 'container', which means we can't continue to loop on its contents + // (we're done here). + if (host_http_conn_pool_map_.count(old_host) == 0) { + break; + } } } @@ -578,7 +588,7 @@ ClusterManagerPtr ProdClusterManagerFactory::clusterManagerFromProto( Runtime::Loader& runtime, Runtime::RandomGenerator& random, const LocalInfo::LocalInfo& local_info, AccessLog::AccessLogManager& log_manager) { return ClusterManagerPtr{new ClusterManagerImpl(bootstrap, *this, stats, tls, runtime, random, - local_info, log_manager)}; + local_info, log_manager, primary_dispatcher_)}; } Http::ConnectionPool::InstancePtr diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 7e43be271efad..424c61f37623f 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -36,8 +36,8 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { Ssl::ContextManager& ssl_context_manager, Event::Dispatcher& primary_dispatcher, const LocalInfo::LocalInfo& local_info) - : runtime_(runtime), stats_(stats), tls_(tls), random_(random), dns_resolver_(dns_resolver), - ssl_context_manager_(ssl_context_manager), primary_dispatcher_(primary_dispatcher), + : primary_dispatcher_(primary_dispatcher), runtime_(runtime), stats_(stats), tls_(tls), + random_(random), dns_resolver_(dns_resolver), ssl_context_manager_(ssl_context_manager), local_info_(local_info) {} // Upstream::ClusterManagerFactory @@ -57,6 +57,9 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { const Optional& eds_config, ClusterManager& cm) override; +protected: + Event::Dispatcher& primary_dispatcher_; + private: Runtime::Loader& runtime_; Stats::Store& stats_; @@ -64,7 +67,6 @@ class ProdClusterManagerFactory : public ClusterManagerFactory { Runtime::RandomGenerator& random_; Network::DnsResolverSharedPtr dns_resolver_; Ssl::ContextManager& ssl_context_manager_; - Event::Dispatcher& primary_dispatcher_; const LocalInfo::LocalInfo& local_info_; }; @@ -126,7 +128,8 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable& cluster_names) override { - EXPECT_CALL(ads_api_, subscribe_("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", + EXPECT_CALL(ads_api_, subscribe_(Config::TypeUrl::get().ClusterLoadAssignment, ElementsAreArray(cluster_names), _)) .WillOnce(Return(resetAdsWatch())); subscription_->start(cluster_names, callbacks_); @@ -72,7 +73,7 @@ class AdsSubscriptionTestHarness : public SubscriptionTestHarness { void updateResources(const std::vector& cluster_names) override { EXPECT_CALL(*ads_watch_, cancel()); - EXPECT_CALL(ads_api_, subscribe_("type.googleapis.com/envoy.api.v2.ClusterLoadAssignment", + EXPECT_CALL(ads_api_, subscribe_(Config::TypeUrl::get().ClusterLoadAssignment, ElementsAreArray(cluster_names), _)) .WillOnce(Return(resetAdsWatch())); subscription_->updateResources(cluster_names); diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 1d076c796f5b0..3f54913d708e0 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -109,7 +109,7 @@ class ClusterManagerImplTest : public testing::Test { void create(const envoy::api::v2::Bootstrap& bootstrap) { cluster_manager_.reset(new ClusterManagerImpl( bootstrap, factory_, factory_.stats_, factory_.tls_, factory_.runtime_, factory_.random_, - factory_.local_info_, log_manager_)); + factory_.local_info_, log_manager_, factory_.dispatcher_)); } NiceMock factory_; @@ -747,9 +747,57 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { // drain callbacks, etc. dns_timer_->callback_(); dns_callback(TestUtility::makeDnsResponse({"127.0.0.2", "127.0.0.3"})); - dns_timer_->callback_(); + + factory_.tls_.shutdownThread(); +} + +// This is a regression test for a use-after-free in +// ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools(), where a removal at one +// priority from the ConnPoolsContainer would delete the ConnPoolsContainer mid-iteration over the +// pool. +TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { + const std::string json = R"EOF( + { + "clusters": [ + { + "name": "cluster_1", + "connect_timeout_ms": 250, + "type": "strict_dns", + "dns_resolvers": [ "1.2.3.4:80" ], + "lb_type": "round_robin", + "hosts": [{"url": "tcp://localhost:11001"}] + }] + } + )EOF"; + + std::shared_ptr dns_resolver(new Network::MockDnsResolver()); + EXPECT_CALL(factory_.dispatcher_, createDnsResolver(_)).WillOnce(Return(dns_resolver)); + + Network::DnsResolver::ResolveCb dns_callback; + Event::MockTimer* dns_timer_ = new NiceMock(&factory_.dispatcher_); + Network::MockActiveDnsQuery active_dns_query; + EXPECT_CALL(*dns_resolver, resolve(_, _, _)) + .WillRepeatedly(DoAll(SaveArg<2>(&dns_callback), Return(&active_dns_query))); + create(parseBootstrapFromJson(json)); + EXPECT_FALSE(cluster_manager_->get("cluster_1")->info()->addedViaApi()); + dns_callback(TestUtility::makeDnsResponse({"127.0.0.2"})); + EXPECT_CALL(factory_, allocateConnPool_(_)) + .WillOnce(ReturnNew()); + + Http::ConnectionPool::MockInstance* cp = dynamic_cast( + cluster_manager_->httpConnPoolForCluster("cluster_1", ResourcePriority::Default, nullptr)); + + // Immediate drain, since this can happen with the HTTP codecs. + EXPECT_CALL(*cp, addDrainedCallback(_)) + .WillOnce(Invoke([](Http::ConnectionPool::Instance::DrainedCb cb) { cb(); })); + + // Remove the first host, this should lead to the cp being drained, without + // crash. + dns_timer_->callback_(); + dns_callback(TestUtility::makeDnsResponse({})); + factory_.tls_.shutdownThread(); } diff --git a/test/config/integration/BUILD b/test/config/integration/BUILD index d94a60daf92e5..6da56ad7720e6 100644 --- a/test/config/integration/BUILD +++ b/test/config/integration/BUILD @@ -10,6 +10,7 @@ envoy_package() exports_files([ "echo_server.json", "server.json", + "server_ads.yaml", "server_grpc_json_transcoder.json", "server_http2.json", "server_http2_upstream.json", diff --git a/test/config/integration/server_ads.yaml b/test/config/integration/server_ads.yaml new file mode 100644 index 0000000000000..d2f5e05b338a0 --- /dev/null +++ b/test/config/integration/server_ads.yaml @@ -0,0 +1,23 @@ +dynamic_resources: + lds_config: {ads: {}} + cds_config: {ads: {}} + ads_config: + api_type: GRPC + cluster_name: [ads_cluster] +static_resources: + clusters: + - name: ads_cluster + connect_timeout: { seconds: 5 } + type: STATIC + hosts: + - socket_address: + address: {{ ntop_ip_loopback_address }} + port_value: {{ ads_upstream }} + lb_policy: ROUND_ROBIN + http2_protocol_options: {} +admin: + access_log_path: /dev/null + address: + socket_address: + address: {{ ntop_ip_loopback_address }} + port_value: 0 diff --git a/test/integration/BUILD b/test/integration/BUILD index bb3fcc6ba2a3e..7110147e71408 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -11,6 +11,26 @@ load( envoy_package() +envoy_cc_test( + name = "ads_integration_test", + srcs = ["ads_integration_test.cc"], + data = ["//test/config/integration:server_ads.yaml"], + external_deps = [ + "envoy_cds", + "envoy_discovery", + "envoy_eds", + "envoy_lds", + "envoy_rds", + ], + deps = [ + ":http_integration_lib", + "//source/common/config:resources_lib", + "//source/common/protobuf:utility_lib", + "//test/test_common:network_utility_lib", + "//test/test_common:utility_lib", + ], +) + envoy_cc_test( name = "legacy_json_integration_test", srcs = ["legacy_json_integration_test.cc"], @@ -171,6 +191,7 @@ envoy_cc_test_library( "//include/envoy/api:api_interface", "//include/envoy/buffer:buffer_interface", "//include/envoy/event:dispatcher_interface", + "//include/envoy/grpc:status", "//include/envoy/http:codec_interface", "//include/envoy/http:header_map_interface", "//include/envoy/network:connection_interface", @@ -180,9 +201,12 @@ envoy_cc_test_library( "//include/envoy/server:options_interface", "//source/common/api:api_lib", "//source/common/buffer:buffer_lib", + "//source/common/buffer:zero_copy_input_stream_lib", "//source/common/common:assert_lib", "//source/common/common:logger_lib", "//source/common/common:thread_lib", + "//source/common/grpc:codec_lib", + "//source/common/grpc:common_lib", "//source/common/http:codec_client_lib", "//source/common/http:header_map_lib", "//source/common/http:headers_lib", diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc new file mode 100644 index 0000000000000..ecbb9ad165b59 --- /dev/null +++ b/test/integration/ads_integration_test.cc @@ -0,0 +1,238 @@ +#include "common/config/resources.h" +#include "common/protobuf/utility.h" + +#include "test/integration/http_integration.h" +#include "test/integration/utility.h" +#include "test/test_common/network_utility.h" +#include "test/test_common/utility.h" + +#include "api/cds.pb.h" +#include "api/discovery.pb.h" +#include "api/eds.pb.h" +#include "api/lds.pb.h" +#include "api/rds.pb.h" +#include "gtest/gtest.h" + +namespace Envoy { +namespace { + +class AdsIntegrationTest : public HttpIntegrationTest, + public testing::TestWithParam { +public: + AdsIntegrationTest() : HttpIntegrationTest(Http::CodecClient::Type::HTTP2, GetParam()) {} + + void SetUp() override { + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); + registerPort("endpoint", fake_upstreams_.back()->localAddress()->ip()->port()); + fake_upstreams_.emplace_back(new FakeUpstream(0, FakeHttpConnection::Type::HTTP2, version_)); + registerPort("ads_upstream", fake_upstreams_.back()->localAddress()->ip()->port()); + createTestServer("test/config/integration/server_ads.yaml", {"http"}); + } + + void TearDown() override { + test_server_.reset(); + fake_upstreams_.clear(); + } + + void expectDiscoveryRequest(const std::string& type_url, const std::string& version) { + envoy::api::v2::DiscoveryRequest discovery_request; + ads_stream_->waitForGrpcMessage(*dispatcher_, discovery_request); + EXPECT_EQ(type_url, discovery_request.type_url()); + EXPECT_TRUE(discovery_request.resource_names().empty()); + EXPECT_EQ(version, discovery_request.version_info()); + } + + void sendDiscoveryResponse(const std::string& type_url, const Protobuf::Message& message, + const std::string& version) { + envoy::api::v2::DiscoveryResponse discovery_response; + discovery_response.set_version_info(version); + discovery_response.set_type_url(type_url); + discovery_response.add_resources()->PackFrom(message); + ads_stream_->sendGrpcMessage(discovery_response); + } + + envoy::api::v2::Cluster buildCluster(const std::string& name) { + return TestUtility::parseYaml(fmt::format(R"EOF( + name: {} + connect_timeout: 5s + type: EDS + eds_cluster_config: {{ eds_config: {{ ads: {{}} }} }} + lb_policy: ROUND_ROBIN + http2_protocol_options: {{}} + )EOF", + name)); + } + + envoy::api::v2::ClusterLoadAssignment buildClusterLoadAssignment(const std::string& name) { + return TestUtility::parseYaml( + fmt::format(R"EOF( + cluster_name: {} + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: {} + port_value: {} + )EOF", + name, Network::Test::getLoopbackAddressString(GetParam()), + fake_upstreams_[0]->localAddress()->ip()->port())); + } + + envoy::api::v2::Listener buildListener(const std::string& name, const std::string& route_config) { + return TestUtility::parseYaml( + fmt::format(R"EOF( + name: {} + address: + socket_address: + address: {} + port_value: 0 + filter_chains: + filters: + - name: envoy.http_connection_manager + config: + codec_type: HTTP2 + rds: + route_config_name: {} + config_source: {{ ads: {{}} }} + http_filters: [{{ name: envoy.router, config: {{ deprecated_v1: true }}}}] + )EOF", + name, Network::Test::getLoopbackAddressString(GetParam()), route_config)); + } + + envoy::api::v2::RouteConfiguration buildRouteConfig(const std::string& name, + const std::string& cluster) { + return TestUtility::parseYaml(fmt::format(R"EOF( + name: {} + virtual_hosts: + - name: integration + domains: ["*"] + routes: + - match: {{ prefix: "/" }} + route: {{ cluster: {} }} + )EOF", + name, cluster)); + } + + void makeSingleRequest() { + registerTestServerPorts({"http"}); + auto client_conn = makeClientConnection(lookupPort("http")); + testRouterHeaderOnlyRequestAndResponse(std::move(client_conn), true); + cleanupUpstreamAndDownstream(); + fake_upstream_connection_ = nullptr; + } + + void initialize() { + ads_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_); + ads_stream_ = ads_connection_->waitForNewStream(); + ads_stream_->startGrpcStream(); + } + + FakeHttpConnectionPtr ads_connection_; + FakeStreamPtr ads_stream_; +}; + +INSTANTIATE_TEST_CASE_P(IpVersions, AdsIntegrationTest, + testing::ValuesIn(TestEnvironment::getIpVersionsForTest())); + +// Validate basic config delivery and upgrade. +TEST_P(AdsIntegrationTest, Basic) { + initialize(); + + // Send initial configuration, validate we can process a request. + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, ""); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, ""); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, "1"); + expectDiscoveryRequest(Config::TypeUrl::get().Listener, ""); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_0", "route_config_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1"); + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, ""); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Listener, "1"); + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1"); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); + + // Upgrade RDS/CDS/EDS to a newer config, validate we can process a request. + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_1"), "2"); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_1"), "2"); + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, "2"); + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "2"); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_1"), "2"); + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "2"); + + makeSingleRequest(); + + // Upgrade LDS/RDS, validate we can process a request. + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_1", "route_config_1"), "2"); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_1", "cluster_1"), "3"); + expectDiscoveryRequest(Config::TypeUrl::get().Listener, "2"); + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "3"); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 2); + makeSingleRequest(); +} + +// Validate that we can recover from failures. +TEST_P(AdsIntegrationTest, Failure) { + initialize(); + + // Send initial configuration, failing each xDS once (via a type mismatch), validate we can + // process a request. + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, ""); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildClusterLoadAssignment("cluster_0"), + "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Listener, ""); + + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, ""); + sendDiscoveryResponse(Config::TypeUrl::get().Cluster, buildCluster("cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, ""); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, buildCluster("cluster_0"), + "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Cluster, "1"); + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, ""); + sendDiscoveryResponse(Config::TypeUrl::get().ClusterLoadAssignment, + buildClusterLoadAssignment("cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1"); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildRouteConfig("listener_0", "route_config_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Listener, ""); + sendDiscoveryResponse(Config::TypeUrl::get().Listener, + buildListener("listener_0", "route_config_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, ""); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildListener("route_config_0", "cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().Listener, "1"); + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, ""); + sendDiscoveryResponse(Config::TypeUrl::get().RouteConfiguration, + buildRouteConfig("route_config_0", "cluster_0"), "1"); + + expectDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1"); + + test_server_->waitForCounterGe("listener_manager.listener_create_success", 1); + makeSingleRequest(); +} + +} // namespace +} // namespace Envoy diff --git a/test/integration/fake_upstream.cc b/test/integration/fake_upstream.cc index ad778d71f2706..67a64e99d5585 100644 --- a/test/integration/fake_upstream.cc +++ b/test/integration/fake_upstream.cc @@ -98,10 +98,10 @@ void FakeStream::waitForHeadersComplete() { void FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length) { std::unique_lock lock(lock_); - while (bodyLength() != body_length) { + while (bodyLength() < body_length) { decoder_event_.wait_until(lock, std::chrono::system_clock::now() + std::chrono::milliseconds(5)); - if (bodyLength() != body_length) { + if (bodyLength() < body_length) { // Run the client dispatcher since we may need to process window updates, etc. client_dispatcher.run(Event::Dispatcher::RunType::NonBlock); } @@ -127,6 +127,15 @@ void FakeStream::waitForReset() { } } +void FakeStream::startGrpcStream() { + encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); +} + +void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) { + encodeTrailers( + Http::TestHeaderMapImpl{{"grpc-status", std::to_string(static_cast(status))}}); +} + FakeHttpConnection::FakeHttpConnection(QueuedConnectionWrapperPtr connection_wrapper, Stats::Store& store, Type type) : FakeConnectionBase(std::move(connection_wrapper)) { diff --git a/test/integration/fake_upstream.h b/test/integration/fake_upstream.h index 6acd3d66c18a6..e724351a43cad 100644 --- a/test/integration/fake_upstream.h +++ b/test/integration/fake_upstream.h @@ -8,6 +8,7 @@ #include #include "envoy/api/api.h" +#include "envoy/grpc/status.h" #include "envoy/http/codec.h" #include "envoy/network/connection.h" #include "envoy/network/connection_handler.h" @@ -15,7 +16,10 @@ #include "envoy/server/configuration.h" #include "common/buffer/buffer_impl.h" +#include "common/buffer/zero_copy_input_stream_impl.h" #include "common/common/thread.h" +#include "common/grpc/codec.h" +#include "common/grpc/common.h" #include "common/network/filter_impl.h" #include "common/network/listen_socket_impl.h" #include "common/stats/stats_impl.h" @@ -29,7 +33,9 @@ class FakeHttpConnection; /** * Provides a fake HTTP stream for integration testing. */ -class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { +class FakeStream : public Http::StreamDecoder, + public Http::StreamCallbacks, + Logger::Loggable { public: FakeStream(FakeHttpConnection& parent, Http::StreamEncoder& encoder); @@ -48,6 +54,35 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { void waitForEndStream(Event::Dispatcher& client_dispatcher); void waitForReset(); + // gRPC convenience methods. + void startGrpcStream(); + void finishGrpcStream(Grpc::Status::GrpcStatus status); + template void sendGrpcMessage(const T& message) { + auto serialized_response = Grpc::Common::serializeBody(message); + encodeData(*serialized_response, false); + } + template void decodeGrpcFrame(T& message) { + EXPECT_GE(decoded_grpc_frames_.size(), 1); + Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_)); + EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT); + EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream)); + ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString()); + decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); + } + template void waitForGrpcMessage(Event::Dispatcher& client_dispatcher, T& message) { + if (!decoded_grpc_frames_.empty()) { + decodeGrpcFrame(message); + return; + } + waitForData(client_dispatcher, 5); + EXPECT_TRUE(grpc_decoder_.decode(body(), decoded_grpc_frames_)); + if (decoded_grpc_frames_.size() < 1) { + waitForData(client_dispatcher, grpc_decoder_.length()); + EXPECT_TRUE(grpc_decoder_.decode(body(), decoded_grpc_frames_)); + } + decodeGrpcFrame(message); + } + // Http::StreamDecoder void decodeHeaders(Http::HeaderMapPtr&& headers, bool end_stream) override; void decodeData(Buffer::Instance& data, bool end_stream) override; @@ -68,6 +103,8 @@ class FakeStream : public Http::StreamDecoder, public Http::StreamCallbacks { bool end_stream_{}; Buffer::OwnedImpl body_; bool saw_reset_{}; + Grpc::Decoder grpc_decoder_; + std::vector decoded_grpc_frames_; }; typedef std::unique_ptr FakeStreamPtr; diff --git a/test/integration/ratelimit_integration_test.cc b/test/integration/ratelimit_integration_test.cc index b0c145d1c737d..fb117487b6f73 100644 --- a/test/integration/ratelimit_integration_test.cc +++ b/test/integration/ratelimit_integration_test.cc @@ -40,6 +40,8 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, void waitForRatelimitRequest() { fake_ratelimit_connection_ = fake_upstreams_[1]->waitForHttpConnection(*dispatcher_); ratelimit_request_ = fake_ratelimit_connection_->waitForNewStream(); + pb::lyft::ratelimit::RateLimitRequest request_msg; + ratelimit_request_->waitForGrpcMessage(*dispatcher_, request_msg); ratelimit_request_->waitForEndStream(*dispatcher_); EXPECT_STREQ("POST", ratelimit_request_->headers().Method()->value().c_str()); EXPECT_STREQ("/pb.lyft.ratelimit.RateLimitService/ShouldRateLimit", @@ -51,16 +53,6 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, auto* entry = expected_request_msg.add_descriptors()->add_entries(); entry->set_key("destination_cluster"); entry->set_value("traffic"); - - Grpc::Decoder decoder; - std::vector decoded_frames; - EXPECT_TRUE(decoder.decode(ratelimit_request_->body(), decoded_frames)); - EXPECT_EQ(1, decoded_frames.size()); - pb::lyft::ratelimit::RateLimitRequest request_msg; - Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_frames[0].data_)); - EXPECT_TRUE(decoded_frames[0].flags_ == Grpc::GRPC_FH_DEFAULT); - EXPECT_TRUE(request_msg.ParseFromZeroCopyStream(&stream)); - EXPECT_EQ(expected_request_msg.DebugString(), request_msg.DebugString()); } @@ -89,12 +81,11 @@ class RatelimitIntegrationTest : public HttpIntegrationTest, } void sendRateLimitResponse(pb::lyft::ratelimit::RateLimitResponse_Code code) { - ratelimit_request_->encodeHeaders(Http::TestHeaderMapImpl{{":status", "200"}}, false); + ratelimit_request_->startGrpcStream(); pb::lyft::ratelimit::RateLimitResponse response_msg; response_msg.set_overall_code(code); - auto serialized_response = Grpc::Common::serializeBody(response_msg); - ratelimit_request_->encodeData(*serialized_response, false); - ratelimit_request_->encodeTrailers(Http::TestHeaderMapImpl{{"grpc-status", "0"}}); + ratelimit_request_->sendGrpcMessage(response_msg); + ratelimit_request_->finishGrpcStream(Grpc::Status::Ok); } void cleanup() { diff --git a/test/mocks/config/mocks.h b/test/mocks/config/mocks.h index 423159984c94e..d0c4d24a04140 100644 --- a/test/mocks/config/mocks.h +++ b/test/mocks/config/mocks.h @@ -39,6 +39,7 @@ class MockAdsApi : public AdsApi { MockAdsApi(); virtual ~MockAdsApi(); + MOCK_METHOD0(start, void()); MOCK_METHOD3(subscribe_, AdsWatch*(const std::string& type_url, const std::vector& resources, AdsCallbacks& callbacks)); diff --git a/test/test_common/BUILD b/test/test_common/BUILD index 9b73e4a0ccc3b..840a763af774b 100644 --- a/test/test_common/BUILD +++ b/test/test_common/BUILD @@ -70,5 +70,6 @@ envoy_cc_test_library( "//source/common/json:json_loader_lib", "//source/common/network:address_lib", "//source/common/network:utility_lib", + "//source/common/protobuf:utility_lib", ], ) diff --git a/test/test_common/utility.h b/test/test_common/utility.h index f3adbcdb09486..589e07797e9ee 100644 --- a/test/test_common/utility.h +++ b/test/test_common/utility.h @@ -14,6 +14,7 @@ #include "envoy/stats/stats.h" #include "common/http/header_map_impl.h" +#include "common/protobuf/utility.h" #include "test/test_common/printers.h" @@ -126,6 +127,17 @@ class TestUtility { return "127.0.0.9"; #endif } + + /** + * Return typed proto message object for YAML. + * @param yaml YAML string. + * @return MessageType parsed from yaml. + */ + template static MessageType parseYaml(const std::string& yaml) { + MessageType message; + MessageUtil::loadFromYaml(yaml, message); + return message; + } }; /**