diff --git a/src/configuration.cc b/src/configuration.cc index f8514f07..041cbbf5 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -69,6 +69,7 @@ constexpr const char kKubernetesDefaultClusterLocation[] = ""; constexpr const char kKubernetesDefaultNodeName[] = ""; constexpr const bool kKubernetesDefaultUseWatch = false; constexpr const bool kKubernetesDefaultClusterLevelMetadata = false; +constexpr const bool kKubernetesDefaultServiceMetadata = true; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; constexpr const char kDefaultHealthCheckFile[] = @@ -113,6 +114,7 @@ Configuration::Configuration() kubernetes_use_watch_(kKubernetesDefaultUseWatch), kubernetes_cluster_level_metadata_( kKubernetesDefaultClusterLevelMetadata), + kubernetes_service_metadata_(kKubernetesDefaultServiceMetadata), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone), health_check_file_(kDefaultHealthCheckFile) {} @@ -270,7 +272,10 @@ void Configuration::ParseConfiguration(std::istream& input) { config["KubernetesUseWatch"].as(kubernetes_use_watch_); kubernetes_cluster_level_metadata_ = config["KubernetesClusterLevelMetadata"].as( - kubernetes_cluster_level_metadata_); + kKubernetesDefaultClusterLevelMetadata); + kubernetes_service_metadata_ = + config["KubernetesServiceMetadata"].as( + kKubernetesDefaultServiceMetadata); instance_id_ = config["InstanceId"].as(instance_id_); instance_zone_ = diff --git a/src/configuration.h b/src/configuration.h index c275b080..4b4cb98c 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -140,6 +140,10 @@ class Configuration { std::lock_guard lock(mutex_); return kubernetes_cluster_level_metadata_; } + bool KubernetesServiceMetadata() const { + std::lock_guard lock(mutex_); + return kubernetes_service_metadata_; + } // Common metadata updater options. 
const std::string& InstanceId() const { std::lock_guard lock(mutex_); @@ -194,6 +198,7 @@ class Configuration { std::string kubernetes_node_name_; bool kubernetes_use_watch_; bool kubernetes_cluster_level_metadata_; + bool kubernetes_service_metadata_; std::string instance_id_; std::string instance_zone_; std::string health_check_file_; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index abfed910..f36a6fdf 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -453,6 +453,75 @@ KubernetesReader::GetPodAndContainerMetadata( return std::move(result); } +std::vector KubernetesReader::GetServiceList( + const std::string& cluster_name, const std::string& location) const + throw(json::Exception) { + std::lock_guard lock(service_mutex_); + std::vector service_list; + for (const auto& metadata_it : service_to_metadata_) { + // A service key consists of a namespace name and a service name. + const ServiceKey& service_key = metadata_it.first; + const std::string namespace_name = service_key.first; + const json::value& service_metadata = metadata_it.second; + auto pods_it = service_to_pods_.find(service_key); + const std::vector& pod_names = + (pods_it != service_to_pods_.end()) ? 
pods_it->second : kNoPods; + std::vector pod_resources; + for (const std::string& pod_name : pod_names) { + const MonitoredResource k8s_pod("k8s_pod", { + {"cluster_name", cluster_name}, + {"namespace_name", namespace_name}, + {"pod_name", pod_name}, + {"location", location}, + }); + pod_resources.emplace_back(k8s_pod.ToJSON()); + } + service_list.emplace_back(json::object({ + {"api", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", service_metadata->Clone()}, + {"pods", json::array(std::move(pod_resources))}, + })}, + })); + } + return service_list; +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata( + Timestamp collected_at) const throw(json::Exception) { + const std::string cluster_name = environment_.KubernetesClusterName(); + const std::string location = environment_.KubernetesClusterLocation(); + std::vector service_list = + GetServiceList(cluster_name, location); + const MonitoredResource k8s_cluster("k8s_cluster", { + {"cluster_name", cluster_name}, + {"location", location}, + }); + json::value cluster_raw_metadata = json::object({ + {"blobs", json::object({ + {"services", json::array(std::move(service_list))}, + })}, + }); + if (config_.VerboseLogging()) { + LOG(INFO) << "Raw cluster metadata: " << *cluster_raw_metadata; + } + + // There is no created_at for the cluster since the metadata contains + // ALL current services. 
+ Timestamp created_at = time_point(); + return MetadataUpdater::ResourceMetadata( + std::vector{}, + k8s_cluster, +#ifdef ENABLE_KUBERNETES_METADATA + MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(), + /*is_deleted=*/false, created_at, collected_at, + std::move(cluster_raw_metadata)) +#else + MetadataStore::Metadata::IGNORED() +#endif + ); +} + std::vector KubernetesReader::MetadataQuery() const { if (config_.VerboseLogging()) { @@ -911,6 +980,80 @@ json::value KubernetesReader::FindTopLevelController( return FindTopLevelController(ns, GetOwner(ns, controller_ref)); } +void KubernetesReader::UpdateServiceToMetadataCache( + const json::Object* service, bool is_deleted) throw(json::Exception) { +#ifdef VERBOSE + LOG(DEBUG) << "UpdateServiceToMetadataCache(" << *service << ")"; +#endif + const json::Object* metadata = service->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string service_name = metadata->Get("name"); + const ServiceKey service_key(namespace_name, service_name); + + std::lock_guard lock(service_mutex_); + if (is_deleted) { + service_to_metadata_.erase(service_key); + } else { + auto it_inserted = + service_to_metadata_.emplace(service_key, json::value()); + it_inserted.first->second = service->Clone(); + } +} + +void KubernetesReader::UpdateServiceToPodsCache( + const json::Object* endpoints, bool is_deleted) throw(json::Exception) { +#ifdef VERBOSE + LOG(DEBUG) << "UpdateServiceToPodsCache(" << *endpoints << ")"; +#endif + const json::Object* metadata = endpoints->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + // Endpoints name is the same as the matching service name. + const std::string service_name = metadata->Get("name"); + const ServiceKey service_key(namespace_name, service_name); + + std::vector pod_names; + // Only extract the pod names when this is not a deletion. In the case of + // a deletion, we delete the mapping below. 
+ if (!is_deleted && endpoints->Has("subsets") && + !endpoints->at("subsets")->Is()) { + const json::Array* subsets = endpoints->Get("subsets"); + for (const json::value& subset : *subsets) { + const json::Object* subset_obj = subset->As(); + if (!subset_obj->Has("addresses")) { + continue; + } + const json::Array* addresses = subset_obj->Get("addresses"); + for (const json::value& address : *addresses) { + const json::Object* address_obj = address->As(); + if (!address_obj->Has("targetRef")) { + continue; + } + const json::Object* ref = address_obj->Get("targetRef"); + if (!(ref->Has("kind") && ref->Has("name"))) { + continue; + } + const std::string target_kind = ref->Get("kind"); + if (target_kind != "Pod") { + LOG(INFO) << "Found a resource other than a pod in Endpoint " + << service_name << "'s targetRef: " << target_kind; + continue; + } + const std::string pod_name = ref->Get("name"); + pod_names.push_back(pod_name); + } + } + } + + std::lock_guard lock(service_mutex_); + if (is_deleted) { + service_to_pods_.erase(service_key); + } else { + auto it_inserted = + service_to_pods_.emplace(service_key, std::vector()); + it_inserted.first->second = pod_names; + } +} + bool KubernetesReader::ValidateConfiguration() const { try { (void) QueryMaster(std::string(kKubernetesEndpointPath) + "/nodes?limit=1"); @@ -1013,6 +1156,73 @@ void KubernetesReader::WatchNodes( LOG(INFO) << "Watch thread (node) exiting"; } +void KubernetesReader::ServiceCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* service, Timestamp collected_at, bool is_deleted) + throw(json::Exception) { + UpdateServiceToMetadataCache(service, is_deleted); + + // TODO: using a temporary did not work here. 
+ std::vector result_vector; + result_vector.emplace_back(GetClusterMetadata(collected_at)); + callback(std::move(result_vector)); +} + +void KubernetesReader::WatchServices(MetadataUpdater::UpdateCallback callback) { + LOG(INFO) << "Watch thread started for services"; + + try { + WatchMaster( + "Service", + std::string(kKubernetesEndpointPath) + "/watch/services/", + [=](const json::Object* service, Timestamp collected_at, + bool is_deleted) { + ServiceCallback(callback, service, collected_at, is_deleted); + }); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + LOG(ERROR) << "No more service metadata will be collected"; + } catch (const KubernetesReader::QueryException& e) { + LOG(ERROR) << "No more service metadata will be collected"; + } + health_checker_->SetUnhealthy("kubernetes_service_thread"); + LOG(INFO) << "Watch thread (service) exiting"; +} + +void KubernetesReader::EndpointsCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* endpoints, Timestamp collected_at, bool is_deleted) + throw(json::Exception) { + UpdateServiceToPodsCache(endpoints, is_deleted); + + // TODO: using a temporary did not work here. 
+ std::vector result_vector; + result_vector.emplace_back(GetClusterMetadata(collected_at)); + callback(std::move(result_vector)); +} + +void KubernetesReader::WatchEndpoints( + MetadataUpdater::UpdateCallback callback) { + LOG(INFO) << "Watch thread started for endpoints"; + + try { + WatchMaster( + "Endpoints", + std::string(kKubernetesEndpointPath) + "/watch/endpoints/", + [=](const json::Object* endpoints, Timestamp collected_at, + bool is_deleted) { + EndpointsCallback(callback, endpoints, collected_at, is_deleted); + }); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + LOG(ERROR) << "No more endpoints metadata will be collected"; + } catch (const KubernetesReader::QueryException& e) { + LOG(ERROR) << "No more endpoints metadata will be collected"; + } + health_checker_->SetUnhealthy("kubernetes_endpoints_thread"); + LOG(INFO) << "Watch thread (endpoints) exiting"; +} + KubernetesUpdater::KubernetesUpdater(const Configuration& config, HealthChecker* health_checker, MetadataStore* store) @@ -1055,6 +1265,15 @@ void KubernetesUpdater::StartUpdater() { pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(watched_node, cb); }); + if (config().KubernetesClusterLevelMetadata() && + config().KubernetesServiceMetadata()) { + service_watch_thread_ = std::thread([=]() { + reader_.WatchServices(cb); + }); + endpoints_watch_thread_ = std::thread([=]() { + reader_.WatchEndpoints(cb); + }); + } } else { // Only try to poll if watch is disabled. 
PollingMetadataUpdater::StartUpdater(); @@ -1064,6 +1283,9 @@ void KubernetesUpdater::StartUpdater() { void KubernetesUpdater::MetadataCallback( std::vector&& result_vector) { for (MetadataUpdater::ResourceMetadata& result : result_vector) { +#ifdef VERBOSE + LOG(DEBUG) << "MetadataCallback (" << result << ")"; +#endif UpdateResourceCallback(result); UpdateMetadataCallback(std::move(result)); } diff --git a/src/kubernetes.h b/src/kubernetes.h index 1ca245ca..ea620df3 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -55,6 +55,12 @@ class KubernetesReader { void WatchPods(const std::string& node_name, MetadataUpdater::UpdateCallback callback) const; + // Service watcher. + void WatchServices(MetadataUpdater::UpdateCallback callback); + + // Endpoints watcher. + void WatchEndpoints(MetadataUpdater::UpdateCallback callback); + // Gets the name of the node the agent is running on. // Returns an empty string if unable to find the current node. const std::string& CurrentNode() const; @@ -95,6 +101,16 @@ class KubernetesReader { MetadataUpdater::UpdateCallback callback, const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); + // Service watch callback. + void ServiceCallback( + MetadataUpdater::UpdateCallback callback, const json::Object* service, + Timestamp collected_at, bool is_deleted) throw(json::Exception); + + // Endpoints watch callback. + void EndpointsCallback( + MetadataUpdater::UpdateCallback callback, const json::Object* endpoints, + Timestamp collected_at, bool is_deleted) throw(json::Exception); + // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const throw(json::Exception); @@ -120,6 +136,14 @@ class KubernetesReader { std::vector GetPodAndContainerMetadata( const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); + // Get a list of service metadata based on the service level caches. 
+ std::vector GetServiceList( + const std::string& cluster_name, const std::string& location) + const throw(json::Exception); + // Return the cluster metadata based on the cached values for + service_to_metadata_ and service_to_pods_. + MetadataUpdater::ResourceMetadata GetClusterMetadata(Timestamp collected_at) + const throw(json::Exception); // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. @@ -141,6 +165,19 @@ json::value FindTopLevelController(const std::string& ns, json::value object) const throw(QueryException, json::Exception); + // Update service_to_metadata_ cache based on a newly updated service. + void UpdateServiceToMetadataCache( + const json::Object* service, bool is_deleted) throw(json::Exception); + + // Update service_to_pods_ cache based on a newly updated endpoints. The + // Endpoints resource provides a mapping from a single service to its pods: + // https://kubernetes.io/docs/concepts/services-networking/service/ + void UpdateServiceToPodsCache( + const json::Object* endpoints, bool is_deleted) throw(json::Exception); + + // An empty vector value for endpoints that have no pods. + const std::vector kNoPods; + // Cached data. mutable std::recursive_mutex mutex_; mutable std::string current_node_; @@ -152,6 +189,18 @@ // A memoized map from an encoded owner reference to the owner object. mutable std::map owners_; + // ServiceKey is a pair of the namespace name and the service name that + // uniquely identifies a service in a cluster. + using ServiceKey = std::pair; + // Mutex for the service related caches. + mutable std::mutex service_mutex_; + // Map from service key to service metadata. This map is built based on the + // response from WatchServices. + mutable std::map service_to_metadata_; + // Map from service key to names of pods in the service. This map is built + // based on the response from WatchEndpoints. 
+ mutable std::map> service_to_pods_; + const Configuration& config_; HealthChecker* health_checker_; Environment environment_; @@ -168,6 +217,12 @@ class KubernetesUpdater : public PollingMetadataUpdater { if (pod_watch_thread_.joinable()) { pod_watch_thread_.join(); } + if (service_watch_thread_.joinable()) { + service_watch_thread_.join(); + } + if (endpoints_watch_thread_.joinable()) { + endpoints_watch_thread_.join(); + } } protected: @@ -182,6 +237,8 @@ class KubernetesUpdater : public PollingMetadataUpdater { HealthChecker* health_checker_; std::thread node_watch_thread_; std::thread pod_watch_thread_; + std::thread service_watch_thread_; + std::thread endpoints_watch_thread_; }; } diff --git a/test/configuration_unittest.cc b/test/configuration_unittest.cc index 7097c58e..4e0cf574 100644 --- a/test/configuration_unittest.cc +++ b/test/configuration_unittest.cc @@ -32,6 +32,8 @@ void VerifyDefaultConfig(const Configuration& config) { EXPECT_EQ("", config.KubernetesClusterLocation()); EXPECT_EQ("", config.KubernetesNodeName()); EXPECT_EQ(false, config.KubernetesUseWatch()); + EXPECT_EQ(false, config.KubernetesClusterLevelMetadata()); + EXPECT_EQ(true, config.KubernetesServiceMetadata()); EXPECT_EQ("", config.InstanceId()); EXPECT_EQ("", config.InstanceZone()); EXPECT_EQ("/var/run/metadata-agent/health/unhealthy", config.HealthCheckFile()); diff --git a/test/kubernetes_unittest.cc b/test/kubernetes_unittest.cc index 359a8241..2841f947 100644 --- a/test/kubernetes_unittest.cc +++ b/test/kubernetes_unittest.cc @@ -1,4 +1,5 @@ #include "../src/configuration.h" +#include "../src/resource.h" #include "../src/kubernetes.h" #include "../src/updater.h" #include "gtest/gtest.h" @@ -37,10 +38,28 @@ class KubernetesTest : public ::testing::Test { const json::value& value) { reader->owners_[key] = value->Clone(); } + + MetadataUpdater::ResourceMetadata GetClusterMetadata( + const KubernetesReader& reader, Timestamp collected_at) const + throw(json::Exception) { + return 
reader.GetClusterMetadata(collected_at); + } + + void UpdateServiceToMetadataCache( + KubernetesReader* reader, const json::Object* service, bool is_deleted) + throw(json::Exception) { + return reader->UpdateServiceToMetadataCache(service, is_deleted); + } + + void UpdateServiceToPodsCache( + KubernetesReader* reader, const json::Object* endpoints, bool is_deleted) + throw(json::Exception) { + return reader->UpdateServiceToPodsCache(endpoints, is_deleted); + } }; TEST_F(KubernetesTest, GetNodeMetadata) { - Configuration config(std::stringstream( + Configuration config(std::istringstream( "KubernetesClusterName: TestClusterName\n" "KubernetesClusterLocation: TestClusterLocation\n" "MetadataIngestionRawContentVersion: TestVersion\n" @@ -239,5 +258,171 @@ TEST_F(KubernetesTest, GetLegacyResource) { {"zone", "TestZone"}, }), m.resource()); EXPECT_TRUE(m.metadata().ignore); +} + +TEST_F(KubernetesTest, GetClusterMetadataEmpty) { + Configuration config(std::istringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. 
+ const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_TRUE(m.ids().empty()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_FALSE(m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + json::value empty_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({})}, + })}, + }); + EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetClusterMetadataEmptyService) { + Configuration config(std::istringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + json::value service = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })}, + }); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. + UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/false); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_TRUE(m.ids().empty()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_FALSE(m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + json::value expected_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({ + json::object({ + {"api", json::object({ + {"pods", json::array({})}, + {"raw", std::move(service)}, + {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. 
+ })}, + }), + })}, + })}, + }); + EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetClusterMetadataServiceWithPods) { + Configuration config(std::istringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + json::value service = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })}, + }); + json::value endpoints = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })}, + {"subsets", json::array({ + json::object({ + {"addresses", json::array({ + json::object({ + {"targetRef", json::object({ + {"kind", json::string("Pod")}, + {"name", json::string("my-pod")}, + })}, + }), + })}, + }), + })}, + }); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. 
+ UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/false); + UpdateServiceToPodsCache( + &reader, endpoints->As(), /*is_deleted=*/false); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_TRUE(m.ids().empty()); + EXPECT_EQ(MonitoredResource("k8s_cluster", { + {"cluster_name", "TestClusterName"}, + {"location", "TestClusterLocation"}, + }), m.resource()); + EXPECT_EQ("TestVersion", m.metadata().version); + EXPECT_FALSE(m.metadata().is_deleted); + EXPECT_EQ(Timestamp(), m.metadata().created_at); + EXPECT_EQ(Timestamp(), m.metadata().collected_at); + MonitoredResource pod_mr = MonitoredResource("k8s_pod", { + {"cluster_name", "TestClusterName"}, + {"namespace_name", "testnamespace"}, + {"pod_name", "my-pod"}, + {"location", "TestClusterLocation"}, + }); + json::value expected_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({ + json::object({ + {"api", json::object({ + {"pods", json::array({ + pod_mr.ToJSON(), + })}, + {"raw", std::move(service)}, + {"version", json::string("1.6")}, // Hard-coded in kubernetes.cc. + })}, + }), + })}, + })}, + }); + EXPECT_EQ(expected_cluster->ToString(), m.metadata().metadata->ToString()); +} + +TEST_F(KubernetesTest, GetClusterMetadataDeletedService) { + Configuration config(std::istringstream( + "KubernetesClusterName: TestClusterName\n" + "KubernetesClusterLocation: TestClusterLocation\n" + "MetadataIngestionRawContentVersion: TestVersion\n" + )); + Environment environment(config); + json::value service = json::object({ + {"metadata", json::object({ + {"name", json::string("testname")}, + {"namespace", json::string("testnamespace")}, + })}, + }); + KubernetesReader reader(config, nullptr); // Don't need HealthChecker. 
+ UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/false); + UpdateServiceToMetadataCache( + &reader, service->As(), /*is_deleted=*/true); + const auto m = GetClusterMetadata(reader, Timestamp()); + EXPECT_TRUE(m.ids().empty()); + json::value empty_cluster = json::object({ + {"blobs", json::object({ + {"services", json::array({})}, + })}, + }); + EXPECT_EQ(empty_cluster->ToString(), m.metadata().metadata->ToString()); } } // namespace google