diff --git a/src/configuration.cc b/src/configuration.cc index 796b6020..17af7d97 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -57,6 +57,7 @@ constexpr const char kKubernetesDefaultClusterName[] = ""; constexpr const char kKubernetesDefaultClusterLocation[] = ""; constexpr const char kKubernetesDefaultNodeName[] = ""; constexpr const bool kKubernetesDefaultUseWatch = true; +constexpr const bool kKubernetesDefaultClusterLevelMetadata = false; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; @@ -95,6 +96,8 @@ MetadataAgentConfiguration::MetadataAgentConfiguration() kubernetes_cluster_location_(kKubernetesDefaultClusterLocation), kubernetes_node_name_(kKubernetesDefaultNodeName), kubernetes_use_watch_(kKubernetesDefaultUseWatch), + kubernetes_cluster_level_metadata_( + kKubernetesDefaultClusterLevelMetadata), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone) {} @@ -204,6 +207,9 @@ void MetadataAgentConfiguration::ParseConfiguration(std::istream& input) { config["KubernetesNodeName"].as(kKubernetesDefaultNodeName); kubernetes_use_watch_ = config["KubernetesUseWatch"].as(kKubernetesDefaultUseWatch); + kubernetes_cluster_level_metadata_ = + config["KubernetesClusterLevelMetadata"].as( + kKubernetesDefaultClusterLevelMetadata); instance_id_ = config["InstanceId"].as(kDefaultInstanceId); instance_zone_ = diff --git a/src/configuration.h b/src/configuration.h index cc4dbb94..e92971bc 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -128,6 +128,10 @@ class MetadataAgentConfiguration { std::lock_guard lock(mutex_); return kubernetes_use_watch_; } + bool KubernetesClusterLevelMetadata() const { + std::lock_guard lock(mutex_); + return kubernetes_cluster_level_metadata_; + } // Common metadata updater options. const std::string& InstanceId() const { std::lock_guard lock(mutex_); @@ -169,6 +173,7 @@ class MetadataAgentConfiguration { std::string kubernetes_cluster_location_; std::string kubernetes_node_name_; bool kubernetes_use_watch_; + bool kubernetes_cluster_level_metadata_; std::string instance_id_; std::string instance_zone_; }; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index cdc60467..bb315d12 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -91,12 +91,11 @@ KubernetesReader::KubernetesReader(const MetadataAgentConfiguration& config) : config_(config), environment_(config) {} MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( - json::value raw_node, Timestamp collected_at, bool is_deleted) const + const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); - const json::Object* node = raw_node->As(); const json::Object* metadata = node->Get("metadata"); const std::string node_name = metadata->Get("name"); const std::string created_str = @@ -122,7 +121,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( })}, {"api", json::object({ {"version", json::string(kKubernetesApiVersion)}, - {"raw", std::move(raw_node)}, + {"raw", node->Clone()}, })}, })}, }); @@ -169,34 +168,38 @@ json::value KubernetesReader::ComputePodAssociations(const json::Object* pod) ? top_level_controller->Get("kind") : "Pod"; - // TODO: What about pods that are not scheduled yet? - const json::Object* spec = pod->Get("spec"); - const std::string node_name = spec->Get("nodeName"); - json::value instance_resource = InstanceReader::InstanceResource(environment_).ToJSON(); + std::unique_ptr raw_associations(new json::Object({ + {"infrastructureResource", std::move(instance_resource)}, + {"controllers", json::object({ + {"topLevelControllerType", json::string(top_level_kind)}, + {"topLevelControllerName", json::string(top_level_name)}, + })}, + })); + + const json::Object* spec = pod->Get("spec"); + if (spec->Has("nodeName")) { + // Pods that have been scheduled will have a nodeName. + raw_associations->emplace(std::make_pair( + "nodeName", + json::string(spec->Get("nodeName")) + )); + } + return json::object({ {"version", json::string(config_.MetadataIngestionRawContentVersion())}, - {"raw", json::object({ - {"infrastructureResource", std::move(instance_resource)}, - {"controllers", json::object({ - {"topLevelControllerType", json::string(top_level_kind)}, - {"topLevelControllerName", json::string(top_level_name)}, - })}, - {"nodeName", json::string(node_name)}, - })}, + {"raw", json::value(std::move(raw_associations))}, }); } MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( - json::value raw_pod, json::value associations, Timestamp collected_at, + const json::Object* pod, json::value associations, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); - const json::Object* pod = raw_pod->As(); - const json::Object* metadata = pod->Get("metadata"); const std::string namespace_name = metadata->Get("namespace"); const std::string pod_name = metadata->Get("name"); @@ -442,8 +445,7 @@ KubernetesReader::GetPodAndContainerMetadata( } result.emplace_back( - GetPodMetadata(pod->Clone(), std::move(associations), collected_at, - is_deleted)); + GetPodMetadata(pod, std::move(associations), collected_at, is_deleted)); return std::move(result); } @@ -454,19 +456,34 @@ std::vector } std::vector result; - const std::string node_name = CurrentNode(); + const std::string current_node = CurrentNode(); if (config_.VerboseLogging()) { - LOG(INFO) << "Current node is " << node_name; + LOG(INFO) << "Current node is " << current_node; } + const std::string node_name( + config_.KubernetesClusterLevelMetadata() ? "" : current_node); + try { - json::value raw_node = QueryMaster( + json::value raw_nodes = QueryMaster( std::string(kKubernetesEndpointPath) + "/nodes/" + node_name); Timestamp collected_at = std::chrono::system_clock::now(); - result.emplace_back(GetNodeMetadata(std::move(raw_node), collected_at, - /*is_deleted=*/false)); + if (!node_name.empty()) { + // It's a single node object -- fake a NodeList. + raw_nodes.reset(json::object({ + {"items", json::array({std::move(raw_nodes)})}, + }).release()); + } + + const json::Object* nodelist = raw_nodes->As(); + const json::Array* nodes_array = nodelist->Get("items"); + for (const json::value& raw_node : *nodes_array) { + const json::Object* node = raw_node->As(); + result.emplace_back( + GetNodeMetadata(node, collected_at, /*is_deleted=*/false)); + } } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const QueryException& e) { @@ -502,10 +519,13 @@ std::vector const std::string pod_id = metadata->Get("uid"); const json::Object* spec = pod->Get("spec"); - const std::string pod_node_name = spec->Get("nodeName"); - if (pod_node_name != node_name) { - LOG(ERROR) << "Internal error; pod's node " << pod_node_name - << " not the same as agent node " << node_name; + + if (!node_name.empty()) { + const std::string pod_node_name = spec->Get("nodeName"); + if (pod_node_name != node_name) { + LOG(ERROR) << "Internal error; pod's node " << pod_node_name + << " not the same as agent node " << node_name; + } } const json::Object* status = pod->Get("status"); @@ -1078,15 +1098,11 @@ void KubernetesReader::PodCallback( callback(std::move(result_vector)); } -void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) - const { - LOG(INFO) << "Watch thread (pods) started"; - - const std::string node_name = CurrentNode(); - - if (config_.VerboseLogging()) { - LOG(INFO) << "Current node is " << node_name; - } +void KubernetesReader::WatchPods( + const std::string& node_name, + MetadataUpdater::UpdateCallback callback) const { + LOG(INFO) << "Watch thread (pods) started for node " + << (node_name.empty() ? "" : node_name); const std::string node_selector(kNodeSelectorPrefix + node_name); const std::string pod_label_selector( @@ -1115,20 +1131,15 @@ void KubernetesReader::NodeCallback( const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { std::vector result_vector; - result_vector.emplace_back( - GetNodeMetadata(node->Clone(), collected_at, is_deleted)); + result_vector.emplace_back(GetNodeMetadata(node, collected_at, is_deleted)); callback(std::move(result_vector)); } -void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) - const { - LOG(INFO) << "Watch thread (node) started"; - - const std::string node_name = CurrentNode(); - - if (config_.VerboseLogging()) { - LOG(INFO) << "Current node is " << node_name; - } +void KubernetesReader::WatchNodes( + const std::string& node_name, + MetadataUpdater::UpdateCallback callback) const { + LOG(INFO) << "Watch thread (node) started for node " + << (node_name.empty() ? "" : node_name); try { // TODO: There seems to be a Kubernetes API bug with watch=true. @@ -1147,6 +1158,12 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) LOG(INFO) << "Watch thread (node) exiting"; } +KubernetesUpdater::KubernetesUpdater(MetadataAgent* server) + : reader_(server->config()), PollingMetadataUpdater( + server, "KubernetesUpdater", + server->config().KubernetesUpdaterIntervalSeconds(), + [=]() { return reader_.MetadataQuery(); }) { } + bool KubernetesUpdater::ValidateConfiguration() const { if (!PollingMetadataUpdater::ValidateConfiguration()) { return false; @@ -1157,11 +1174,30 @@ bool KubernetesUpdater::ValidateConfiguration() const { void KubernetesUpdater::StartUpdater() { if (config().KubernetesUseWatch()) { + const std::string current_node = reader_.CurrentNode(); + + if (config().VerboseLogging()) { + LOG(INFO) << "Current node is " << current_node; + } + + if (config().KubernetesClusterLevelMetadata()) { + LOG(INFO) << "Watching for cluster-level metadata"; + } else { + LOG(INFO) << "Watching for node-level metadata"; + } + + const std::string watched_node( + config().KubernetesClusterLevelMetadata() ? "" : current_node); + auto cb = [=](std::vector&& results) { MetadataCallback(std::move(results)); }; - node_watch_thread_ = std::thread([=]() { reader_.WatchNode(cb); }); - pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(cb); }); + node_watch_thread_ = std::thread([=]() { + reader_.WatchNodes(watched_node, cb); + }); + pod_watch_thread_ = std::thread([=]() { + reader_.WatchPods(watched_node, cb); + }); } else { // Only try to poll if watch is disabled. PollingMetadataUpdater::StartUpdater(); diff --git a/src/kubernetes.h b/src/kubernetes.h index c4956dbd..91378a0e 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -43,10 +43,16 @@ class KubernetesReader { bool ValidateConfiguration() const; // Node watcher. - void WatchNode(MetadataUpdater::UpdateCallback callback) const; + void WatchNodes(const std::string& node_name, + MetadataUpdater::UpdateCallback callback) const; // Pod watcher. - void WatchPods(MetadataUpdater::UpdateCallback callback) const; + void WatchPods(const std::string& node_name, + MetadataUpdater::UpdateCallback callback) const; + + // Gets the name of the node the agent is running on. + // Returns an empty string if unable to find the current node. + const std::string& CurrentNode() const; private: // A representation of all query-related errors. @@ -88,11 +94,11 @@ class KubernetesReader { throw(json::Exception); // Given a node object, return the associated metadata. MetadataUpdater::ResourceMetadata GetNodeMetadata( - json::value raw_node, Timestamp collected_at, bool is_deleted) const + const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Given a pod object, return the associated metadata. MetadataUpdater::ResourceMetadata GetPodMetadata( - json::value raw_pod, json::value associations, Timestamp collected_at, + const json::Object* pod, json::value associations, Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Given a pod object and container info, return the container metadata. MetadataUpdater::ResourceMetadata GetContainerMetadata( @@ -109,10 +115,6 @@ class KubernetesReader { const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); - // Gets the name of the node the agent is running on. - // Returns an empty string if unable to find the current node. - const std::string& CurrentNode() const; - // Gets the Kubernetes master API token. // Returns an empty string if unable to find the token. const std::string& KubernetesApiToken() const; @@ -151,12 +153,7 @@ class KubernetesReader { class KubernetesUpdater : public PollingMetadataUpdater { public: - KubernetesUpdater(MetadataAgent* server) - : reader_(server->config()), PollingMetadataUpdater( - server, "KubernetesUpdater", - server->config().KubernetesUpdaterIntervalSeconds(), - [=]() { return reader_.MetadataQuery(); }) { } - + KubernetesUpdater(MetadataAgent* server); ~KubernetesUpdater() { if (node_watch_thread_.joinable()) { node_watch_thread_.join();