diff --git a/src/api_server.cc b/src/api_server.cc index aebf59c5..d2341a05 100644 --- a/src/api_server.cc +++ b/src/api_server.cc @@ -102,7 +102,7 @@ void MetadataApiServer::Handler::operator()(const HttpServer::request& request, } if (request.method == "GET" && request.destination.find(kPrefix) == 0) { std::string id = request.destination.substr(kPrefix.size()); - std::lock_guard lock(agent_.mu_); + std::lock_guard lock(agent_.resource_mu_); const auto result = agent_.resource_map_.find(id); if (result == agent_.resource_map_.end()) { // TODO: This could be considered log spam. @@ -286,9 +286,8 @@ MetadataAgent::MetadataAgent(const MetadataAgentConfiguration& config) MetadataAgent::~MetadataAgent() {} void MetadataAgent::UpdateResource(const std::vector& resource_ids, - const MonitoredResource& resource, - Metadata&& entry) { - std::lock_guard lock(mu_); + const MonitoredResource& resource) { + std::lock_guard lock(resource_mu_); // TODO: How do we handle deleted resources? // TODO: Do we care if the value was already there? for (const std::string& id : resource_ids) { @@ -297,6 +296,11 @@ void MetadataAgent::UpdateResource(const std::vector& resource_ids, } resource_map_.emplace(id, resource); } +} + +void MetadataAgent::UpdateMetadata(const MonitoredResource& resource, + Metadata&& entry) { + std::lock_guard lock(metadata_mu_); if (config_.VerboseLogging()) { LOG(INFO) << "Updating metadata map " << resource << "->{" << "version: " << entry.version << ", " @@ -316,7 +320,7 @@ void MetadataAgent::UpdateResource(const std::vector& resource_ids, std::map MetadataAgent::GetMetadataMap() const { - std::lock_guard lock(mu_); + std::lock_guard lock(metadata_mu_); std::map result; for (const auto& kv : metadata_map_) { diff --git a/src/api_server.h b/src/api_server.h index b0ed5b9e..cb23dc3a 100644 --- a/src/api_server.h +++ b/src/api_server.h @@ -81,12 +81,15 @@ class MetadataAgent { MetadataAgent(const MetadataAgentConfiguration& config); ~MetadataAgent(); - // Updates metadata for a given resource. + // Updates the local resource map entry for a given resource. // Each local id in `resource_ids` is effectively an alias for `resource`. - // Adds a resource mapping from each of the `resource_ids` to the `resource` - // and a metadata mapping from the `resource` to the metadata `entry`. + // Adds a resource mapping from each of the `resource_ids` to the `resource`. void UpdateResource(const std::vector& resource_ids, - const MonitoredResource& resource, + const MonitoredResource& resource); + + // Updates metadata for a given resource. + // Adds a metadata mapping from the `resource` to the metadata `entry`. + void UpdateMetadata(const MonitoredResource& resource, Metadata&& entry); // Starts serving. @@ -104,10 +107,12 @@ class MetadataAgent { const MetadataAgentConfiguration& config_; - // A lock that guards access to the maps. - mutable std::mutex mu_; + // A lock that guards access to the local resource map. + mutable std::mutex resource_mu_; // A map from a locally unique id to MonitoredResource. std::map resource_map_; + // A lock that guards access to the metadata map. + mutable std::mutex metadata_mu_; // A map from MonitoredResource to (JSON) resource metadata. std::map metadata_map_; diff --git a/src/docker.cc b/src/docker.cc index 13dab92d..db90fcd1 100644 --- a/src/docker.cc +++ b/src/docker.cc @@ -43,7 +43,7 @@ constexpr const char resource_type_separator[] = "."; DockerReader::DockerReader(const MetadataAgentConfiguration& config) : config_(config), environment_(config) {} -std::vector +std::vector DockerReader::MetadataQuery() const { if (config_.VerboseLogging()) { LOG(INFO) << "Docker Query called"; @@ -58,7 +58,7 @@ std::vector http::local_client client; http::local_client::request list_request( docker_endpoint + "/json?all=true" + container_filter); - std::vector result; + std::vector result; try { http::local_client::response list_response = client.get(list_request); Timestamp collected_at = std::chrono::system_clock::now(); diff --git a/src/docker.h b/src/docker.h index c4069341..917ee85e 100644 --- a/src/docker.h +++ b/src/docker.h @@ -30,13 +30,23 @@ class DockerReader { public: DockerReader(const MetadataAgentConfiguration& config); // A Docker metadata query function. - std::vector MetadataQuery() const; + std::vector MetadataQuery() const; private: const MetadataAgentConfiguration& config_; Environment environment_; }; +class DockerUpdater : public PollingMetadataUpdater { + public: + DockerUpdater(MetadataAgent* server) + : reader_(server->config()), PollingMetadataUpdater( + server, server->config().DockerUpdaterIntervalSeconds(), + std::bind(&google::DockerReader::MetadataQuery, &reader_)) { } + private: + DockerReader reader_; +}; + } #endif // DOCKER_H_ diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 6d902bd8..3d0a7e28 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -85,81 +85,378 @@ bool ReadServiceAccountSecret( KubernetesReader::KubernetesReader(const MetadataAgentConfiguration& config) : config_(config), environment_(config) {} -std::vector - KubernetesReader::MetadataQuery() const { - if (config_.VerboseLogging()) { - LOG(INFO) << "Kubernetes Query called"; - } - std::vector result; - +MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( + json::value raw_node, Timestamp collected_at) const throw(json::Exception) { const std::string platform = "gce"; // TODO: detect other platforms. const std::string instance_id = environment_.InstanceId(); const std::string zone = environment_.InstanceZone(); const std::string cluster_name = environment_.KubernetesClusterName(); - const std::string node_name = CurrentNode(); + const json::Object* node = raw_node->As(); + const json::Object* metadata = node->Get("metadata"); + const std::string node_name = metadata->Get("name"); + const std::string created_str = + metadata->Get("creationTimestamp"); + Timestamp created_at = rfc3339::FromString(created_str); + + const MonitoredResource k8s_node("k8s_node", { + {"cluster_name", cluster_name}, + {"node_name", node_name}, + {"location", zone}, + }); + + json::value node_raw_metadata = json::object({ + {"blobs", json::object({ + {"association", json::object({ + {"version", json::string(kRawContentVersion)}, + {"raw", json::object({ + {"providerPlatform", json::string(platform)}, + {"instanceId", json::string(instance_id)}, + })}, + })}, + {"api", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", std::move(raw_node)}, + })}, + })}, + }); if (config_.VerboseLogging()) { - LOG(INFO) << "Current node is " << node_name; + LOG(INFO) << "Raw node metadata: " << *node_raw_metadata; } - const MonitoredResource k8s_node("k8s_node", { + const std::string k8s_node_name = boost::algorithm::join( + std::vector{kK8sNodeNameResourcePrefix, node_name}, + kResourceTypeSeparator); + return MetadataUpdater::ResourceMetadata( + std::vector{k8s_node_name}, + k8s_node, +#ifdef ENABLE_KUBERNETES_METADATA + MetadataAgent::Metadata(kRawContentVersion, + /*deleted=*/false, created_at, collected_at, + std::move(node_raw_metadata)) +#else + MetadataAgent::Metadata::IGNORED() +#endif + ); +} + +json::value KubernetesReader::ComputePodAssociations(const json::Object* pod) + const throw(json::Exception) { + const std::string platform = "gce"; // TODO: detect other platforms. + + const json::Object* metadata = pod->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string pod_id = metadata->Get("uid"); + + const json::value top_level = FindTopLevelOwner(namespace_name, pod->Clone()); + const json::Object* top_level_controller = top_level->As(); + const json::Object* top_level_metadata = + top_level_controller->Get("metadata"); + const std::string top_level_name = + top_level_metadata->Get("name"); + if (!top_level_controller->Has("kind") && + top_level_metadata->Get("uid") != pod_id) { + LOG(ERROR) << "Internal error; top-level controller without 'kind' " + << *top_level_controller + << " not the same as pod " << *pod; + } + const std::string top_level_kind = + top_level_controller->Has("kind") + ? top_level_controller->Get("kind") + : "Pod"; + + return json::object({ + {"version", json::string(kRawContentVersion)}, + {"raw", json::object({ + {"providerPlatform", json::string(platform)}, + {"controllers", json::object({ + {"topLevelControllerType", json::string(top_level_kind)}, + {"topLevelControllerName", json::string(top_level_name)}, + })}, + })}, + }); +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( + json::value raw_pod, json::value associations, Timestamp collected_at) const + throw(json::Exception) { + const std::string zone = environment_.InstanceZone(); + const std::string cluster_name = environment_.KubernetesClusterName(); + + const json::Object* pod = raw_pod->As(); + + const json::Object* metadata = pod->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string pod_name = metadata->Get("name"); + const std::string pod_id = metadata->Get("uid"); + const std::string created_str = + metadata->Get("creationTimestamp"); + Timestamp created_at = rfc3339::FromString(created_str); + const json::Object* labels = metadata->Get("labels"); + + const json::Object* status = pod->Get("status"); + const std::string started_str = status->Get("startTime"); + Timestamp started_at = rfc3339::FromString(started_str); + + const json::Object* spec = pod->Get("spec"); + const std::string node_name = spec->Get("nodeName"); + + const MonitoredResource k8s_pod("k8s_pod", { {"cluster_name", cluster_name}, + {"namespace_name", namespace_name}, {"node_name", node_name}, + {"pod_name", pod_name}, {"location", zone}, }); - try { - json::value raw_node = QueryMaster( - std::string(kKubernetesEndpointPath) + "/nodes/" + node_name); - Timestamp collected_at = std::chrono::system_clock::now(); + // TODO: find is_deleted. + //const json::Object* status = pod->Get("status"); + bool is_deleted = false; - const json::Object* node = raw_node->As(); - - const json::Object* metadata = node->Get("metadata"); - const std::string node_id = metadata->Get("uid"); - const std::string created_str = - metadata->Get("creationTimestamp"); - Timestamp created_at = rfc3339::FromString(created_str); - - json::value node_raw_metadata = json::object({ - {"blobs", json::object({ - {"association", json::object({ - {"version", json::string(kRawContentVersion)}, - {"raw", json::object({ - {"providerPlatform", json::string(platform)}, - {"instanceId", json::string(instance_id)}, - })}, - })}, - {"api", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", std::move(raw_node)}, - })}, + json::value pod_raw_metadata = json::object({ + {"blobs", json::object({ + {"association", std::move(associations)}, + {"api", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", pod->Clone()}, })}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw node metadata: " << *node_raw_metadata; - } + })}, + }); + if (config_.VerboseLogging()) { + LOG(INFO) << "Raw pod metadata: " << *pod_raw_metadata; + } -#if 0 - // TODO: do we need this? - const std::string k8s_node_id = boost::algorithm::join( - std::vector{kK8sNodeResourcePrefix, node_id}, - kResourceTypeSeparator); + const std::string k8s_pod_id = boost::algorithm::join( + std::vector{kK8sPodResourcePrefix, namespace_name, pod_id}, + kResourceTypeSeparator); + const std::string k8s_pod_name = boost::algorithm::join( + std::vector{kK8sPodNameResourcePrefix, namespace_name, pod_name}, + kResourceTypeSeparator); + return MetadataUpdater::ResourceMetadata( + std::vector{k8s_pod_id, k8s_pod_name}, + k8s_pod, +#ifdef ENABLE_KUBERNETES_METADATA + MetadataAgent::Metadata(kRawContentVersion, + is_deleted, created_at, collected_at, + std::move(pod_raw_metadata)) +#else + MetadataAgent::Metadata::IGNORED() #endif - const std::string k8s_node_name = boost::algorithm::join( - std::vector{kK8sNodeNameResourcePrefix, node_name}, - kResourceTypeSeparator); - result.emplace_back( - std::vector{k8s_node_name}, - k8s_node, + ); +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( + const json::Object* pod, int container_index, json::value associations, + Timestamp collected_at) const throw(json::Exception) { + const std::string zone = environment_.InstanceZone(); + const std::string cluster_name = environment_.KubernetesClusterName(); + + const json::Object* metadata = pod->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string pod_name = metadata->Get("name"); + const std::string pod_id = metadata->Get("uid"); + const std::string created_str = + metadata->Get("creationTimestamp"); + Timestamp created_at = rfc3339::FromString(created_str); + const json::Object* labels = metadata->Get("labels"); + + const json::Object* spec = pod->Get("spec"); + const std::string node_name = spec->Get("nodeName"); + + const json::Object* status = pod->Get("status"); + + const json::Array* container_specs = spec->Get("containers"); + const json::Array* container_statuses = + status->Get("containerStatuses"); + + const json::value& c_status = (*container_statuses)[container_index]; + const json::value& c_spec = (*container_specs)[container_index]; + if (config_.VerboseLogging()) { + LOG(INFO) << "Container: " << *c_status; + } + const json::Object* container = c_status->As(); + const json::Object* container_spec = c_spec->As(); + const std::string container_name = container->Get("name"); + std::size_t docker_prefix_end = sizeof(kDockerIdPrefix) - 1; + if (container->Get("containerID").compare( + 0, docker_prefix_end, kDockerIdPrefix) != 0) { + LOG(ERROR) << "ContainerID " + << container->Get("containerID") + << " does not start with " << kDockerIdPrefix + << " (" << docker_prefix_end << " chars)"; + docker_prefix_end = 0; + } + const std::string container_id = + container->Get("containerID").substr( + docker_prefix_end); + // TODO: find is_deleted. + //const json::Object* state = container->Get("state"); + bool is_deleted = false; + + const MonitoredResource k8s_container("k8s_container", { + {"cluster_name", cluster_name}, + {"namespace_name", namespace_name}, + {"node_name", node_name}, + {"pod_name", pod_name}, + {"container_name", container_name}, + {"location", zone}, + }); + + json::value container_raw_metadata = json::object({ + {"blobs", json::object({ + {"association", std::move(associations)}, + {"spec", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", container_spec->Clone()}, + })}, + {"status", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", container->Clone()}, + })}, + {"labels", json::object({ + {"version", json::string(kKubernetesApiVersion)}, + {"raw", labels->Clone()}, + })}, + })}, + }); + if (config_.VerboseLogging()) { + LOG(INFO) << "Raw container metadata: " << *container_raw_metadata; + } + + const std::string k8s_container_id = boost::algorithm::join( + std::vector{kK8sContainerResourcePrefix, container_id}, + kResourceTypeSeparator); + const std::string k8s_container_pod = boost::algorithm::join( + std::vector{kK8sContainerResourcePrefix, pod_id, container_name}, + kResourceTypeSeparator); + const std::string k8s_container_name = boost::algorithm::join( + std::vector{kK8sContainerNameResourcePrefix, namespace_name, pod_name, container_name}, + kResourceTypeSeparator); + return MetadataUpdater::ResourceMetadata( + std::vector{k8s_container_id, k8s_container_pod, k8s_container_name}, + k8s_container, #ifdef ENABLE_KUBERNETES_METADATA - MetadataAgent::Metadata(kRawContentVersion, - /*deleted=*/false, created_at, collected_at, - std::move(node_raw_metadata)) + MetadataAgent::Metadata(kKubernetesApiVersion, + is_deleted, created_at, collected_at, + std::move(container_raw_metadata)) #else - MetadataAgent::Metadata::IGNORED() + MetadataAgent::Metadata::IGNORED() #endif - ); + ); +} + +MetadataUpdater::ResourceMetadata KubernetesReader::GetLegacyResource( + const json::Object* pod, int container_index) const + throw(json::Exception) { + const std::string instance_id = environment_.InstanceId(); + const std::string zone = environment_.InstanceZone(); + const std::string cluster_name = environment_.KubernetesClusterName(); + + const json::Object* metadata = pod->Get("metadata"); + const std::string namespace_name = metadata->Get("namespace"); + const std::string pod_name = metadata->Get("name"); + const std::string pod_id = metadata->Get("uid"); + + const json::Object* status = pod->Get("status"); + + const json::Array* container_statuses = + status->Get("containerStatuses"); + + const json::value& c_status = (*container_statuses)[container_index]; + if (config_.VerboseLogging()) { + LOG(INFO) << "Container: " << *c_status; + } + const json::Object* container = c_status->As(); + const std::string container_name = container->Get("name"); + + const MonitoredResource gke_container("gke_container", { + {"cluster_name", cluster_name}, + {"namespace_id", namespace_name}, + {"instance_id", instance_id}, + {"pod_id", pod_id}, + {"container_name", container_name}, + {"zone", zone}, + }); + + const std::string gke_container_pod_id = boost::algorithm::join( + std::vector{kGkeContainerResourcePrefix, namespace_name, pod_id, container_name}, + kResourceTypeSeparator); + const std::string gke_container_name = boost::algorithm::join( + std::vector{kGkeContainerNameResourcePrefix, namespace_name, pod_name, container_name}, + kResourceTypeSeparator); + return MetadataUpdater::ResourceMetadata( + std::vector{gke_container_pod_id, gke_container_name}, + gke_container, + MetadataAgent::Metadata::IGNORED()); +} + +std::vector +KubernetesReader::GetPodAndContainerMetadata( + const json::Object* pod, Timestamp collected_at) const + throw(json::Exception) { + std::vector result; + + json::value associations = ComputePodAssociations(pod); + + const json::Object* metadata = pod->Get("metadata"); + const std::string pod_name = metadata->Get("name"); + const std::string pod_id = metadata->Get("uid"); + + const json::Object* spec = pod->Get("spec"); + const std::string node_name = spec->Get("nodeName"); + + const json::Object* status = pod->Get("status"); + + const json::Array* container_specs = spec->Get("containers"); + const json::Array* container_statuses = + status->Get("containerStatuses"); + std::size_t num_containers = std::min( + container_statuses->size(), container_specs->size()); + + for (int i = 0; i < num_containers; ++i) { + const json::value& c_status = (*container_statuses)[i]; + const json::value& c_spec = (*container_specs)[i]; + if (config_.VerboseLogging()) { + LOG(INFO) << "Container: " << *c_status; + } + const json::Object* container = c_status->As(); + const json::Object* container_spec = c_spec->As(); + const std::string container_name = container->Get("name"); + const std::string spec_name = container_spec->Get("name"); + if (container_name != spec_name) { + LOG(ERROR) << "Internal error; container name " << container_name + << " not the same as spec name " << spec_name + << " at index " << i; + } + result.emplace_back(GetLegacyResource(pod, i)); + result.emplace_back( + GetContainerMetadata(pod, i, associations->Clone(), collected_at)); + } + + result.emplace_back( + GetPodMetadata(pod->Clone(), std::move(associations), collected_at)); + return std::move(result); +} + +std::vector + KubernetesReader::MetadataQuery() const { + if (config_.VerboseLogging()) { + LOG(INFO) << "Kubernetes Query called"; + } + std::vector result; + + const std::string node_name = CurrentNode(); + + if (config_.VerboseLogging()) { + LOG(INFO) << "Current node is " << node_name; + } + + try { + json::value raw_node = QueryMaster( + std::string(kKubernetesEndpointPath) + "/nodes/" + node_name); + Timestamp collected_at = std::chrono::system_clock::now(); + + result.emplace_back(GetNodeMetadata(std::move(raw_node), collected_at)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const QueryException& e) { @@ -183,26 +480,16 @@ std::vector const std::string api_version = podlist_object->Get("apiVersion"); const json::Array* pod_list = podlist_object->Get("items"); - for (const json::value& element : *pod_list) { + for (const json::value& raw_pod : *pod_list) { try { if (config_.VerboseLogging()) { - LOG(INFO) << "Pod: " << *element; + LOG(INFO) << "Pod: " << *raw_pod; } - const json::Object* pod = element->As(); + const json::Object* pod = raw_pod->As(); const json::Object* metadata = pod->Get("metadata"); - const std::string namespace_name = - metadata->Get("namespace"); const std::string pod_name = metadata->Get("name"); const std::string pod_id = metadata->Get("uid"); - const std::string created_str = - metadata->Get("creationTimestamp"); - Timestamp created_at = rfc3339::FromString(created_str); - const json::Object* labels = metadata->Get("labels"); - - const json::Object* status = pod->Get("status"); - const std::string started_str = status->Get("startTime"); - Timestamp started_at = rfc3339::FromString(started_str); const json::Object* spec = pod->Get("spec"); const std::string pod_node_name = spec->Get("nodeName"); @@ -211,188 +498,23 @@ std::vector << " not the same as agent node " << node_name; } - const json::value top_level = - FindTopLevelOwner(namespace_name, pod->Clone()); - const json::Object* top_level_controller = - top_level->As(); - const json::Object* top_level_metadata = - top_level_controller->Get("metadata"); - const std::string top_level_name = - top_level_metadata->Get("name"); - if (!top_level_controller->Has("kind") && - top_level_metadata->Get("uid") != pod_id) { - LOG(ERROR) << "Internal error; top-level controller without 'kind' " - << *top_level_controller - << " not the same as pod " << *pod; - } - const std::string top_level_kind = - top_level_controller->Has("kind") - ? top_level_controller->Get("kind") - : "Pod"; - - const MonitoredResource k8s_pod("k8s_pod", { - {"cluster_name", cluster_name}, - {"namespace_name", namespace_name}, - {"node_name", node_name}, - {"pod_name", pod_name}, - {"location", zone}, - }); - - // TODO: find pod_deleted. - //const json::Object* status = pod->Get("status"); - bool pod_deleted = false; - - json::value associations = json::object({ - {"version", json::string(kRawContentVersion)}, - {"raw", json::object({ - {"providerPlatform", json::string(platform)}, - {"controllers", json::object({ - {"topLevelControllerType", json::string(top_level_kind)}, - {"topLevelControllerName", json::string(top_level_name)}, - })}, - })}, - }); - json::value pod_raw_metadata = json::object({ - {"blobs", json::object({ - {"association", associations->Clone()}, - {"api", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", pod->Clone()}, - })}, - })}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw pod metadata: " << *pod_raw_metadata; - } - - const std::string k8s_pod_id = boost::algorithm::join( - std::vector{kK8sPodResourcePrefix, namespace_name, pod_id}, - kResourceTypeSeparator); - const std::string k8s_pod_name = boost::algorithm::join( - std::vector{kK8sPodNameResourcePrefix, namespace_name, pod_name}, - kResourceTypeSeparator); - result.emplace_back( - std::vector{k8s_pod_id, k8s_pod_name}, - k8s_pod, -#ifdef ENABLE_KUBERNETES_METADATA - MetadataAgent::Metadata(kRawContentVersion, - pod_deleted, created_at, collected_at, - std::move(pod_raw_metadata)) -#else - MetadataAgent::Metadata::IGNORED() -#endif - ); + const json::Object* status = pod->Get("status"); const json::Array* container_specs = spec->Get("containers"); - const json::Array* container_list = + const json::Array* container_statuses = status->Get("containerStatuses"); - - if (container_specs->size() != container_list->size()) { + if (container_specs->size() != container_statuses->size()) { LOG(ERROR) << "Container specs and statuses arrays " << "have different sizes: " << container_specs->size() << " vs " - << container_list->size() << " for pod " + << container_statuses->size() << " for pod " << pod_id << "(" << pod_name << ")"; } - std::size_t num_containers = std::min( - container_list->size(), container_specs->size()); - - for (int i = 0; i < num_containers; ++i) { - const json::value& c_element = (*container_list)[i]; - const json::value& c_spec = (*container_specs)[i]; - if (config_.VerboseLogging()) { - LOG(INFO) << "Container: " << *c_element; - } - const json::Object* container = c_element->As(); - const json::Object* container_spec = c_spec->As(); - const std::string container_name = - container->Get("name"); - std::size_t docker_prefix_end = sizeof(kDockerIdPrefix) - 1; - if (container->Get("containerID").compare( - 0, docker_prefix_end, kDockerIdPrefix) != 0) { - LOG(ERROR) << "ContainerID " - << container->Get("containerID") - << " does not start with " << kDockerIdPrefix - << " (" << docker_prefix_end << " chars)"; - docker_prefix_end = 0; - } - const std::string container_id = - container->Get("containerID").substr( - docker_prefix_end); - // TODO: find is_deleted. - //const json::Object* state = container->Get("state"); - bool is_deleted = false; - - const MonitoredResource gke_container("gke_container", { - {"cluster_name", cluster_name}, - {"namespace_id", namespace_name}, - {"instance_id", instance_id}, - {"pod_id", pod_id}, - {"container_name", container_name}, - {"zone", zone}, - }); - - const std::string gke_container_id = boost::algorithm::join( - std::vector{kGkeContainerResourcePrefix, namespace_name, pod_id, container_name}, - kResourceTypeSeparator); - const std::string gke_container_name = boost::algorithm::join( - std::vector{kGkeContainerNameResourcePrefix, namespace_name, pod_name, container_name}, - kResourceTypeSeparator); - result.emplace_back( - std::vector{gke_container_id, gke_container_name}, - gke_container, - MetadataAgent::Metadata::IGNORED()); - - const MonitoredResource k8s_container("k8s_container", { - {"cluster_name", cluster_name}, - {"namespace_name", namespace_name}, - {"node_name", node_name}, - {"pod_name", pod_name}, - {"container_name", container_name}, - {"location", zone}, - }); - - json::value container_raw_metadata = json::object({ - {"blobs", json::object({ - {"association", associations->Clone()}, - {"spec", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", container_spec->Clone()}, - })}, - {"status", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", container->Clone()}, - })}, - {"labels", json::object({ - {"version", json::string(kKubernetesApiVersion)}, - {"raw", labels->Clone()}, - })}, - })}, - }); - if (config_.VerboseLogging()) { - LOG(INFO) << "Raw container metadata: " << *container_raw_metadata; - } - - const std::string k8s_container_id = boost::algorithm::join( - std::vector{kK8sContainerResourcePrefix, container_id}, - kResourceTypeSeparator); - const std::string k8s_container_pod = boost::algorithm::join( - std::vector{kK8sContainerResourcePrefix, pod_id, container_name}, - kResourceTypeSeparator); - const std::string k8s_container_name = boost::algorithm::join( - std::vector{kK8sContainerNameResourcePrefix, namespace_name, pod_name, container_name}, - kResourceTypeSeparator); - result.emplace_back( - std::vector{k8s_container_id, k8s_container_pod, k8s_container_name}, - k8s_container, -#ifdef ENABLE_KUBERNETES_METADATA - MetadataAgent::Metadata(kKubernetesApiVersion, - is_deleted, created_at, collected_at, - std::move(container_raw_metadata)) -#else - MetadataAgent::Metadata::IGNORED() -#endif - ); + + std::vector full_pod_metadata = + GetPodAndContainerMetadata(pod, collected_at); + for (MetadataUpdater::ResourceMetadata& metadata : full_pod_metadata) { + result.emplace_back(std::move(metadata)); } } catch (const json::Exception& e) { LOG(ERROR) << e.what(); diff --git a/src/kubernetes.h b/src/kubernetes.h index d0f5d452..bccf6700 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -35,7 +35,7 @@ class KubernetesReader { public: KubernetesReader(const MetadataAgentConfiguration& config); // A Kubernetes metadata query function. - std::vector MetadataQuery() const; + std::vector MetadataQuery() const; private: // A representation of all query-related errors. @@ -47,6 +47,31 @@ class KubernetesReader { std::string explanation_; }; + // Compute the associations for a given pod. + json::value ComputePodAssociations(const json::Object* pod) const + throw(json::Exception); + // Given a node object, return the associated metadata. + MetadataUpdater::ResourceMetadata GetNodeMetadata( + json::value raw_node, Timestamp collected_at) const + throw(json::Exception); + // Given a pod object, return the associated metadata. + MetadataUpdater::ResourceMetadata GetPodMetadata( + json::value raw_pod, json::value associations, Timestamp collected_at) + const throw(json::Exception); + // Given a pod object and container index, return the container metadata. + MetadataUpdater::ResourceMetadata GetContainerMetadata( + const json::Object* pod, int container_index, json::value associations, + Timestamp collected_at) const throw(json::Exception); + // Given a pod object and container index, return the legacy resource. + // The returned "metadata" field will be Metadata::IGNORED. + MetadataUpdater::ResourceMetadata GetLegacyResource( + const json::Object* pod, int container_index) const + throw(json::Exception); + // Given a pod object, return the associated pod and container metadata. + std::vector GetPodAndContainerMetadata( + const json::Object* pod, Timestamp collected_at) const + throw(json::Exception); + // Issues a Kubernetes master API query at a given path and // returns a parsed JSON response. The path has to start with "/". json::value QueryMaster(const std::string& path) const @@ -90,6 +115,16 @@ class KubernetesReader { Environment environment_; }; +class KubernetesUpdater : public PollingMetadataUpdater { + public: + KubernetesUpdater(MetadataAgent* server) + : reader_(server->config()), PollingMetadataUpdater( + server, server->config().KubernetesUpdaterIntervalSeconds(), + std::bind(&google::KubernetesReader::MetadataQuery, &reader_)) { } + private: + KubernetesReader reader_; +}; + } #endif // KUBERNETES_H_ diff --git a/src/metadatad.cc b/src/metadatad.cc index 30d72dc5..a2645b0a 100644 --- a/src/metadatad.cc +++ b/src/metadatad.cc @@ -22,7 +22,6 @@ #include "configuration.h" #include "docker.h" #include "kubernetes.h" -#include "updater.h" int main(int ac, char** av) { google::MetadataAgentConfiguration config; @@ -33,21 +32,11 @@ int main(int ac, char** av) { google::MetadataAgent server(config); - google::DockerReader docker(config); - google::PollingMetadataUpdater docker_updater( - config.DockerUpdaterIntervalSeconds(), &server, - std::bind(&google::DockerReader::MetadataQuery, &docker)); + google::DockerUpdater docker_updater(&server); + google::KubernetesUpdater kubernetes_updater(&server); - google::KubernetesReader kubernetes(config); - google::PollingMetadataUpdater kubernetes_updater( - config.KubernetesUpdaterIntervalSeconds(), &server, - std::bind(&google::KubernetesReader::MetadataQuery, &kubernetes)); + docker_updater.start(); + kubernetes_updater.start(); - if (config.DockerUpdaterIntervalSeconds() > 0) { - docker_updater.start(); - } - if (config.KubernetesUpdaterIntervalSeconds() > 0) { - kubernetes_updater.start(); - } server.start(); } diff --git a/src/updater.cc b/src/updater.cc index f3875d7d..4e3b7df4 100644 --- a/src/updater.cc +++ b/src/updater.cc @@ -22,11 +22,16 @@ namespace google { +MetadataUpdater::MetadataUpdater(MetadataAgent* store) + : store_(store) {} + +MetadataUpdater::~MetadataUpdater() {} + PollingMetadataUpdater::PollingMetadataUpdater( - double period_s, MetadataAgent* store, + MetadataAgent* store, double period_s, std::function()> query_metadata) - : period_(period_s), - store_(store), + : MetadataUpdater(store), + period_(period_s), query_metadata_(query_metadata), timer_(), reporter_thread_() {} @@ -39,16 +44,18 @@ PollingMetadataUpdater::~PollingMetadataUpdater() { void PollingMetadataUpdater::start() { timer_.lock(); - if (store_->config().VerboseLogging()) { + if (config().VerboseLogging()) { LOG(INFO) << "Timer locked"; } - reporter_thread_ = - std::thread(&PollingMetadataUpdater::PollForMetadata, this); + if (period_ > seconds::zero()) { + reporter_thread_ = + std::thread(&PollingMetadataUpdater::PollForMetadata, this); + } } void PollingMetadataUpdater::stop() { timer_.unlock(); - if (store_->config().VerboseLogging()) { + if (config().VerboseLogging()) { LOG(INFO) << "Timer unlocked"; } } @@ -58,11 +65,11 @@ void PollingMetadataUpdater::PollForMetadata() { do { std::vector result_vector = query_metadata_(); for (ResourceMetadata& result : result_vector) { - store_->UpdateResource( - result.ids, result.resource, std::move(result.metadata)); + UpdateResourceCallback(result); + UpdateMetadataCallback(std::move(result)); } // An unlocked timer means we should stop updating. - if (store_->config().VerboseLogging()) { + if (config().VerboseLogging()) { LOG(INFO) << "Trying to unlock the timer"; } auto start = std::chrono::high_resolution_clock::now(); @@ -74,7 +81,7 @@ void PollingMetadataUpdater::PollForMetadata() { if (now < wakeup) { continue; } - if (store_->config().VerboseLogging()) { + if (config().VerboseLogging()) { LOG(INFO) << " Timer unlock timed out after " << std::chrono::duration_cast(now - start).count() << "s (good)"; @@ -84,7 +91,7 @@ void PollingMetadataUpdater::PollForMetadata() { done = false; } } while (!done); - if (store_->config().VerboseLogging()) { + if (config().VerboseLogging()) { LOG(INFO) << "Timer unlocked (stop polling)"; } } diff --git a/src/updater.h b/src/updater.h index 477d1df1..5bf2896a 100644 --- a/src/updater.h +++ b/src/updater.h @@ -29,29 +29,60 @@ namespace google { -// A class for all periodic updates of the metadata mapping. -// Specific metadata updaters will be instances of this class. -class PollingMetadataUpdater { +// An abstract class for asynchronous updates of the metadata mapping. +class MetadataUpdater { public: struct ResourceMetadata { ResourceMetadata(const std::vector& ids_, const MonitoredResource& resource_, MetadataAgent::Metadata&& metadata_) : ids(ids_), resource(resource_), metadata(std::move(metadata_)) {} + ResourceMetadata(ResourceMetadata&& other) + : ResourceMetadata(other.ids, other.resource, + std::move(other.metadata)) {} std::vector ids; MonitoredResource resource; MetadataAgent::Metadata metadata; }; + MetadataUpdater(MetadataAgent* store); + virtual ~MetadataUpdater(); + + const MetadataAgentConfiguration& config() { + return store_->config(); + } + + // Starts updating. + virtual void start() = 0; + + // Stops updating. + virtual void stop() = 0; + + protected: + // Updates the resource map in the store. + void UpdateResourceCallback(const ResourceMetadata& result) { + store_->UpdateResource(result.ids, result.resource); + } + + // Updates the metadata in the store. Consumes result. + void UpdateMetadataCallback(ResourceMetadata&& result) { + store_->UpdateMetadata(result.resource, std::move(result.metadata)); + } + + private: + // The store for the metadata. + MetadataAgent* store_; +}; + +// A class for all periodic updates of the metadata mapping. +class PollingMetadataUpdater : public MetadataUpdater { + public: PollingMetadataUpdater( - double period_s, MetadataAgent* store, + MetadataAgent* store, double period_s, std::function()> query_metadata); ~PollingMetadataUpdater(); - // Starts updating. void start(); - - // Stops updating. void stop(); private: @@ -62,9 +93,6 @@ class PollingMetadataUpdater { // The polling period in seconds. seconds period_; - // The store for the metadata. - MetadataAgent* store_; - // The function to actually query for metadata. std::function()> query_metadata_;