diff --git a/src/api_server.cc b/src/api_server.cc index fa14edc3..8d8f9776 100644 --- a/src/api_server.cc +++ b/src/api_server.cc @@ -66,7 +66,7 @@ class MetadataApiServer { class MetadataReporter { public: - MetadataReporter(const MetadataAgent& agent, double period_s); + MetadataReporter(MetadataAgent* agent, double period_s); ~MetadataReporter(); private: @@ -79,7 +79,7 @@ class MetadataReporter { std::map&& metadata) throw (boost::system::system_error); - const MetadataAgent& agent_; + MetadataAgent* agent_; Environment environment_; OAuth2 auth_; // The reporting period in seconds. @@ -151,9 +151,9 @@ MetadataApiServer::~MetadataApiServer() { } } -MetadataReporter::MetadataReporter(const MetadataAgent& agent, double period_s) +MetadataReporter::MetadataReporter(MetadataAgent* agent, double period_s) : agent_(agent), - environment_(agent.config_), + environment_(agent->config_), auth_(environment_), period_(period_s), reporter_thread_(&MetadataReporter::ReportMetadata, this) {} @@ -169,14 +169,17 @@ void MetadataReporter::ReportMetadata() { std::this_thread::sleep_for(std::chrono::seconds(3)); // TODO: Do we need to be able to stop this? while (true) { - if (agent_.config_.VerboseLogging()) { + if (agent_->config_.VerboseLogging()) { LOG(INFO) << "Sending metadata request to server"; } try { - SendMetadata(agent_.GetMetadataMap()); - if (agent_.config_.VerboseLogging()) { + SendMetadata(agent_->GetMetadataMap()); + if (agent_->config_.VerboseLogging()) { LOG(INFO) << "Metadata request sent successfully"; } + if (agent_->config_.MetadataReporterPurgeDeleted()) { + agent_->PurgeDeletedEntries(); + } } catch (const boost::system::system_error& e) { LOG(ERROR) << "Metadata request unsuccessful: " << e.what(); } @@ -229,20 +232,20 @@ void MetadataReporter::SendMetadata( std::map&& metadata) throw (boost::system::system_error) { if (metadata.empty()) { - if (agent_.config_.VerboseLogging()) { + if (agent_->config_.VerboseLogging()) { LOG(INFO) << "No data to send"; } return; } - if (agent_.config_.VerboseLogging()) { + if (agent_->config_.VerboseLogging()) { LOG(INFO) << "Sending request to the server"; } const std::string project_id = environment_.NumericProjectId(); // The endpoint template is expected to be of the form // "https://stackdriver.googleapis.com/.../projects/{{project_id}}/...". const std::string endpoint = - format::Substitute(agent_.config_.MetadataIngestionEndpointFormat(), + format::Substitute(agent_->config_.MetadataIngestionEndpointFormat(), {{"project_id", project_id}}); const std::string auth_header = auth_.GetAuthHeaderValue(); @@ -252,7 +255,7 @@ void MetadataReporter::SendMetadata( const int empty_size = empty_request->ToString().size(); const int limit_bytes = - agent_.config_.MetadataIngestionRequestSizeLimitBytes(); + agent_->config_.MetadataIngestionRequestSizeLimitBytes(); int total_size = empty_size; std::vector entries; @@ -281,7 +284,7 @@ void MetadataReporter::SendMetadata( } if (total_size + size > limit_bytes) { SendMetadataRequest(std::move(entries), endpoint, auth_header, - agent_.config_.VerboseLogging()); + agent_->config_.VerboseLogging()); entries.clear(); total_size = empty_size; } @@ -290,7 +293,7 @@ void MetadataReporter::SendMetadata( } if (!entries.empty()) { SendMetadataRequest(std::move(entries), endpoint, auth_header, - agent_.config_.VerboseLogging()); + agent_->config_.VerboseLogging()); } } @@ -345,12 +348,38 @@ std::map return result; } +void MetadataAgent::PurgeDeletedEntries() { + std::lock_guard lock(metadata_mu_); + + for (auto it = metadata_map_.begin(); it != metadata_map_.end(); ) { + const MonitoredResource& resource = it->first; + const Metadata& entry = it->second; + if (entry.is_deleted) { + if (config_.VerboseLogging()) { + LOG(INFO) << "Purging metadata entry " << resource << "->{" + << "version: " << entry.version << ", " + << "is_deleted: " << entry.is_deleted << ", " + << "created_at: " << rfc3339::ToString(entry.created_at) + << ", " + << "collected_at: " << rfc3339::ToString(entry.collected_at) + << ", " + << "metadata: " << *entry.metadata << ", " + << "ignore: " << entry.ignore + << "}"; + } + it = metadata_map_.erase(it); + } else { + ++it; + } + } +} + void MetadataAgent::start() { metadata_api_server_.reset(new MetadataApiServer( *this, config_.MetadataApiNumThreads(), "0.0.0.0", config_.MetadataApiPort())); reporter_.reset(new MetadataReporter( - *this, config_.MetadataReporterIntervalSeconds())); + this, config_.MetadataReporterIntervalSeconds())); } } diff --git a/src/api_server.h b/src/api_server.h index cb23dc3a..aa704ee7 100644 --- a/src/api_server.h +++ b/src/api_server.h @@ -104,6 +104,7 @@ class MetadataAgent { friend class MetadataReporter; std::map GetMetadataMap() const; + void PurgeDeletedEntries(); const MetadataAgentConfiguration& config_; diff --git a/src/configuration.cc b/src/configuration.cc index ad0e5e8c..6e30db62 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -33,6 +33,7 @@ constexpr const int kMetadataApiDefaultNumThreads = 3; constexpr const int kMetadataApiDefaultPort = 8000; constexpr const char kMetadataApiDefaultResourceTypeSeparator[] = "."; constexpr const int kMetadataReporterDefaultIntervalSeconds = 60; +constexpr const int kMetadataReporterDefaultPurgeDeleted = false; constexpr const char kMetadataIngestionDefaultEndpointFormat[] = "https://stackdriver.googleapis.com/v1beta2/projects/{{project_id}}" "/resourceMetadata:batchUpdate"; @@ -70,6 +71,8 @@ MetadataAgentConfiguration::MetadataAgentConfiguration() kMetadataApiDefaultResourceTypeSeparator), metadata_reporter_interval_seconds_( kMetadataReporterDefaultIntervalSeconds), + metadata_reporter_purge_deleted_( + kMetadataReporterDefaultPurgeDeleted), metadata_ingestion_endpoint_format_( kMetadataIngestionDefaultEndpointFormat), metadata_ingestion_request_size_limit_bytes_( @@ -147,6 +150,9 @@ void MetadataAgentConfiguration::ParseConfigFile(const std::string& filename) { metadata_reporter_interval_seconds_ = config["MetadataReporterIntervalSeconds"].as( kMetadataReporterDefaultIntervalSeconds); + metadata_reporter_purge_deleted_ = + config["MetadataReporterPurgeDeleted"].as( + kMetadataReporterDefaultPurgeDeleted); metadata_ingestion_endpoint_format_ = config["MetadataIngestionEndpointFormat"].as( kMetadataIngestionDefaultEndpointFormat); diff --git a/src/configuration.h b/src/configuration.h index b5a74081..4cd91f52 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -57,6 +57,10 @@ class MetadataAgentConfiguration { std::lock_guard lock(mutex_); return metadata_reporter_interval_seconds_; } + bool MetadataReporterPurgeDeleted() const { + std::lock_guard lock(mutex_); + return metadata_reporter_purge_deleted_; + } const std::string& MetadataIngestionEndpointFormat() const { std::lock_guard lock(mutex_); return metadata_ingestion_endpoint_format_; @@ -145,6 +149,7 @@ class MetadataAgentConfiguration { int metadata_api_port_; std::string metadata_api_resource_type_separator_; int metadata_reporter_interval_seconds_; + bool metadata_reporter_purge_deleted_; std::string metadata_ingestion_endpoint_format_; int metadata_ingestion_request_size_limit_bytes_; std::string metadata_ingestion_raw_content_version_; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 788f2b22..6205d8bc 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -85,7 +85,8 @@ KubernetesReader::KubernetesReader(const MetadataAgentConfiguration& config) : config_(config), environment_(config) {} MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( - json::value raw_node, Timestamp collected_at) const throw(json::Exception) { + json::value raw_node, Timestamp collected_at, bool is_deleted) const + throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); @@ -131,7 +132,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata( k8s_node, #ifdef ENABLE_KUBERNETES_METADATA MetadataAgent::Metadata(config_.MetadataIngestionRawContentVersion(), - /*deleted=*/false, created_at, collected_at, + is_deleted, created_at, collected_at, std::move(node_raw_metadata)) #else MetadataAgent::Metadata::IGNORED() @@ -183,8 +184,8 @@ json::value KubernetesReader::ComputePodAssociations(const json::Object* pod) } MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( - json::value raw_pod, json::value associations, Timestamp collected_at) const - throw(json::Exception) { + json::value raw_pod, json::value associations, Timestamp collected_at, + bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); @@ -209,10 +210,6 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( {"location", location}, }); - // TODO: find is_deleted. - //const json::Object* status = pod->Get("status"); - bool is_deleted = false; - json::value pod_raw_metadata = json::object({ {"blobs", json::object({ {"association", std::move(associations)}, @@ -248,7 +245,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata( MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( const json::Object* pod, const json::Object* container_spec, const json::Object* container_status, json::value associations, - Timestamp collected_at) const throw(json::Exception) { + Timestamp collected_at, bool is_deleted) const throw(json::Exception) { const std::string cluster_name = environment_.KubernetesClusterName(); const std::string location = environment_.KubernetesClusterLocation(); @@ -267,9 +264,6 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( } const std::string container_name = container_spec->Get("name"); - // TODO: find is_deleted. - //const json::Object* state = container_status->Get("state"); - bool is_deleted = false; const MonitoredResource k8s_container("k8s_container", { {"cluster_name", cluster_name}, @@ -323,7 +317,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetContainerMetadata( k8s_container_name, }; - if (container_status) { + if (container_status && container_status->Has("containerID")) { std::size_t docker_prefix_length = sizeof(kDockerIdPrefix) - 1; const std::string docker_id = @@ -391,7 +385,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetLegacyResource( std::vector KubernetesReader::GetPodAndContainerMetadata( - const json::Object* pod, Timestamp collected_at) const + const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { std::vector result; @@ -439,11 +433,12 @@ KubernetesReader::GetPodAndContainerMetadata( result.emplace_back(GetLegacyResource(pod, name)); result.emplace_back( GetContainerMetadata(pod, container_spec, container_status, - associations->Clone(), collected_at)); + associations->Clone(), collected_at, is_deleted)); } result.emplace_back( - GetPodMetadata(pod->Clone(), std::move(associations), collected_at)); + GetPodMetadata(pod->Clone(), std::move(associations), collected_at, + is_deleted)); return std::move(result); } @@ -465,7 +460,8 @@ std::vector std::string(kKubernetesEndpointPath) + "/nodes/" + node_name); Timestamp collected_at = std::chrono::system_clock::now(); - result.emplace_back(GetNodeMetadata(std::move(raw_node), collected_at)); + result.emplace_back(GetNodeMetadata(std::move(raw_node), collected_at, + /*is_deleted=*/false)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const QueryException& e) { @@ -520,8 +516,11 @@ std::vector << pod_id << "(" << pod_name << ")"; } + // TODO: find is_deleted. + //const json::Object* status = pod->Get("status"); + bool is_deleted = false; std::vector full_pod_metadata = - GetPodAndContainerMetadata(pod, collected_at); + GetPodAndContainerMetadata(pod, collected_at, is_deleted); for (MetadataUpdater::ResourceMetadata& metadata : full_pod_metadata) { result.emplace_back(std::move(metadata)); } @@ -569,9 +568,9 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { - Watcher(std::function callback, + Watcher(std::function event_callback, std::unique_lock&& completion, bool verbose) - : completion_(std::move(completion)), callback_(callback), + : completion_(std::move(completion)), event_callback_(event_callback), remaining_chunk_bytes_(0), verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. void operator()(const boost::iterator_range& range, @@ -702,7 +701,7 @@ struct Watcher { #ifdef VERBOSE LOG(DEBUG) << "Invoking callback('" << event_str << "')"; #endif - callback_(std::move(event)); + event_callback_(std::move(event)); #ifdef VERBOSE LOG(DEBUG) << "callback('" << event_str << "') completed"; #endif @@ -717,15 +716,33 @@ struct Watcher { } std::unique_lock completion_; - std::function callback_; + std::function event_callback_; std::string body_; size_t remaining_chunk_bytes_; bool verbose_; }; + +void EventCallback( + std::function callback, + json::value raw_watch) throw(json::Exception) { + Timestamp collected_at = std::chrono::system_clock::now(); + + //LOG(ERROR) << "Watch callback: " << *raw_watch; + const json::Object* watch = raw_watch->As(); + const std::string type = watch->Get("type"); + const json::Object* object = watch->Get("object"); + LOG(ERROR) << "Watch type: " << type << " object: " << *object; + if (type != "MODIFIED" && type != "ADDED" && type != "DELETED") { + return; + } + const bool is_deleted = (type == "DELETED"); + callback(object, collected_at, is_deleted); +} } void KubernetesReader::WatchMaster( - const std::string& path, std::function callback) const + const std::string& path, + std::function callback) const throw(QueryException, json::Exception) { const std::string prefix((path.find('?') == std::string::npos) ? "?" : "&"); const std::string watch_param(prefix + kWatchParam); @@ -745,8 +762,8 @@ void KubernetesReader::WatchMaster( // A notification for watch completion. std::mutex completion_mutex; std::unique_lock watch_completion(completion_mutex); - Watcher watcher(callback, std::move(watch_completion), - config_.VerboseLogging()); + Watcher watcher(std::bind(&EventCallback, callback, std::placeholders::_1), + std::move(watch_completion), config_.VerboseLogging()); http::client::response response = client.get(request, boost::ref(watcher)); if (config_.VerboseLogging()) { LOG(INFO) << "Waiting for completion"; @@ -944,21 +961,13 @@ json::value KubernetesReader::FindTopLevelOwner( return FindTopLevelOwner(ns, GetOwner(ns, ref->As())); } -void KubernetesReader::PodCallback(MetadataUpdater::UpdateCallback callback, - json::value raw_watch) const +void KubernetesReader::PodCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { - Timestamp collected_at = std::chrono::system_clock::now(); - - //LOG(ERROR) << "Watch callback: " << *raw_watch; - const json::Object* watch = raw_watch->As(); - const std::string type = watch->Get("type"); - const json::Object* pod = watch->Get("object"); - LOG(ERROR) << "Watch type: " << type << " object: " << *pod; - if (type == "MODIFIED" || type == "ADDED") { - std::vector result_vector = - GetPodAndContainerMetadata(pod, collected_at); - callback(std::move(result_vector)); - } + std::vector result_vector = + GetPodAndContainerMetadata(pod, collected_at, is_deleted); + callback(std::move(result_vector)); } void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) @@ -980,7 +989,8 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) WatchMaster(std::string(kKubernetesEndpointPath) + "/pods" + node_selector + pod_label_selector, std::bind(&KubernetesReader::PodCallback, - this, callback, std::placeholders::_1)); + this, callback, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const KubernetesReader::QueryException& e) { @@ -989,21 +999,14 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) LOG(INFO) << "Watch thread (pods) exiting"; } -void KubernetesReader::NodeCallback(MetadataUpdater::UpdateCallback callback, - json::value raw_watch) const +void KubernetesReader::NodeCallback( + MetadataUpdater::UpdateCallback callback, + const json::Object* node, Timestamp collected_at, bool is_deleted) const throw(json::Exception) { - Timestamp collected_at = std::chrono::system_clock::now(); - - //LOG(ERROR) << "Watch callback: " << *raw_watch; - const json::Object* watch = raw_watch->As(); - const std::string type = watch->Get("type"); - const json::Object* node = watch->Get("object"); - LOG(ERROR) << "Watch type: " << type << " object: " << *node; - if (type == "MODIFIED" || type == "ADDED") { - std::vector result_vector; - result_vector.emplace_back(GetNodeMetadata(node->Clone(), collected_at)); - callback(std::move(result_vector)); - } + std::vector result_vector; + result_vector.emplace_back( + GetNodeMetadata(node->Clone(), collected_at, is_deleted)); + callback(std::move(result_vector)); } void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) @@ -1021,7 +1024,8 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) WatchMaster(std::string(kKubernetesEndpointPath) + "/watch/nodes/" + node_name, std::bind(&KubernetesReader::NodeCallback, - this, callback, std::placeholders::_1)); + this, callback, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const KubernetesReader::QueryException& e) { diff --git a/src/kubernetes.h b/src/kubernetes.h index 601c6e7d..3ed88cdc 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -63,35 +63,36 @@ class KubernetesReader { // watches for parsed JSON responses. The path has to start with "/". // Invokes callback for every notification. void WatchMaster( - const std::string& path, std::function callback) const + const std::string& path, + std::function callback) const throw(QueryException, json::Exception); - // Node watcher callback. + // Node watch callback. void NodeCallback( - MetadataUpdater::UpdateCallback callback, json::value result) const - throw(json::Exception); + MetadataUpdater::UpdateCallback callback, const json::Object* node, + Timestamp collected_at, bool is_deleted) const throw(json::Exception); - // Pod watcher callback. + // Pod watch callback. void PodCallback( - MetadataUpdater::UpdateCallback callback, json::value result) const - throw(json::Exception); + MetadataUpdater::UpdateCallback callback, const json::Object* pod, + Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const throw(json::Exception); // Given a node object, return the associated metadata. MetadataUpdater::ResourceMetadata GetNodeMetadata( - json::value raw_node, Timestamp collected_at) const + json::value raw_node, Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Given a pod object, return the associated metadata. MetadataUpdater::ResourceMetadata GetPodMetadata( - json::value raw_pod, json::value associations, Timestamp collected_at) - const throw(json::Exception); + json::value raw_pod, json::value associations, Timestamp collected_at, + bool is_deleted) const throw(json::Exception); // Given a pod object and container info, return the container metadata. MetadataUpdater::ResourceMetadata GetContainerMetadata( const json::Object* pod, const json::Object* container_spec, const json::Object* container_status, json::value associations, - Timestamp collected_at) const throw(json::Exception); + Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Given a pod object and container name, return the legacy resource. // The returned "metadata" field will be Metadata::IGNORED. MetadataUpdater::ResourceMetadata GetLegacyResource( @@ -99,7 +100,7 @@ class KubernetesReader { throw(json::Exception); // Given a pod object, return the associated pod and container metadata. std::vector GetPodAndContainerMetadata( - const json::Object* pod, Timestamp collected_at) const + const json::Object* pod, Timestamp collected_at, bool is_deleted) const throw(json::Exception); // Gets the name of the node the agent is running on.