From 44fc687e3af6a2b0f0c1c0f3e5b268097dc0f7c2 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Wed, 28 Mar 2018 19:34:35 -0400 Subject: [PATCH 01/15] Add support for collecting services. --- src/kubernetes.cc | 237 ++++++++++++++++++++++++++++++++++++++++++++++ src/kubernetes.h | 48 ++++++++++ 2 files changed, 285 insertions(+) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index abfed910..64fdf9a3 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -53,6 +53,7 @@ constexpr const char kGkeContainerResourcePrefix[] = "gke_container"; constexpr const char kK8sContainerResourcePrefix[] = "k8s_container"; constexpr const char kK8sPodResourcePrefix[] = "k8s_pod"; constexpr const char kK8sNodeResourcePrefix[] = "k8s_node"; +constexpr const char kK8sClusterResourcePrefix[] = "k8s_cluster"; constexpr const char kNodeSelectorPrefix[] = "?fieldSelector=spec.nodeName%3D"; @@ -453,6 +454,78 @@ KubernetesReader::GetPodAndContainerMetadata( return std::move(result); } + +std::vector KubernetesReader::GetServiceList( + const std::string cluster_name, const std::string location +) const throw(json::Exception) { + std::lock_guard lock(service_mutex_); + std::vector service_list; + for (auto const& service_it : service_to_metadata_) { + const std::string service_key = service_it.first; + const std::string namespace_name = + service_key.substr(0, service_key.find("/")); + json::value service_metadata = service_it.second->Clone(); + std::vector pod_names; + auto endpoints_it = service_to_pods_.find(service_key); + if (endpoints_it != service_to_pods_.end()) { + pod_names = endpoints_it->second; + } + std::vector pod_resources; + for (const std::string& pod_name : pod_names) { + const MonitoredResource k8s_pod("k8s_pod", { + {"cluster_name", cluster_name}, + {"namespace_name", namespace_name}, + {"pod_name", pod_name}, + {"location", location}, + }); + pod_resources.push_back(k8s_pod.ToJSON()); + } + service_list.emplace_back(json::object({ + {"api", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", std::move(service_metadata)}, + {"pods", json::array(std::move(pod_resources))}, + })}, + })); + } + return service_list; +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( + Timestamp collected_at, bool is_deleted) const throw(json::Exception) { + const std::string cluster_name = environment_.KubernetesClusterName(); + const std::string location = environment_.KubernetesClusterLocation(); + std::vector service_list = + GetServiceList(cluster_name, location); + const MonitoredResource k8s_cluster("k8s_cluster", { + {"cluster_name", cluster_name}, + {"location", location}, + }); + json::value cluster_raw_metadata = json::object({ + {"blobs", json::object({ + {"services", json::array(std::move(service_list))}, + })}, + }); + if (config_.VerboseLogging()) { + LOG(INFO) << "Raw cluster metadata: " << *cluster_raw_metadata; + } + + // There is no created_at for the cluster since the metadata contains + // ALL current services. + Timestamp created_at = time_point(); + return MetadataUpdater::ResourceMetadata( + std::vector{}, + k8s_cluster, +#ifdef ENABLE_KUBERNETES_METADATA + MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), + is_deleted, created_at, collected_at, + std::move(cluster_raw_metadata)) +#else + MetadataStore::Metadata::IGNORED() +#endif + ); +} + std::vector KubernetesReader::MetadataQuery() const { if (config_.VerboseLogging()) { @@ -911,6 +984,93 @@ json::value KubernetesReader::FindTopLevelController( return FindTopLevelController(ns, GetOwner(ns, controller_ref)); } +void KubernetesReader::UpdateServiceToMetadataCache( + const json::Object* service, bool is_deleted) throw(json::Exception) { +#ifdef VERBOSE + LOG(DEBUG) << "UpdateServiceToMetadataCache(" << *service << ")"; +#endif + const json::Object* metadata = service->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string service_name = metadata->Get("name"); + const std::string encoded_ref = boost::algorithm::join( + std::vector{namespace_name, service_name}, "/"); + + std::lock_guard lock(service_mutex_); + auto service_it = service_to_metadata_.find(encoded_ref); + if (is_deleted) { + if (service_it != service_to_metadata_.end()) { + service_to_metadata_.erase(service_it); + } + } else { + json::value service_value = service->Clone(); + if (service_it == service_to_metadata_.end()) { + service_to_metadata_.emplace(encoded_ref, std::move(service_value)); + } else { + service_it->second = std::move(service_value); + } + } +} + +void KubernetesReader::UpdateServiceToPodsCache( + const json::Object* endpoints, bool is_deleted) throw(json::Exception) { +#ifdef VERBOSE + LOG(DEBUG) << "UpdateServiceToPodsCache(" << *endpoints << ")"; +#endif + const json::Object* metadata = endpoints->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + // Endpoints name is same as the matching service name. + const std::string service_name = metadata->Get("name"); + const std::string encoded_ref = boost::algorithm::join( + std::vector{namespace_name, service_name}, "/"); + std::vector pod_names; + + // Only extract the pod names when this is not a deletion. In the case of + // a deletion, we delete the mapping below. + if (!is_deleted && endpoints->Has("subsets") && + !endpoints->at("subsets")->Is()) { + const json::Array* subsets = endpoints->Get("subsets"); + for (const json::value& subset : *subsets) { + const json::Object* subset_obj = subset->As(); + if (!subset_obj->Has("addresses")) { + continue; + } + const json::Array* addresses = subset_obj->Get("addresses"); + for (const json::value& address : *addresses) { + const json::Object* address_obj = address->As(); + if (!address_obj->Has("targetRef")) { + continue; + } + const json::Object* ref = address_obj->Get("targetRef"); + if (!(ref->Has("kind") && ref->Has("name"))) { + continue; + } + const std::string& target_kind = ref->Get("kind"); + if (target_kind != "Pod") { + LOG(INFO) << "Found a resource other than a pod in Endpoint " + << service_name << "'s targetRef: " << target_kind; + continue; + } + const std::string& pod_name = ref->Get("name"); + pod_names.push_back(pod_name); + } + } + } + + std::lock_guard lock(service_mutex_); + auto service_it = service_to_pods_.find(encoded_ref); + if (is_deleted) { + if (service_it != service_to_pods_.end()) { + service_to_pods_.erase(service_it); + } + } else { + if (service_it == service_to_pods_.end()) { + service_to_pods_.emplace(encoded_ref, pod_names); + } else { + service_it->second = pod_names; + } + } +} + bool KubernetesReader::ValidateConfiguration() const { try { (void) QueryMaster(std::string(kKubernetesEndpointPath) + "/nodes?limit=1"); @@ -1013,6 +1173,72 @@ void KubernetesReader::WatchNodes( LOG(INFO) << "Watch thread (node) exiting"; } +void KubernetesReader::ServiceCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* service, Timestamp collected_at, bool is_deleted) + throw(json::Exception) { + UpdateServiceToMetadataCache(service, is_deleted); + + // TODO: using a temporary did not work here. + std::vector result_vector; + result_vector.emplace_back(GetClusterMetadata(collected_at, is_deleted)); + callback(std::move(result_vector)); +} + +void KubernetesReader::WatchServices(MetadataUpdater::UpdateCallback callback) { + LOG(INFO) << "Watch thread started for services"; + + try { + WatchMaster( + "Service", + std::string(kKubernetesEndpointPath) + "/watch/services/", + [=](const json::Object* service, Timestamp collected_at, bool is_deleted) { + ServiceCallback(callback, service, collected_at, is_deleted); + }); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + LOG(ERROR) << "No more service metadata will be collected"; + } catch (const KubernetesReader::QueryException& e) { + LOG(ERROR) << "No more service metadata will be collected"; + } + health_checker_->SetUnhealthy("kubernetes_service_thread"); + LOG(INFO) << "Watch thread (service) exiting"; +} + +void KubernetesReader::EndpointsCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* endpoints, Timestamp collected_at, bool is_deleted) + throw(json::Exception) { + UpdateServiceToPodsCache(endpoints, is_deleted); + + // TODO: using a temporary did not work here. + std::vector result_vector; + result_vector.emplace_back(GetClusterMetadata(collected_at, is_deleted)); + callback(std::move(result_vector)); +} + +void KubernetesReader::WatchEndpoints( + MetadataUpdater::UpdateCallback callback) { + LOG(INFO) << "Watch thread started for endpoints"; + + try { + WatchMaster( + "Endpoints", + std::string(kKubernetesEndpointPath) + "/watch/endpoints/", + [=](const json::Object* endpoints, Timestamp collected_at, + bool is_deleted) { + EndpointsCallback(callback, endpoints, collected_at, is_deleted); + }); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + LOG(ERROR) << "No more endpoints metadata will be collected"; + } catch (const KubernetesReader::QueryException& e) { + LOG(ERROR) << "No more endpoints metadata will be collected"; + } + health_checker_->SetUnhealthy("kubernetes_endpoints_thread"); + LOG(INFO) << "Watch thread (endpoints) exiting"; +} + KubernetesUpdater::KubernetesUpdater(const Configuration& config, HealthChecker* health_checker, MetadataStore* store) @@ -1055,6 +1281,14 @@ void KubernetesUpdater::StartUpdater() { pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(watched_node, cb); }); + if (config().KubernetesClusterLevelMetadata()) { + service_watch_thread_ = std::thread([=]() { + reader_.WatchServices(cb); + }); + endpoints_watch_thread_ = std::thread([=]() { + reader_.WatchEndpoints(cb); + }); + } } else { // Only try to poll if watch is disabled. PollingMetadataUpdater::StartUpdater(); @@ -1064,6 +1298,9 @@ void KubernetesUpdater::StartUpdater() { void KubernetesUpdater::MetadataCallback( std::vector&& result_vector) { for (MetadataUpdater::ResourceMetadata& result : result_vector) { +#ifdef VERBOSE + LOG(DEBUG) << "MetadataCallback (" << result << ")"; +#endif UpdateResourceCallback(result); UpdateMetadataCallback(std::move(result)); } diff --git a/src/kubernetes.h b/src/kubernetes.h index 1ca245ca..48a6d13b 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -55,6 +55,12 @@ class KubernetesReader { void WatchPods(const std::string& node_name, MetadataUpdater::UpdateCallback callback) const; + // Service watcher. + void WatchServices(MetadataUpdater::UpdateCallback callback); + + // Endpoints watcher. + void WatchEndpoints(MetadataUpdater::UpdateCallback callback); + // Gets the name of the node the agent is running on. // Returns an empty string if unable to find the current node. const std::string& CurrentNode() const; @@ -95,6 +101,16 @@ class KubernetesReader { MetadataUpdater::UpdateCallback callback, const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); + // Service watch callback. + void ServiceCallback( + MetadataUpdater::UpdateCallback callback, const json::Object* service, + Timestamp collected_at, bool is_deleted) throw(json::Exception); + + // Endpoints watch callback. + void EndpointsCallback( + MetadataUpdater::UpdateCallback callback, const json::Object* endpoints, + Timestamp collected_at, bool is_deleted) throw(json::Exception); + // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const throw(json::Exception); @@ -120,6 +136,14 @@ class KubernetesReader { std::vector GetPodAndContainerMetadata( const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); + // Get a list of service metadata based on the service level caches. + std::vector GetServiceList( + const std::string cluster_name, const std::string location) + const throw(json::Exception); + // Return the cluster metadata based on the cached values for + // service_to_metadta_ and service_to_pods_. + MetadataUpdater::ResourceMetadata GetClusterMetadata( + Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. @@ -141,6 +165,14 @@ class KubernetesReader { json::value FindTopLevelController(const std::string& ns, json::value object) const throw(QueryException, json::Exception); + // Update service_to_metadata_ cache based on a newly updated service. + void UpdateServiceToMetadataCache( + const json::Object* service, bool is_deleted) throw(json::Exception); + + // Update service_to_pods_ cache based on a newly updated endpoints. + void UpdateServiceToPodsCache( + const json::Object* endpoints, bool is_deleted) throw(json::Exception); + // Cached data. mutable std::recursive_mutex mutex_; mutable std::string current_node_; @@ -151,6 +183,14 @@ class KubernetesReader { version_to_kind_to_name_; // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; + // Mutex for the service related caches. + mutable std::recursive_mutex service_mutex_; + // Map from service key to service metadata. This map is built based on the + // response from WatchServices. + mutable std::map service_to_metadata_; + // Map from service key to names of pods in the service. This map is built + // based on the response from WatchEndpoints. + mutable std::map> service_to_pods_; const Configuration& config_; HealthChecker* health_checker_; @@ -168,6 +208,12 @@ class KubernetesUpdater : public PollingMetadataUpdater { if (pod_watch_thread_.joinable()) { pod_watch_thread_.join(); } + if (service_watch_thread_.joinable()) { + service_watch_thread_.join(); + } + if (endpoints_watch_thread_.joinable()) { + endpoints_watch_thread_.join(); + } } protected: @@ -182,6 +228,8 @@ class KubernetesUpdater : public PollingMetadataUpdater { HealthChecker* health_checker_; std::thread node_watch_thread_; std::thread pod_watch_thread_; + std::thread service_watch_thread_; + std::thread endpoints_watch_thread_; }; } From 5fa6771499c0a0f3e069c4a63ac6a4be269feb90 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 11:59:07 -0400 Subject: [PATCH 02/15] Address comments --- src/kubernetes.cc | 65 +++++++++++++++++++++-------------------------- src/kubernetes.h | 9 ++++--- 2 files changed, 35 insertions(+), 39 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 64fdf9a3..4979cc3c 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -53,7 +53,6 @@ constexpr const char kGkeContainerResourcePrefix[] = "gke_container"; constexpr const char kK8sContainerResourcePrefix[] = "k8s_container"; constexpr const char kK8sPodResourcePrefix[] = "k8s_pod"; constexpr const char kK8sNodeResourcePrefix[] = "k8s_node"; -constexpr const char kK8sClusterResourcePrefix[] = "k8s_cluster"; constexpr const char kNodeSelectorPrefix[] = "?fieldSelector=spec.nodeName%3D"; @@ -454,22 +453,20 @@ KubernetesReader::GetPodAndContainerMetadata( return std::move(result); } - std::vector KubernetesReader::GetServiceList( - const std::string cluster_name, const std::string location -) const throw(json::Exception) { - std::lock_guard lock(service_mutex_); + const std::string& cluster_name, const std::string& location) const + throw(json::Exception) { + std::lock_guard lock(service_mutex_); std::vector service_list; - for (auto const& service_it : service_to_metadata_) { - const std::string service_key = service_it.first; + for (const auto& service_it : service_to_metadata_) { + const std::string& service_key = service_it.first; const std::string namespace_name = service_key.substr(0, service_key.find("/")); - json::value service_metadata = service_it.second->Clone(); - std::vector pod_names; + const json::value& service_metadata = service_it.second; auto endpoints_it = service_to_pods_.find(service_key); - if (endpoints_it != service_to_pods_.end()) { - pod_names = endpoints_it->second; - } + const std::vector& pod_names = + (endpoints_it != service_to_pods_.end()) ? endpoints_it->second + : kNoPods; std::vector pod_resources; for (const std::string& pod_name : pod_names) { const MonitoredResource k8s_pod("k8s_pod", { @@ -478,12 +475,12 @@ std::vector KubernetesReader::GetServiceList( {"pod_name", pod_name}, {"location", location}, }); - pod_resources.push_back(k8s_pod.ToJSON()); + pod_resources.emplace_back(k8s_pod.ToJSON()); } service_list.emplace_back(json::object({ {"api", json::object({ {"version", json::string(kKubernetesApiVersion)}, - {"raw", std::move(service_metadata)}, + {"raw", service_metadata->Clone()}, {"pods", json::array(std::move(pod_resources))}, })}, })); @@ -492,7 +489,7 @@ std::vector KubernetesReader::GetServiceList( } MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( - Timestamp collected_at, bool is_deleted) const throw(json::Exception) { + Timestamp collected_at) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); std::vector service_list = @@ -518,7 +515,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( k8s_cluster, #ifdef ENABLE_KUBERNETES_METADATA MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - is_deleted, created_at, collected_at, + /*is_deleted=*/ false, created_at, collected_at, std::move(cluster_raw_metadata)) #else MetadataStore::Metadata::IGNORED() @@ -995,18 +992,17 @@ void KubernetesReader::UpdateServiceToMetadataCache( const std::string encoded_ref = boost::algorithm::join( std::vector{namespace_name, service_name}, "/"); - std::lock_guard lock(service_mutex_); + std::lock_guard lock(service_mutex_); auto service_it = service_to_metadata_.find(encoded_ref); if (is_deleted) { if (service_it != service_to_metadata_.end()) { service_to_metadata_.erase(service_it); } } else { - json::value service_value = service->Clone(); if (service_it == service_to_metadata_.end()) { - service_to_metadata_.emplace(encoded_ref, std::move(service_value)); + service_to_metadata_.emplace(encoded_ref, service->Clone()); } else { - service_it->second = std::move(service_value); + service_it->second = service->Clone(); } } } @@ -1022,8 +1018,8 @@ void KubernetesReader::UpdateServiceToPodsCache( const std::string service_name = metadata->Get("name"); const std::string encoded_ref = boost::algorithm::join( std::vector{namespace_name, service_name}, "/"); - std::vector pod_names; + std::vector pod_names; // Only extract the pod names when this is not a deletion. In the case of // a deletion, we delete the mapping below. if (!is_deleted && endpoints->Has("subsets") && @@ -1044,31 +1040,27 @@ void KubernetesReader::UpdateServiceToPodsCache( if (!(ref->Has("kind") && ref->Has("name"))) { continue; } - const std::string& target_kind = ref->Get("kind"); + const std::string target_kind = ref->Get("kind"); if (target_kind != "Pod") { LOG(INFO) << "Found a resource other than a pod in Endpoint " << service_name << "'s targetRef: " << target_kind; continue; } - const std::string& pod_name = ref->Get("name"); + const std::string pod_name = ref->Get("name"); pod_names.push_back(pod_name); } } } - std::lock_guard lock(service_mutex_); - auto service_it = service_to_pods_.find(encoded_ref); + std::lock_guard lock(service_mutex_); if (is_deleted) { - if (service_it != service_to_pods_.end()) { - service_to_pods_.erase(service_it); - } + service_to_pods_.erase(encoded_ref); } else { - if (service_it == service_to_pods_.end()) { - service_to_pods_.emplace(encoded_ref, pod_names); - } else { - service_it->second = pod_names; - } + auto it_inserted = + service_to_pods_.emplace(encoded_ref, std::vector()); + service_to_pods_.at(encoded_ref) = pod_names; } + } bool KubernetesReader::ValidateConfiguration() const { @@ -1181,7 +1173,7 @@ void KubernetesReader::ServiceCallback( // TODO: using a temporary did not work here. std::vector result_vector; - result_vector.emplace_back(GetClusterMetadata(collected_at, is_deleted)); + result_vector.emplace_back(GetClusterMetadata(collected_at)); callback(std::move(result_vector)); } @@ -1192,7 +1184,8 @@ void KubernetesReader::WatchServices(MetadataUpdater::UpdateCallback callback) { WatchMaster( "Service", std::string(kKubernetesEndpointPath) + "/watch/services/", - [=](const json::Object* service, Timestamp collected_at, bool is_deleted) { + [=](const json::Object* service, Timestamp collected_at, + bool is_deleted) { ServiceCallback(callback, service, collected_at, is_deleted); }); } catch (const json::Exception& e) { @@ -1213,7 +1206,7 @@ void KubernetesReader::EndpointsCallback( // TODO: using a temporary did not work here. std::vector result_vector; - result_vector.emplace_back(GetClusterMetadata(collected_at, is_deleted)); + result_vector.emplace_back(GetClusterMetadata(collected_at)); callback(std::move(result_vector)); } diff --git a/src/kubernetes.h b/src/kubernetes.h index 48a6d13b..a8d7655b 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -138,12 +138,12 @@ class KubernetesReader { throw(json::Exception); // Get a list of service metadata based on the service level caches. std::vector GetServiceList( - const std::string cluster_name, const std::string location) + const std::string& cluster_name, const std::string& location) const throw(json::Exception); // Return the cluster metadata based on the cached values for // service_to_metadta_ and service_to_pods_. MetadataUpdater::ResourceMetadata GetClusterMetadata( - Timestamp collected_at, bool is_deleted) const throw(json::Exception); + Timestamp collected_at) const throw(json::Exception); // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. @@ -173,6 +173,9 @@ class KubernetesReader { void UpdateServiceToPodsCache( const json::Object* endpoints, bool is_deleted) throw(json::Exception); + // Const data. + const std::vector kNoPods; + // Cached data. mutable std::recursive_mutex mutex_; mutable std::string current_node_; @@ -184,7 +187,7 @@ class KubernetesReader { // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; // Mutex for the service related caches. - mutable std::recursive_mutex service_mutex_; + mutable std::mutex service_mutex_; // Map from service key to service metadata. This map is built based on the // response from WatchServices. mutable std::map service_to_metadata_; From 9a20bedd35501070a08f4993b3acbb94d88cdbf1 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 12:01:11 -0400 Subject: [PATCH 03/15] Add a config option for turning on service metadata --- src/configuration.cc | 8 +++++++- src/configuration.h | 5 +++++ src/kubernetes.cc | 2 +- 3 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/configuration.cc b/src/configuration.cc index f8514f07..80af0227 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -69,6 +69,7 @@ constexpr const char kKubernetesDefaultClusterLocation[] = ""; constexpr const char kKubernetesDefaultNodeName[] = ""; constexpr const bool kKubernetesDefaultUseWatch = false; constexpr const bool kKubernetesDefaultClusterLevelMetadata = false; +constexpr const bool kKubernetesDefaultServiceMetadata = true; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; constexpr const char kDefaultHealthCheckFile[] = @@ -113,6 +114,8 @@ Configuration::Configuration() kubernetes_use_watch_(kKubernetesDefaultUseWatch), kubernetes_cluster_level_metadata_( kKubernetesDefaultClusterLevelMetadata), + kubernetes_service_metadata_( + kKubernetesDefaultServiceMetadata), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone), health_check_file_(kDefaultHealthCheckFile) {} @@ -270,7 +273,10 @@ void Configuration::ParseConfiguration(std::istream& input) { config["KubernetesUseWatch"].as(kubernetes_use_watch_); kubernetes_cluster_level_metadata_ = config["KubernetesClusterLevelMetadata"].as( - kubernetes_cluster_level_metadata_); + kKubernetesDefaultClusterLevelMetadata); + kubernetes_service_metadata_ = + config["KubernetesServiceMetadata"].as( + kKubernetesDefaultServiceMetadata); instance_id_ = config["InstanceId"].as(instance_id_); instance_zone_ = diff --git a/src/configuration.h b/src/configuration.h index c275b080..4b4cb98c 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -140,6 +140,10 @@ class Configuration { std::lock_guard lock(mutex_); return kubernetes_cluster_level_metadata_; } + bool KubernetesServiceMetadata() const { + std::lock_guard lock(mutex_); + return kubernetes_service_metadata_; + } // Common metadata updater options. const std::string& InstanceId() const { std::lock_guard lock(mutex_); @@ -194,6 +198,7 @@ class Configuration { std::string kubernetes_node_name_; bool kubernetes_use_watch_; bool kubernetes_cluster_level_metadata_; + bool kubernetes_service_metadata_; std::string instance_id_; std::string instance_zone_; std::string health_check_file_; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 4979cc3c..af7890b9 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -1274,7 +1274,7 @@ void KubernetesUpdater::StartUpdater() { pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(watched_node, cb); }); - if (config().KubernetesClusterLevelMetadata()) { + if (config().KubernetesServiceMetadata()) { service_watch_thread_ = std::thread([=]() { reader_.WatchServices(cb); }); From baed17406c7ee19083cdb77d6a84771274fc52d3 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 12:07:45 -0400 Subject: [PATCH 04/15] Update unit test for configuration.cc --- test/configuration_unittest.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/configuration_unittest.cc b/test/configuration_unittest.cc index 7097c58e..4e0cf574 100644 --- a/test/configuration_unittest.cc +++ b/test/configuration_unittest.cc @@ -32,6 +32,8 @@ void VerifyDefaultConfig(const Configuration& config) { EXPECT_EQ("", config.KubernetesClusterLocation()); EXPECT_EQ("", config.KubernetesNodeName()); EXPECT_EQ(false, config.KubernetesUseWatch()); + EXPECT_EQ(false, config.KubernetesClusterLevelMetadata()); + EXPECT_EQ(true, config.KubernetesServiceMetadata()); EXPECT_EQ("", config.InstanceId()); EXPECT_EQ("", config.InstanceZone()); EXPECT_EQ("/var/run/metadata-agent/health/unhealthy", config.HealthCheckFile()); From 96f9293656648f4fc7ff3152ed5cb36bc246d9fd Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 20:19:05 -0400 Subject: [PATCH 05/15] Address comments 2 --- src/configuration.cc | 5 ++--- src/kubernetes.cc | 30 +++++++++++++++--------------- src/kubernetes.h | 12 +++++++----- test/configuration_unittest.cc | 2 +- 4 files changed, 25 insertions(+), 24 deletions(-) diff --git a/src/configuration.cc b/src/configuration.cc index 80af0227..0abfd782 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -69,7 +69,7 @@ constexpr const char kKubernetesDefaultClusterLocation[] = ""; constexpr const char kKubernetesDefaultNodeName[] = ""; constexpr const bool kKubernetesDefaultUseWatch = false; constexpr const bool kKubernetesDefaultClusterLevelMetadata = false; -constexpr const bool kKubernetesDefaultServiceMetadata = true; +constexpr const bool kKubernetesDefaultServiceMetadata = false; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; constexpr const char kDefaultHealthCheckFile[] = @@ -114,8 +114,7 @@ Configuration::Configuration() kubernetes_use_watch_(kKubernetesDefaultUseWatch), kubernetes_cluster_level_metadata_( kKubernetesDefaultClusterLevelMetadata), - kubernetes_service_metadata_( - kKubernetesDefaultServiceMetadata), + kubernetes_service_metadata_(kKubernetesDefaultServiceMetadata), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone), health_check_file_(kDefaultHealthCheckFile) {} diff --git a/src/kubernetes.cc b/src/kubernetes.cc index af7890b9..9f185f2d 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -459,14 +459,13 @@ std::vector KubernetesReader::GetServiceList( std::lock_guard lock(service_mutex_); std::vector service_list; for (const auto& service_it : service_to_metadata_) { - const std::string& service_key = service_it.first; - const std::string namespace_name = - service_key.substr(0, service_key.find("/")); + const std::pair& service_key = service_it.first; + const std::string namespace_name = service_key.first; const json::value& service_metadata = service_it.second; auto endpoints_it = service_to_pods_.find(service_key); const std::vector& pod_names = (endpoints_it != service_to_pods_.end()) ? endpoints_it->second - : kNoPods; + : kNoPods; std::vector pod_resources; for (const std::string& pod_name : pod_names) { const MonitoredResource k8s_pod("k8s_pod", { @@ -515,7 +514,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( k8s_cluster, #ifdef ENABLE_KUBERNETES_METADATA MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), - /*is_deleted=*/ false, created_at, collected_at, + /*is_deleted=*/false, created_at, collected_at, std::move(cluster_raw_metadata)) #else MetadataStore::Metadata::IGNORED() @@ -989,18 +988,18 @@ void KubernetesReader::UpdateServiceToMetadataCache( const json::Object* metadata = service->Get("metadata"); const std::string namespace_name = metadata->Get("namespace"); const std::string service_name = metadata->Get("name"); - const std::string encoded_ref = boost::algorithm::join( - std::vector{namespace_name, service_name}, "/"); + const std::pair service_key ( + namespace_name, service_name); std::lock_guard lock(service_mutex_); - auto service_it = service_to_metadata_.find(encoded_ref); + auto service_it = service_to_metadata_.find(service_key); if (is_deleted) { if (service_it != service_to_metadata_.end()) { service_to_metadata_.erase(service_it); } } else { if (service_it == service_to_metadata_.end()) { - service_to_metadata_.emplace(encoded_ref, service->Clone()); + service_to_metadata_.emplace(service_key, service->Clone()); } else { service_it->second = service->Clone(); } @@ -1016,8 +1015,8 @@ void KubernetesReader::UpdateServiceToPodsCache( const std::string namespace_name = metadata->Get("namespace"); // Endpoints name is same as the matching service name. const std::string service_name = metadata->Get("name"); - const std::string encoded_ref = boost::algorithm::join( - std::vector{namespace_name, service_name}, "/"); + const std::pair service_key ( + namespace_name, service_name); std::vector pod_names; // Only extract the pod names when this is not a deletion. In the case of @@ -1054,11 +1053,11 @@ void KubernetesReader::UpdateServiceToPodsCache( std::lock_guard lock(service_mutex_); if (is_deleted) { - service_to_pods_.erase(encoded_ref); + service_to_pods_.erase(service_key); } else { auto it_inserted = - service_to_pods_.emplace(encoded_ref, std::vector()); - service_to_pods_.at(encoded_ref) = pod_names; + service_to_pods_.emplace(service_key, std::vector()); + it_inserted.first->second = pod_names; } } @@ -1274,7 +1273,8 @@ void KubernetesUpdater::StartUpdater() { pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(watched_node, cb); }); - if (config().KubernetesServiceMetadata()) { + if (config().KubernetesClusterLevelMetadata() && + config().KubernetesServiceMetadata()) { service_watch_thread_ = std::thread([=]() { reader_.WatchServices(cb); }); diff --git a/src/kubernetes.h b/src/kubernetes.h index a8d7655b..c8b53e85 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -142,8 +142,8 @@ class KubernetesReader { const throw(json::Exception); // Return the cluster metadata based on the cached values for // service_to_metadta_ and service_to_pods_. - MetadataUpdater::ResourceMetadata GetClusterMetadata( - Timestamp collected_at) const throw(json::Exception); + MetadataUpdater::ResourceMetadata GetClusterMetadata(Timestamp collected_at) + const throw(json::Exception); // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. @@ -173,7 +173,7 @@ class KubernetesReader { void UpdateServiceToPodsCache( const json::Object* endpoints, bool is_deleted) throw(json::Exception); - // Const data. + // An empty vector value for endpoints that have no pods. const std::vector kNoPods; // Cached data. @@ -190,10 +190,12 @@ class KubernetesReader { mutable std::mutex service_mutex_; // Map from service key to service metadata. This map is built based on the // response from WatchServices. - mutable std::map service_to_metadata_; + mutable std::map, + json::value> service_to_metadata_; // Map from service key to names of pods in the service. This map is built // based on the response from WatchEndpoints. - mutable std::map> service_to_pods_; + mutable std::map, + std::vector> service_to_pods_; const Configuration& config_; HealthChecker* health_checker_; diff --git a/test/configuration_unittest.cc b/test/configuration_unittest.cc index 4e0cf574..ca7b868b 100644 --- a/test/configuration_unittest.cc +++ b/test/configuration_unittest.cc @@ -33,7 +33,7 @@ void VerifyDefaultConfig(const Configuration& config) { EXPECT_EQ("", config.KubernetesNodeName()); EXPECT_EQ(false, config.KubernetesUseWatch()); EXPECT_EQ(false, config.KubernetesClusterLevelMetadata()); - EXPECT_EQ(true, config.KubernetesServiceMetadata()); + EXPECT_EQ(false, config.KubernetesServiceMetadata()); EXPECT_EQ("", config.InstanceId()); EXPECT_EQ("", config.InstanceZone()); EXPECT_EQ("/var/run/metadata-agent/health/unhealthy", config.HealthCheckFile()); From 8ed1b51f95f66cd2904550214ef4497d448cc928 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 20:51:47 -0400 Subject: [PATCH 06/15] address comments 3 --- src/kubernetes.cc | 8 +++----- src/kubernetes.h | 9 +++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 9f185f2d..0795c61f 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -459,7 +459,7 @@ std::vector KubernetesReader::GetServiceList( std::lock_guard lock(service_mutex_); std::vector service_list; for (const auto& service_it : service_to_metadata_) { - const std::pair& service_key = service_it.first; + const ServiceKey& service_key = service_it.first; const std::string namespace_name = service_key.first; const json::value& service_metadata = service_it.second; auto endpoints_it = service_to_pods_.find(service_key); @@ -988,8 +988,7 @@ void KubernetesReader::UpdateServiceToMetadataCache( const json::Object* metadata = service->Get("metadata"); const std::string namespace_name = metadata->Get("namespace"); const std::string service_name = metadata->Get("name"); - const std::pair service_key ( - namespace_name, service_name); + const ServiceKey service_key(namespace_name, service_name); std::lock_guard lock(service_mutex_); auto service_it = service_to_metadata_.find(service_key); @@ -1015,8 +1014,7 @@ void KubernetesReader::UpdateServiceToPodsCache( const std::string namespace_name = metadata->Get("namespace"); // Endpoints name is same as the matching service name. const std::string service_name = metadata->Get("name"); - const std::pair service_key ( - namespace_name, service_name); + const ServiceKey service_key(namespace_name, service_name); std::vector pod_names; // Only extract the pod names when this is not a deletion. In the case of diff --git a/src/kubernetes.h b/src/kubernetes.h index c8b53e85..fdafc43a 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -186,16 +186,17 @@ class KubernetesReader { version_to_kind_to_name_; // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; + // Mutex for the service related caches. mutable std::mutex service_mutex_; + + using ServiceKey = std::pair; // Map from service key to service metadata. This map is built based on the // response from WatchServices. - mutable std::map, - json::value> service_to_metadata_; + mutable std::map service_to_metadata_; // Map from service key to names of pods in the service. This map is built // based on the response from WatchEndpoints. - mutable std::map, - std::vector> service_to_pods_; + mutable std::map> service_to_pods_; const Configuration& config_; HealthChecker* health_checker_; From 7c627bec1a9319c29724e33192bbb2aff0babe4f Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 21:09:38 -0400 Subject: [PATCH 07/15] Set KubernetesServiceMetadata to true, minor header fixes. --- src/configuration.cc | 2 +- src/kubernetes.h | 3 +-- test/configuration_unittest.cc | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/configuration.cc b/src/configuration.cc index 0abfd782..041cbbf5 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -69,7 +69,7 @@ constexpr const char kKubernetesDefaultClusterLocation[] = ""; constexpr const char kKubernetesDefaultNodeName[] = ""; constexpr const bool kKubernetesDefaultUseWatch = false; constexpr const bool kKubernetesDefaultClusterLevelMetadata = false; -constexpr const bool kKubernetesDefaultServiceMetadata = false; +constexpr const bool kKubernetesDefaultServiceMetadata = true; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; constexpr const char kDefaultHealthCheckFile[] = diff --git a/src/kubernetes.h b/src/kubernetes.h index fdafc43a..777e8263 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -187,10 +187,9 @@ class KubernetesReader { // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; + using ServiceKey = std::pair; // Mutex for the service related caches. mutable std::mutex service_mutex_; - - using ServiceKey = std::pair; // Map from service key to service metadata. This map is built based on the // response from WatchServices. mutable std::map service_to_metadata_; diff --git a/test/configuration_unittest.cc b/test/configuration_unittest.cc index ca7b868b..4e0cf574 100644 --- a/test/configuration_unittest.cc +++ b/test/configuration_unittest.cc @@ -33,7 +33,7 @@ void VerifyDefaultConfig(const Configuration& config) { EXPECT_EQ("", config.KubernetesNodeName()); EXPECT_EQ(false, config.KubernetesUseWatch()); EXPECT_EQ(false, config.KubernetesClusterLevelMetadata()); - EXPECT_EQ(false, config.KubernetesServiceMetadata()); + EXPECT_EQ(true, config.KubernetesServiceMetadata()); EXPECT_EQ("", config.InstanceId()); EXPECT_EQ("", config.InstanceZone()); EXPECT_EQ("/var/run/metadata-agent/health/unhealthy", config.HealthCheckFile()); From 59f860cc73cbad7cb1d829f0c0ca7b84a5698f24 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 21:57:30 -0400 Subject: [PATCH 08/15] Edit update to service_to_metadata_ to be similar to code for service_to_pods_ --- src/kubernetes.cc | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 0795c61f..337bb05b 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -991,17 +991,12 @@ void KubernetesReader::UpdateServiceToMetadataCache( const ServiceKey service_key(namespace_name, service_name); std::lock_guard lock(service_mutex_); - auto service_it = service_to_metadata_.find(service_key); if (is_deleted) { - if (service_it != service_to_metadata_.end()) { - service_to_metadata_.erase(service_it); - } + service_to_metadata_.erase(service_key); } else { - if (service_it == service_to_metadata_.end()) { - service_to_metadata_.emplace(service_key, service->Clone()); - } else { - service_it->second = service->Clone(); - } + auto it_inserted = + service_to_metadata_.emplace(service_key, json::value()); + it_inserted.first->second = service->Clone(); } } @@ -1057,7 +1052,6 @@ void KubernetesReader::UpdateServiceToPodsCache( service_to_pods_.emplace(service_key, std::vector()); it_inserted.first->second = pod_names; } - } bool KubernetesReader::ValidateConfiguration() const { From b3ddb0f7c5c3bfe8c3014dfa0a4fd7a940cd970d Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sat, 31 Mar 2018 22:59:18 -0400 Subject: [PATCH 09/15] Add unit tests for GetClusterMetadata --- test/kubernetes_unittest.cc | 156 ++++++++++++++++++++++++++++++++++++ 1 file changed, 156 insertions(+) diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index 359a8241..c415f15b 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -1,4 +1,5 @@ #include "../src/configuration.h" +#include "../src/resource.h" #include "../src/kubernetes.h" #include "../src/updater.h" #include "gtest/gtest.h" @@ -36,6 +37,24 @@ class KubernetesTest : public ::testing::Test { void UpdateOwnersCache(KubernetesReader* reader, const std::string& key, const json::value& value) { reader->owners_[key] = value->Clone(); + MetadataUpdater::ResourceMetadata GetClusterMetadata( + const KubernetesReader& reader, Timestamp collected_at) const + throw(json::Exception) { + return reader.GetClusterMetadata(collected_at); + } + + void UpdateServiceToMetadataCache( + KubernetesReader& reader, const json::Object* service, + bool is_deleted) + throw(json::Exception) { + return reader.UpdateServiceToMetadataCache(service, is_deleted); + } + + void UpdateServiceToPodsCache( + KubernetesReader& reader, const json::Object* endpoints, + bool is_deleted) + throw(json::Exception) { + return reader.UpdateServiceToPodsCache(endpoints, is_deleted); } }; @@ -150,6 +169,143 @@ TEST_F(KubernetesTest, ComputePodAssociations) { const auto associations = ComputePodAssociations(reader, pod->As()); EXPECT_EQ(expected_associations->ToString(), associations->ToString()); + +TEST_F(KubernetesTest, GetClusterMetadataEmpty) { + Configuration config(std::stringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_EQ(0, m.ids().size()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + json::value empty_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({})}, + })}, + }); + EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { + Configuration config(std::stringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + json::value service = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })} + }); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + UpdateServiceToMetadataCache( + reader, service->As(), /*is_deleted=*/false); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_EQ(0, m.ids().size()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + json::value expected_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({ + json::object({ + {"api", json::object({ + {"pods", json::array({})}, + {"raw", std::move(service)}, + {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. + })}, + }), + })}, + })}, + }); + EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { + Configuration config(std::stringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + json::value service = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })} + }); + json::value endpoints = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })}, + {"subsets", json::array({ + json::object({ + {"addresses", json::array({ + json::object({ + {"targetRef", json::object({ + {"kind", json::string("Pod")}, + {"name", json::string("my-pod")}, + })}, + }), + })}, + }), + })}, + }); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + UpdateServiceToMetadataCache( + reader, service->As(), /*is_deleted=*/false); + UpdateServiceToPodsCache( + reader, endpoints->As(), /*is_deleted=*/false); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_EQ(0, m.ids().size()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + MonitoredResource pod_mr = MonitoredResource("k8s_pod", { + {"cluster_name", "TestClusterName"}, + {"namespace_name", "testnamespace"}, + {"pod_name", "my-pod"}, + {"location", "TestClusterLocation"}, + }); + json::value expected_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({ + json::object({ + {"api", json::object({ + {"pods", json::array({ + pod_mr.ToJSON(), + })}, + {"raw", std::move(service)}, + {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. + })}, + }), + })}, + })}, + }); + EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); } TEST_F(KubernetesTest, GetPodMetadata) { From 2d25fd9b22268aaec1b913a2efd51363ed6fa596 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sun, 1 Apr 2018 00:17:41 -0400 Subject: [PATCH 10/15] Add test for deleted service + address comments --- test/kubernetes_unittest.cc | 219 ++++++++++++++++++++---------------- 1 file changed, 122 insertions(+), 97 deletions(-) diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index c415f15b..82c53f0a 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -44,22 +44,20 @@ class KubernetesTest : public ::testing::Test { } void UpdateServiceToMetadataCache( - KubernetesReader& reader, const json::Object* service, - bool is_deleted) + KubernetesReader* reader, const json::Object* service, bool is_deleted) throw(json::Exception) { - return reader.UpdateServiceToMetadataCache(service, is_deleted); + return reader->UpdateServiceToMetadataCache(service, is_deleted); } void UpdateServiceToPodsCache( - KubernetesReader& reader, const json::Object* endpoints, - bool is_deleted) + KubernetesReader* reader, const json::Object* endpoints, bool is_deleted) throw(json::Exception) { - return reader.UpdateServiceToPodsCache(endpoints, is_deleted); + return reader->UpdateServiceToPodsCache(endpoints, is_deleted); } }; TEST_F(KubernetesTest, GetNodeMetadata) { - Configuration config(std::stringstream( + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" "MetadataIngestionRawContentVersion: TestVersion\n" @@ -170,8 +168,96 @@ TEST_F(KubernetesTest, ComputePodAssociations) { ComputePodAssociations(reader, pod->As()); EXPECT_EQ(expected_associations->ToString(), associations->ToString()); -TEST_F(KubernetesTest, GetClusterMetadataEmpty) { +TEST_F(KubernetesTest, GetPodMetadata) { + Configuration config(std::stringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataApiResourceTypePerarator: \",\"\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + + json::value pod = json::object({ + {"metadata", json::object({ + {"namespace", json::string("TestNamespace")}, + {"name", json::string("TestName")}, + {"uid", json::string("TestUid")}, + {"creationTimestamp", json::string("2018-03-03T01:23:45.678901234Z")}, + })}, + }); + const auto m = GetPodMetadata(reader, pod->As(), + json::string("TestAssociations"), Timestamp(), + false); + + EXPECT_EQ(std::vector( + {"k8s_pod.TestUid", "k8s_pod.TestNamespace.TestName"}), m.ids()); + EXPECT_EQ(MonitoredResource("k8s_pod", { + {"cluster_name", "TestClusterName"}, + {"pod_name", "TestName"}, + {"location", "TestClusterLocation"}, + {"namespace_name", "TestNamespace"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_FALSE(m.metadata().is_deleted); + EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), + m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + EXPECT_FALSE(m.metadata().ignore); + json::value expected_metadata = json::object({ + {"blobs", json::object({ + {"api", json::object({ + {"raw", json::object({ + {"metadata", json::object({ + {"creationTimestamp", + json::string("2018-03-03T01:23:45.678901234Z")}, + {"name", json::string("TestName")}, + {"namespace", json::string("TestNamespace")}, + {"uid", json::string("TestUid")}, + })}, + })}, + {"version", json::string("1.6")}, + })}, + {"association", json::string("TestAssociations")}, + })}, + }); + EXPECT_EQ(expected_metadata->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetLegacyResource) { Configuration config(std::stringstream( + "KubernetesClusterName: TestClusterName\n" + "MetadataApiResourceTypeSeparator: \".\"\n" + "InstanceZone: TestZone\n" + "InstanceId: TestID\n" + )); + Environment environment(config); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + json::value pod = json::object({ + {"metadata", json::object({ + {"namespace", json::string("TestNamespace")}, + {"name", json::string("TestName")}, + {"uid", json::string("TestUid")}, + })}, + }); + const auto m = GetLegacyResource(reader, pod->As(), + "TestContainerName"); + EXPECT_EQ(std::vector({ + "gke_container.TestNamespace.TestUid.TestContainerName", + "gke_container.TestNamespace.TestName.TestContainerName", + }), m.ids()); + EXPECT_EQ(MonitoredResource("gke_container", { + {"cluster_name", "TestClusterName"}, + {"container_name", "TestContainerName"}, + {"instance_id", "TestID"}, + {"namespace_id", "TestNamespace"}, + {"pod_id", "TestUid"}, + {"zone", "TestZone"}, + }), m.resource()); + EXPECT_TRUE(m.metadata().ignore); + +TEST_F(KubernetesTest, GetClusterMetadataEmpty) { + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" "MetadataIngestionRawContentVersion: TestVersion\n" @@ -179,13 +265,13 @@ TEST_F(KubernetesTest, GetClusterMetadataEmpty) { Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_EQ(0, m.ids().size()); + EXPECT_TRUE(m.ids().empty()); EXPECT_EQ(MonitoredResource("k8s_cluster", { {"cluster_name", "TestClusterName"}, {"location", "TestClusterLocation"}, }), m.resource()); EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_FALSE(m.metadata().is_deleted); EXPECT_EQ(Timestamp(), m.metadata().created_at); EXPECT_EQ(Timestamp(), m.metadata().collected_at); json::value empty_cluster = json::object({ @@ -197,7 +283,7 @@ TEST_F(KubernetesTest, GetClusterMetadataEmpty) { } TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { - Configuration config(std::stringstream( + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" "MetadataIngestionRawContentVersion: TestVersion\n" @@ -207,11 +293,11 @@ TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { {"metadata", json::object({ {"name", json::string("testname")}, {"namespace", json::string("testnamespace")}, - })} + })}, }); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. UpdateServiceToMetadataCache( - reader, service->As(), /*is_deleted=*/false); + &reader, service->As(), /*is_deleted=*/false); const auto m = GetClusterMetadata(reader, Timestamp()); EXPECT_EQ(0, m.ids().size()); EXPECT_EQ(MonitoredResource("k8s_cluster", { @@ -219,7 +305,7 @@ TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { {"location", "TestClusterLocation"}, }), m.resource()); EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_FALSE(m.metadata().is_deleted); EXPECT_EQ(Timestamp(), m.metadata().created_at); EXPECT_EQ(Timestamp(), m.metadata().collected_at); json::value expected_cluster = json::object({ @@ -239,7 +325,7 @@ TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { } TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { - Configuration config(std::stringstream( + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" "MetadataIngestionRawContentVersion: TestVersion\n" @@ -249,7 +335,7 @@ TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { {"metadata", json::object({ {"name", json::string("testname")}, {"namespace", json::string("testnamespace")}, - })} + })}, }); json::value endpoints = json::object({ {"metadata", json::object({ @@ -271,17 +357,17 @@ TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { }); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. UpdateServiceToMetadataCache( - reader, service->As(), /*is_deleted=*/false); + &reader, service->As(), /*is_deleted=*/false); UpdateServiceToPodsCache( - reader, endpoints->As(), /*is_deleted=*/false); + &reader, endpoints->As(), /*is_deleted=*/false); const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_EQ(0, m.ids().size()); + EXPECT_TRUE(m.ids().empty()); EXPECT_EQ(MonitoredResource("k8s_cluster", { {"cluster_name", "TestClusterName"}, {"location", "TestClusterLocation"}, }), m.resource()); EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_EQ(false, m.metadata().is_deleted); + EXPECT_FALSE(m.metadata().is_deleted); EXPECT_EQ(Timestamp(), m.metadata().created_at); EXPECT_EQ(Timestamp(), m.metadata().collected_at); MonitoredResource pod_mr = MonitoredResource("k8s_pod", { @@ -296,7 +382,7 @@ TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { json::object({ {"api", json::object({ {"pods", json::array({ - pod_mr.ToJSON(), + pod_mr.ToJSON(), })}, {"raw", std::move(service)}, {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. @@ -308,92 +394,31 @@ TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); } -TEST_F(KubernetesTest, GetPodMetadata) { - Configuration config(std::stringstream( +TEST_F(KubernetesTest, GetClusterMetadataDeletedService) { + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" - "MetadataApiResourceTypePerarator: \",\"\n" "MetadataIngestionRawContentVersion: TestVersion\n" )); Environment environment(config); - KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - - json::value pod = json::object({ + json::value service = json::object({ {"metadata", json::object({ - {"namespace", json::string("TestNamespace")}, - {"name", json::string("TestName")}, - {"uid", json::string("TestUid")}, - {"creationTimestamp", json::string("2018-03-03T01:23:45.678901234Z")}, - })}, - }); - const auto m = GetPodMetadata(reader, pod->As(), - json::string("TestAssociations"), Timestamp(), - false); - - EXPECT_EQ(std::vector( - {"k8s_pod.TestUid", "k8s_pod.TestNamespace.TestName"}), m.ids()); - EXPECT_EQ(MonitoredResource("k8s_pod", { - {"cluster_name", "TestClusterName"}, - {"pod_name", "TestName"}, - {"location", "TestClusterLocation"}, - {"namespace_name", "TestNamespace"}, - }), m.resource()); - EXPECT_EQ("TestVersion", m.metadata().version); - EXPECT_FALSE(m.metadata().is_deleted); - EXPECT_EQ(time::rfc3339::FromString("2018-03-03T01:23:45.678901234Z"), - m.metadata().created_at); - EXPECT_EQ(Timestamp(), m.metadata().collected_at); - EXPECT_FALSE(m.metadata().ignore); - json::value expected_metadata = json::object({ - {"blobs", json::object({ - {"api", json::object({ - {"raw", json::object({ - {"metadata", json::object({ - {"creationTimestamp", - json::string("2018-03-03T01:23:45.678901234Z")}, - {"name", json::string("TestName")}, - {"namespace", json::string("TestNamespace")}, - {"uid", json::string("TestUid")}, - })}, - })}, - {"version", json::string("1.6")}, - })}, - {"association", json::string("TestAssociations")}, + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, })}, }); - EXPECT_EQ(expected_metadata->ToString(), m.metadata().metadata->ToString()); -} - -TEST_F(KubernetesTest, GetLegacyResource) { - Configuration config(std::stringstream( - "KubernetesClusterName: TestClusterName\n" - "MetadataApiResourceTypeSeparator: \".\"\n" - "InstanceZone: TestZone\n" - "InstanceId: TestID\n" - )); - Environment environment(config); KubernetesReader reader(config, nullptr); // Don't need HealthChecker. - json::value pod = json::object({ - {"metadata", json::object({ - {"namespace", json::string("TestNamespace")}, - {"name", json::string("TestName")}, - {"uid", json::string("TestUid")}, + UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/false); + UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/true); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_TRUE(m.ids().empty()); + json::value empty_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({})}, })}, }); - const auto m = GetLegacyResource(reader, pod->As(), - "TestContainerName"); - EXPECT_EQ(std::vector({ - "gke_container.TestNamespace.TestUid.TestContainerName", - "gke_container.TestNamespace.TestName.TestContainerName", - }), m.ids()); - EXPECT_EQ(MonitoredResource("gke_container", { - {"cluster_name", "TestClusterName"}, - {"container_name", "TestContainerName"}, - {"instance_id", "TestID"}, - {"namespace_id", "TestNamespace"}, - {"pod_id", "TestUid"}, - {"zone", "TestZone"}, - }), m.resource()); - EXPECT_TRUE(m.metadata().ignore); + EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); } } // namespace google From d1351e37883e76e38f607cc0d383ff2ba2d9b5ce Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sun, 1 Apr 2018 00:27:54 -0400 Subject: [PATCH 11/15] Fix test to use empty() --- test/kubernetes_unittest.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index 82c53f0a..b25f19f3 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -299,7 +299,7 @@ TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { UpdateServiceToMetadataCache( &reader, service->As(), /*is_deleted=*/false); const auto m = GetClusterMetadata(reader, Timestamp()); - EXPECT_EQ(0, m.ids().size()); + EXPECT_TRUE(m.ids().empty()); EXPECT_EQ(MonitoredResource("k8s_cluster", { {"cluster_name", "TestClusterName"}, {"location", "TestClusterLocation"}, From 38524ce46f9f672102e9e977b4f9914cb1d70d30 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sun, 1 Apr 2018 14:54:07 -0400 Subject: [PATCH 12/15] Add comments, and rename variables to clarify endpoints vs service --- src/kubernetes.cc | 13 ++++++------- src/kubernetes.h | 4 +++- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 337bb05b..fb282812 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -458,14 +458,13 @@ std::vector KubernetesReader::GetServiceList( throw(json::Exception) { std::lock_guard lock(service_mutex_); std::vector service_list; - for (const auto& service_it : service_to_metadata_) { - const ServiceKey& service_key = service_it.first; + for (const auto& metadata_it : service_to_metadata_) { + const ServiceKey& service_key = metadata_it.first; const std::string namespace_name = service_key.first; - const json::value& service_metadata = service_it.second; - auto endpoints_it = service_to_pods_.find(service_key); + const json::value& service_metadata = metadata_it.second; + auto pods_it = service_to_pods_.find(service_key); const std::vector& pod_names = - (endpoints_it != service_to_pods_.end()) ? endpoints_it->second - : kNoPods; + (pods_it != service_to_pods_.end()) ? pods_it->second : kNoPods; std::vector pod_resources; for (const std::string& pod_name : pod_names) { const MonitoredResource k8s_pod("k8s_pod", { @@ -1007,7 +1006,7 @@ void KubernetesReader::UpdateServiceToPodsCache( #endif const json::Object* metadata = endpoints->Get("metadata"); const std::string namespace_name = metadata->Get("namespace"); - // Endpoints name is same as the matching service name. + // Endpoints name is the same as the matching service name. const std::string service_name = metadata->Get("name"); const ServiceKey service_key(namespace_name, service_name); diff --git a/src/kubernetes.h b/src/kubernetes.h index 777e8263..e5d5610c 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -169,7 +169,9 @@ class KubernetesReader { void UpdateServiceToMetadataCache( const json::Object* service, bool is_deleted) throw(json::Exception); - // Update service_to_pods_ cache based on a newly updated endpoints. + // Update service_to_pods_ cache based on a newly updated endpoints. The + // Endpoints resource provides a mapping from a single service to its pods: + // https://kubernetes.io/docs/concepts/services-networking/service/ void UpdateServiceToPodsCache( const json::Object* endpoints, bool is_deleted) throw(json::Exception); From 67c15670929bd245aa73fcbe86b0814bfc3c57e7 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sun, 1 Apr 2018 15:23:40 -0400 Subject: [PATCH 13/15] Add comments for the ServiceKey --- src/kubernetes.cc | 1 + src/kubernetes.h | 2 ++ 2 files changed, 3 insertions(+) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index fb282812..af4c6b2b 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -459,6 +459,7 @@ std::vector KubernetesReader::GetServiceList( std::lock_guard lock(service_mutex_); std::vector service_list; for (const auto& metadata_it : service_to_metadata_) { + // service_key is a std::pair containing (namespace_name, service_name). const ServiceKey& service_key = metadata_it.first; const std::string namespace_name = service_key.first; const json::value& service_metadata = metadata_it.second; diff --git a/src/kubernetes.h b/src/kubernetes.h index e5d5610c..6163376e 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -189,6 +189,8 @@ class KubernetesReader { // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; + // Unique identifier of a service in a cluster, based on the namespace name + // and the service name. using ServiceKey = std::pair; // Mutex for the service related caches. mutable std::mutex service_mutex_; From 85a4b276bd559f48a0bf6074cd50c19a00067e2b Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Sun, 1 Apr 2018 16:47:41 -0400 Subject: [PATCH 14/15] Update comments --- src/kubernetes.cc | 2 +- src/kubernetes.h | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index af4c6b2b..f36a6fdf 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -459,7 +459,7 @@ std::vector KubernetesReader::GetServiceList( std::lock_guard lock(service_mutex_); std::vector service_list; for (const auto& metadata_it : service_to_metadata_) { - // service_key is a std::pair containing (namespace_name, service_name). + // A service key consists of a namespace name and a service name. const ServiceKey& service_key = metadata_it.first; const std::string namespace_name = service_key.first; const json::value& service_metadata = metadata_it.second; diff --git a/src/kubernetes.h b/src/kubernetes.h index 6163376e..ea620df3 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -189,8 +189,8 @@ class KubernetesReader { // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; - // Unique identifier of a service in a cluster, based on the namespace name - // and the service name. + // ServiceKey is a pair of the namespace name and the service name that + // uniquely identifies a service in a cluster. using ServiceKey = std::pair; // Mutex for the service related caches. mutable std::mutex service_mutex_; From 2a7223c19bf3061047035ace342cfff38b072f07 Mon Sep 17 00:00:00 2001 From: Supriya Garg Date: Mon, 2 Apr 2018 18:05:33 -0400 Subject: [PATCH 15/15] Fix mismatched braces introduced during rebase --- test/kubernetes_unittest.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index b25f19f3..2841f947 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -37,6 +37,8 @@ class KubernetesTest : public ::testing::Test { void UpdateOwnersCache(KubernetesReader* reader, const std::string& key, const json::value& value) { reader->owners_[key] = value->Clone(); + } + MetadataUpdater::ResourceMetadata GetClusterMetadata( const KubernetesReader& reader, Timestamp collected_at) const throw(json::Exception) { @@ -167,6 +169,7 @@ TEST_F(KubernetesTest, ComputePodAssociations) { const auto associations = ComputePodAssociations(reader, pod->As()); EXPECT_EQ(expected_associations->ToString(), associations->ToString()); +} TEST_F(KubernetesTest, GetPodMetadata) { Configuration config(std::stringstream(