diff --git a/src/Makefile b/src/Makefile
index 44cdcc88..edf6298c 100644
--- a/src/Makefile
+++ b/src/Makefile
@@ -63,8 +63,8 @@ install: metadatad
 export DISTRO

 PKG_NAME=stackdriver-metadata
-PKG_VERSION=0.0.13
-PKG_RELEASE=5
+PKG_VERSION=0.0.14
+PKG_RELEASE=1
 PKG_MAINTAINER=Stackdriver Engineering

 DOCKER_VERSION=0.2
diff --git a/src/configuration.cc b/src/configuration.cc
index 364ff951..37d29448 100644
--- a/src/configuration.cc
+++ b/src/configuration.cc
@@ -46,6 +46,8 @@ constexpr const int kKubernetesUpdaterDefaultIntervalSeconds = 60;
 constexpr const char kKubernetesDefaultEndpointHost[] = "https://kubernetes";
 constexpr const char kKubernetesDefaultPodLabelSelector[] = "";
 constexpr const char kKubernetesDefaultClusterName[] = "";
+constexpr const char kKubernetesDefaultNodeName[] = "";
+constexpr const bool kKubernetesDefaultUseWatch = true;
 constexpr const char kDefaultInstanceId[] = "";
 constexpr const char kDefaultInstanceZone[] = "";

@@ -72,6 +74,8 @@ MetadataAgentConfiguration::MetadataAgentConfiguration()
       kubernetes_endpoint_host_(kKubernetesDefaultEndpointHost),
       kubernetes_pod_label_selector_(kKubernetesDefaultPodLabelSelector),
       kubernetes_cluster_name_(kKubernetesDefaultClusterName),
+      kubernetes_node_name_(kKubernetesDefaultNodeName),
+      kubernetes_use_watch_(kKubernetesDefaultUseWatch),
       instance_id_(kDefaultInstanceId),
       instance_zone_(kDefaultInstanceZone) {}

@@ -154,6 +158,10 @@ void MetadataAgentConfiguration::ParseConfigFile(const std::string& filename) {
   kubernetes_cluster_name_ =
       config["KubernetesClusterName"].as<std::string>(
           kKubernetesDefaultClusterName);
+  kubernetes_node_name_ =
+      config["KubernetesNodeName"].as<std::string>(kKubernetesDefaultNodeName);
+  kubernetes_use_watch_ =
+      config["KubernetesUseWatch"].as<bool>(kKubernetesDefaultUseWatch);
   instance_id_ = config["InstanceId"].as<std::string>(kDefaultInstanceId);
   instance_zone_ =
diff --git a/src/configuration.h b/src/configuration.h
index 64e18e6f..c38da470 100644
--- a/src/configuration.h
+++ b/src/configuration.h
@@ -95,6 +95,14 @@ class MetadataAgentConfiguration {
     std::lock_guard<std::mutex> lock(mutex_);
     return kubernetes_cluster_name_;
   }
+  const std::string& KubernetesNodeName() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return kubernetes_node_name_;
+  }
+  bool KubernetesUseWatch() const {
+    std::lock_guard<std::mutex> lock(mutex_);
+    return kubernetes_use_watch_;
+  }

   // Common metadata updater options.
   const std::string& InstanceId() const {
     std::lock_guard<std::mutex> lock(mutex_);
@@ -125,6 +133,8 @@ class MetadataAgentConfiguration {
   std::string kubernetes_endpoint_host_;
   std::string kubernetes_pod_label_selector_;
   std::string kubernetes_cluster_name_;
+  std::string kubernetes_node_name_;
+  bool kubernetes_use_watch_;
   std::string instance_id_;
   std::string instance_zone_;
 };
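
Note (illustration, not part of the patch): both new configuration keys are read with a default, so they are optional. A minimal sketch of a config file that sets them; the key names come from ParseConfigFile above, while the values are purely illustrative:

    # Illustrative values only.
    KubernetesClusterName: test-cluster
    KubernetesNodeName: test-node-1   # new: skips the pod-based lookup in CurrentNode()
    KubernetesUseWatch: true          # new: enables the watch threads (default: true)
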
diff --git a/src/kubernetes.cc b/src/kubernetes.cc
index 7c7c527b..04125295 100644
--- a/src/kubernetes.cc
+++ b/src/kubernetes.cc
@@ -20,11 +20,13 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
 #include

+#include "http_common.h"
 #include "json.h"
 #include "logging.h"
 #include "resource.h"
@@ -56,6 +58,8 @@ constexpr const char kK8sNodeNameResourcePrefix[] = "k8s_nodeName";

 constexpr const char kNodeSelectorPrefix[] = "?fieldSelector=spec.nodeName%3D";

+constexpr const char kWatchParam[] = "watch=true";
+
 constexpr const char kDockerIdPrefix[] = "docker://";

 constexpr const char kServiceAccountDirectory[] =
@@ -551,6 +555,211 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const
   }
 }

+namespace {
+struct Watcher {
+  Watcher(std::function<void(json::value)> callback,
+          std::unique_lock<std::mutex>&& completion, bool verbose)
+      : completion_(std::move(completion)), callback_(callback),
+        remaining_chunk_bytes_(0), verbose_(verbose) {}
+  ~Watcher() {}  // Unlocks the completion_ lock.
+  void operator()(const boost::iterator_range<const char*>& range,
+                  const boost::system::error_code& error) {
+    if (!error) {
+//#ifdef VERBOSE
+//      LOG(DEBUG) << "Watch notification: '"
+//                 << std::string(std::begin(range), std::end(range))
+//                 << "'";
+//#endif
+      boost::iterator_range<const char*> pos = range;
+      if (remaining_chunk_bytes_ != 0) {
+        pos = ReadNextChunk(pos);
+//#ifdef VERBOSE
+//        LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; "
+//                   << remaining_chunk_bytes_ << " bytes remaining";
+//#endif
+      }
+
+      if (remaining_chunk_bytes_ == 0) {
+        // Invoke the callback.
+        CompleteChunk();
+
+        // Process the next batch.
+        while (remaining_chunk_bytes_ == 0 &&
+               std::begin(pos) != std::end(pos)) {
+          pos = StartNewChunk(pos);
+        }
+//#ifdef VERBOSE
+//        LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_
+//                   << " bytes remaining";
+//#endif
+
+        if (remaining_chunk_bytes_ != 0) {
+          pos = ReadNextChunk(pos);
+//#ifdef VERBOSE
+//          LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; "
+//                     << remaining_chunk_bytes_ << " bytes remaining";
+//#endif
+        }
+      }
+    } else {
+      if (error == boost::asio::error::eof) {
+#ifdef VERBOSE
+        LOG(DEBUG) << "Watch callback: EOF";
+#endif
+      } else {
+        LOG(ERROR) << "Callback got error " << error;
+      }
+      if (verbose_) {
+        LOG(INFO) << "Unlocking completion mutex";
+      }
+      completion_.unlock();
+    }
+  }
+
+ private:
+  boost::iterator_range<const char*>
+  StartNewChunk(const boost::iterator_range<const char*>& range) {
+    if (remaining_chunk_bytes_ != 0) {
+      LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_
+                 << " bytes remaining";
+    }
+
+    body_.clear();
+
+    const std::string crlf("\r\n");
+    auto begin = std::begin(range);
+    auto end = std::end(range);
+    auto iter = std::search(begin, end, crlf.begin(), crlf.end());
+    if (iter == begin) {
+      // Blank lines are fine, just skip them.
+      iter = std::next(iter, crlf.size());
+#ifdef VERBOSE
+      LOG(DEBUG) << "Skipping blank line within chunked encoding;"
+                 << " remaining data '" << std::string(iter, end) << "'";
+#endif
+      return boost::iterator_range<const char*>(iter, end);
+    } else if (iter == end) {
+      LOG(ERROR) << "Invalid chunked encoding: '"
+                 << std::string(begin, end)
+                 << "'";
+      return boost::iterator_range<const char*>(begin, end);
+    }
+    std::string line(begin, iter);
+    iter = std::next(iter, crlf.size());
+//#ifdef VERBOSE
+//    LOG(DEBUG) << "Line: '" << line << "'";
+//#endif
+    std::stringstream stream(line);
+    stream >> std::hex >> remaining_chunk_bytes_;
+    return boost::iterator_range<const char*>(iter, end);
+  }
+
+  boost::iterator_range<const char*>
+  ReadNextChunk(const boost::iterator_range<const char*>& range) {
+    if (remaining_chunk_bytes_ == 0) {
+      LOG(ERROR) << "Asked to read next chunk with no bytes remaining";
+      return range;
+    }
+
+    const std::string crlf("\r\n");
+    auto begin = std::begin(range);
+    auto end = std::end(range);
+    // The available bytes in the current notification, which may include the
+    // remainder of this chunk and the start of the next one.
+    const size_t available = std::distance(begin, end);
+    const size_t len = std::min(available, remaining_chunk_bytes_);
+    body_.insert(body_.end(), begin, begin + len);
+    remaining_chunk_bytes_ -= len;
+    begin = std::next(begin, len);
+    return boost::iterator_range<const char*>(begin, end);
+  }
+
+  void CompleteChunk() {
+    if (body_.empty()) {
+#ifdef VERBOSE
+      LOG(DEBUG) << "Skipping empty watch notification";
+#endif
+    } else {
+      try {
+//#ifdef VERBOSE
+//        LOG(DEBUG) << "Invoking callbacks on '" << body_ << "'";
+//#endif
+        std::vector<json::value> events = json::Parser::AllFromString(body_);
+        for (json::value& event : events) {
+          std::string event_str = event->ToString();
+#ifdef VERBOSE
+          LOG(DEBUG) << "Invoking callback('" << event_str << "')";
+#endif
+          callback_(std::move(event));
+#ifdef VERBOSE
+          LOG(DEBUG) << "callback('" << event_str << "') completed";
+#endif
+        }
+//#ifdef VERBOSE
+//        LOG(DEBUG) << "All callbacks on '" << body_ << "' completed";
+//#endif
+      } catch (const json::Exception& e) {
+        LOG(ERROR) << e.what();
+      }
+    }
+  }
+
+  std::unique_lock<std::mutex> completion_;
+  std::function<void(json::value)> callback_;
+  std::string body_;
+  size_t remaining_chunk_bytes_;
+  bool verbose_;
+};
+}
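
Note (illustration, not part of the patch): the Watcher above reassembles HTTP/1.1 chunked transfer encoding by hand. Each chunk is a hexadecimal size line terminated by CRLF, followed by that many payload bytes; a single socket notification can end in the middle of a chunk, which is why remaining_chunk_bytes_ and body_ persist across calls to operator(). A self-contained sketch of the same framing, with an illustrative payload:

    #include <iostream>
    #include <sstream>
    #include <string>

    int main() {
      // One chunk ("19" hex = 25 bytes) followed by the zero-length terminator.
      const std::string stream =
          "19\r\n{\"type\":\"ADDED\",\"obj\":{}}\r\n0\r\n\r\n";
      std::istringstream in(stream);

      std::string size_line;
      std::getline(in, size_line);  // reads "19\r"; '\r' stops the hex parse below
      size_t chunk_size = 0;
      std::stringstream(size_line) >> std::hex >> chunk_size;

      std::string body(chunk_size, '\0');
      in.read(&body[0], chunk_size);  // the chunk payload
      std::cout << chunk_size << " bytes: " << body << "\n";  // 25 bytes: {...}
    }
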
+
+void KubernetesReader::WatchMaster(
+    const std::string& path, std::function<void(json::value)> callback) const
+    throw(QueryException, json::Exception) {
+  const std::string prefix((path.find('?') == std::string::npos) ? "?" : "&");
+  const std::string watch_param(prefix + kWatchParam);
+  const std::string endpoint(
+      config_.KubernetesEndpointHost() + path + watch_param);
+  http::client client;
+  http::client::request request(endpoint);
+  request << boost::network::header(
+      "Authorization", "Bearer " + KubernetesApiToken());
+  if (config_.VerboseLogging()) {
+    LOG(INFO) << "WatchMaster: Contacting " << endpoint;
+  }
+  try {
+    if (config_.VerboseLogging()) {
+      LOG(INFO) << "Locking completion mutex";
+    }
+    // A notification for watch completion.
+    std::mutex completion_mutex;
+    std::unique_lock<std::mutex> watch_completion(completion_mutex);
+    Watcher watcher(callback, std::move(watch_completion),
+                    config_.VerboseLogging());
+    http::client::response response = client.get(request, boost::ref(watcher));
+    if (config_.VerboseLogging()) {
+      LOG(INFO) << "Waiting for completion";
+    }
+    std::lock_guard<std::mutex> await_completion(completion_mutex);
+    if (config_.VerboseLogging()) {
+      LOG(INFO) << "WatchMaster completed " << body(response);
+    }
+    std::string encoding;
+#ifdef VERBOSE
+    LOG(DEBUG) << "response headers: " << response.headers();
+#endif
+    auto transfer_encoding_header = headers(response)["Transfer-Encoding"];
+    if (!boost::empty(transfer_encoding_header)) {
+      encoding = boost::begin(transfer_encoding_header)->second;
+    }
+    if (encoding != "chunked") {
+      LOG(ERROR) << "Expected chunked encoding; found '" << encoding << "'";
+    }
+  } catch (const boost::system::system_error& e) {
+    LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what();
+    throw QueryException(endpoint + " -> " + e.what());
+  }
+}
+
 const std::string& KubernetesReader::KubernetesApiToken() const {
   std::lock_guard<std::mutex> lock(mutex_);
   if (kubernetes_api_token_.empty()) {
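
Note (illustration, not part of the patch): WatchMaster only appends the watch=true query parameter, choosing "?" or "&" depending on whether the path already carries a query string, and then hands the streaming Watcher to the cpp-netlib client via boost::ref. A small sketch of just the URL logic, using the default endpoint host from configuration.cc; the /api/v1 paths and node name are illustrative assumptions:

    #include <iostream>
    #include <string>

    // Mirrors the prefix logic in WatchMaster above.
    std::string WatchUrl(const std::string& host, const std::string& path) {
      const std::string prefix =
          (path.find('?') == std::string::npos) ? "?" : "&";
      return host + path + prefix + "watch=true";
    }

    int main() {
      std::cout << WatchUrl("https://kubernetes", "/api/v1/watch/nodes/node-1")
                << "\n";
      // https://kubernetes/api/v1/watch/nodes/node-1?watch=true
      std::cout << WatchUrl("https://kubernetes",
                            "/api/v1/pods?fieldSelector=spec.nodeName%3Dnode-1")
                << "\n";
      // .../pods?fieldSelector=spec.nodeName%3Dnode-1&watch=true
    }
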
@@ -576,21 +785,25 @@ const std::string& KubernetesReader::KubernetesNamespace() const {
 const std::string& KubernetesReader::CurrentNode() const {
   std::lock_guard<std::mutex> lock(mutex_);
   if (current_node_.empty()) {
-    const std::string& ns = KubernetesNamespace();
-    // TODO: This is unreliable, see
-    // https://github.com/kubernetes/kubernetes/issues/52162.
-    const std::string pod_name = boost::asio::ip::host_name();
-    try {
-      json::value pod_response = QueryMaster(
-          std::string(kKubernetesEndpointPath) +
-          "/namespaces/" + ns + "/pods/" + pod_name);
-      const json::Object* pod = pod_response->As<json::Object>();
-      const json::Object* spec = pod->Get<json::Object>("spec");
-      current_node_ = spec->Get<json::String>("nodeName");
-    } catch (const json::Exception& e) {
-      LOG(ERROR) << e.what();
-    } catch (const QueryException& e) {
-      // Already logged.
+    if (!config_.KubernetesNodeName().empty()) {
+      current_node_ = config_.KubernetesNodeName();
+    } else {
+      const std::string& ns = KubernetesNamespace();
+      // TODO: This is unreliable, see
+      // https://github.com/kubernetes/kubernetes/issues/52162.
+      const std::string pod_name = boost::asio::ip::host_name();
+      try {
+        json::value pod_response = QueryMaster(
+            std::string(kKubernetesEndpointPath) +
+            "/namespaces/" + ns + "/pods/" + pod_name);
+        const json::Object* pod = pod_response->As<json::Object>();
+        const json::Object* spec = pod->Get<json::Object>("spec");
+        current_node_ = spec->Get<json::String>("nodeName");
+      } catch (const json::Exception& e) {
+        LOG(ERROR) << e.what();
+      } catch (const QueryException& e) {
+        // Already logged.
+      }
     }
   }
   return current_node_;
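
Note (illustration, not part of the patch): the new KubernetesNodeName option lets a deployment bypass the hostname-based pod lookup that the TODO above flags as unreliable (kubernetes/kubernetes#52162). One common way to obtain the value is the Kubernetes Downward API; an illustrative DaemonSet fragment follows, where the environment variable name and the wiring into the agent's config file are assumptions:

    env:
    - name: NODE_NAME
      valueFrom:
        fieldRef:
          fieldPath: spec.nodeName
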
@@ -719,4 +932,111 @@ json::value KubernetesReader::FindTopLevelOwner(
   return FindTopLevelOwner(ns, GetOwner(ns, ref->As<json::Object>()));
 }

+void KubernetesReader::PodCallback(MetadataUpdater::UpdateCallback callback,
+                                   json::value raw_watch) const
+    throw(json::Exception) {
+  Timestamp collected_at = std::chrono::system_clock::now();
+
+  //LOG(ERROR) << "Watch callback: " << *raw_watch;
+  const json::Object* watch = raw_watch->As<json::Object>();
+  const std::string type = watch->Get<json::String>("type");
+  const json::Object* pod = watch->Get<json::Object>("object");
+  LOG(ERROR) << "Watch type: " << type << " object: " << *pod;
+  if (type == "MODIFIED" || type == "ADDED") {
+    std::vector<MetadataUpdater::ResourceMetadata> result_vector =
+        GetPodAndContainerMetadata(pod, collected_at);
+    callback(std::move(result_vector));
+  }
+}
+
+void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback)
+    const {
+  LOG(INFO) << "Watch thread (pods) started";
+
+  const std::string node_name = CurrentNode();
+
+  if (config_.VerboseLogging()) {
+    LOG(INFO) << "Current node is " << node_name;
+  }
+
+  const std::string node_selector(kNodeSelectorPrefix + node_name);
+  const std::string pod_label_selector(
+      config_.KubernetesPodLabelSelector().empty()
+      ? "" : "&" + config_.KubernetesPodLabelSelector());
+
+  try {
+    WatchMaster(std::string(kKubernetesEndpointPath) + "/pods" +
+                node_selector + pod_label_selector,
+                std::bind(&KubernetesReader::PodCallback,
+                          this, callback, std::placeholders::_1));
+  } catch (const json::Exception& e) {
+    LOG(ERROR) << e.what();
+  } catch (const KubernetesReader::QueryException& e) {
+    // Already logged.
+  }
+  LOG(INFO) << "Watch thread (pods) exiting";
+}
+
+void KubernetesReader::NodeCallback(MetadataUpdater::UpdateCallback callback,
+                                    json::value raw_watch) const
+    throw(json::Exception) {
+  Timestamp collected_at = std::chrono::system_clock::now();
+
+  //LOG(ERROR) << "Watch callback: " << *raw_watch;
+  const json::Object* watch = raw_watch->As<json::Object>();
+  const std::string type = watch->Get<json::String>("type");
+  const json::Object* node = watch->Get<json::Object>("object");
+  LOG(ERROR) << "Watch type: " << type << " object: " << *node;
+  if (type == "MODIFIED" || type == "ADDED") {
+    std::vector<MetadataUpdater::ResourceMetadata> result_vector;
+    result_vector.emplace_back(GetNodeMetadata(node->Clone(), collected_at));
+    callback(std::move(result_vector));
+  }
+}
+
+void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback)
+    const {
+  LOG(INFO) << "Watch thread (node) started";
+
+  const std::string node_name = CurrentNode();
+
+  if (config_.VerboseLogging()) {
+    LOG(INFO) << "Current node is " << node_name;
+  }
+
+  try {
+    // TODO: There seems to be a Kubernetes API bug with watch=true.
+    WatchMaster(std::string(kKubernetesEndpointPath) + "/watch/nodes/" +
+                node_name,
+                std::bind(&KubernetesReader::NodeCallback,
+                          this, callback, std::placeholders::_1));
+  } catch (const json::Exception& e) {
+    LOG(ERROR) << e.what();
+  } catch (const KubernetesReader::QueryException& e) {
+    // Already logged.
+  }
+  LOG(INFO) << "Watch thread (node) exiting";
+}
+
+void KubernetesUpdater::start() {
+  PollingMetadataUpdater::start();
+  if (config().KubernetesUseWatch()) {
+    // Wrap the bind expression into a function to use as a bind argument.
+    UpdateCallback cb = std::bind(&KubernetesUpdater::MetadataCallback, this,
+                                  std::placeholders::_1);
+    node_watch_thread_ =
+        std::thread(&KubernetesReader::WatchNode, &reader_, cb);
+    pod_watch_thread_ =
+        std::thread(&KubernetesReader::WatchPods, &reader_, cb);
+  }
+}
+
+void KubernetesUpdater::MetadataCallback(
+    std::vector<MetadataUpdater::ResourceMetadata>&& result_vector) {
+  for (MetadataUpdater::ResourceMetadata& result : result_vector) {
+    UpdateResourceCallback(result);
+    UpdateMetadataCallback(std::move(result));
+  }
+}
+
 }
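
Note (illustration, not part of the patch): PodCallback and NodeCallback above consume standard Kubernetes watch events, i.e. JSON objects with a "type" and an "object" field; only ADDED and MODIFIED events are forwarded to the callback. A trimmed, illustrative example of one such event:

    {
      "type": "MODIFIED",
      "object": {
        "kind": "Pod",
        "metadata": {"name": "some-pod", "namespace": "default"},
        "spec": {"nodeName": "node-1"}
      }
    }
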
diff --git a/src/kubernetes.h b/src/kubernetes.h
index 428f6058..d669be92 100644
--- a/src/kubernetes.h
+++ b/src/kubernetes.h
@@ -18,6 +18,7 @@
 //#include "config.h"

+#include
 #include
 #include
 #include
@@ -37,6 +38,12 @@ class KubernetesReader {
   // A Kubernetes metadata query function.
   std::vector<MetadataUpdater::ResourceMetadata> MetadataQuery() const;

+  // Node watcher.
+  void WatchNode(MetadataUpdater::UpdateCallback callback) const;
+
+  // Pod watcher.
+  void WatchPods(MetadataUpdater::UpdateCallback callback) const;
+
  private:
  // A representation of all query-related errors.
  class QueryException {
@@ -47,6 +54,28 @@ class KubernetesReader {
    std::string explanation_;
  };

+  // Issues a Kubernetes master API query at a given path and
+  // returns a parsed JSON response. The path has to start with "/".
+  json::value QueryMaster(const std::string& path) const
+      throw(QueryException, json::Exception);
+
+  // Issues a Kubernetes master API query at a given path and
+  // watches for parsed JSON responses. The path has to start with "/".
+  // Invokes callback for every notification.
+  void WatchMaster(
+      const std::string& path, std::function<void(json::value)> callback) const
+      throw(QueryException, json::Exception);
+
+  // Node watcher callback.
+  void NodeCallback(
+      MetadataUpdater::UpdateCallback callback, json::value result) const
+      throw(json::Exception);
+
+  // Pod watcher callback.
+  void PodCallback(
+      MetadataUpdater::UpdateCallback callback, json::value result) const
+      throw(json::Exception);
+
   // Compute the associations for a given pod.
   json::value ComputePodAssociations(const json::Object* pod) const
       throw(json::Exception);
@@ -72,11 +101,6 @@ class KubernetesReader {
       const json::Object* pod, Timestamp collected_at) const
       throw(json::Exception);

-  // Issues a Kubernetes master API query at a given path and
-  // returns a parsed JSON response. The path has to start with "/".
-  json::value QueryMaster(const std::string& path) const
-      throw(QueryException, json::Exception);
-
   // Gets the name of the node the agent is running on.
   // Returns an empty string if unable to find the current node.
   const std::string& CurrentNode() const;
@@ -123,8 +147,24 @@ class KubernetesUpdater : public PollingMetadataUpdater {
       : reader_(server->config()), PollingMetadataUpdater(
             server, server->config().KubernetesUpdaterIntervalSeconds(),
             std::bind(&google::KubernetesReader::MetadataQuery, &reader_)) { }
+  ~KubernetesUpdater() {
+    if (node_watch_thread_.joinable()) {
+      node_watch_thread_.join();
+    }
+    if (pod_watch_thread_.joinable()) {
+      pod_watch_thread_.join();
+    }
+  }
+
+  void start();
+
  private:
+  // Metadata watcher callback.
+  void MetadataCallback(
+      std::vector<MetadataUpdater::ResourceMetadata>&& result_vector);
+
   KubernetesReader reader_;
+  std::thread node_watch_thread_;
+  std::thread pod_watch_thread_;
 };
 }
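
Note (illustration, not part of the patch): the lifecycle added to KubernetesUpdater is the usual spawn-in-start(), join-in-destructor pattern: start() launches one thread per watch, each bound to a const member function of the reader plus the update callback, and the destructor joins them, which only returns once WatchNode/WatchPods exit (for example when the watch connection is closed). A minimal, self-contained sketch of the same pattern with hypothetical names:

    #include <functional>
    #include <iostream>
    #include <thread>

    struct Reader {
      // Stand-in for a long-running watch loop that reports events.
      void Watch(std::function<void(int)> callback) const {
        for (int i = 0; i < 3; ++i) callback(i);
      }
    };

    class Updater {
     public:
      void start() {
        std::function<void(int)> cb =
            std::bind(&Updater::HandleEvent, this, std::placeholders::_1);
        watch_thread_ = std::thread(&Reader::Watch, &reader_, cb);
      }
      ~Updater() {
        if (watch_thread_.joinable()) watch_thread_.join();
      }

     private:
      void HandleEvent(int event) { std::cout << "event " << event << "\n"; }
      Reader reader_;
      std::thread watch_thread_;
    };

    int main() {
      Updater updater;
      updater.start();
    }  // ~Updater joins the watch thread.
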
diff --git a/src/updater.h b/src/updater.h
index 5bf2896a..d4878962 100644
--- a/src/updater.h
+++ b/src/updater.h
@@ -19,6 +19,7 @@
 //#include "config.h"

 #include
+#include
 #include
 #include
 #include
@@ -58,6 +59,9 @@ class MetadataUpdater {
   // Stops updating.
   virtual void stop() = 0;

+  using UpdateCallback =
+      std::function<void(std::vector<ResourceMetadata>&&)>;
+
 protected:
   // Updates the resource map in the store.
   void UpdateResourceCallback(const ResourceMetadata& result) {
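
Note (illustration, not part of the patch): UpdateCallback passes each batch of results by rvalue reference, so a watcher can hand its vector over without copying; MetadataCallback then moves each element on to UpdateMetadataCallback. A minimal sketch of that shape, with ResourceMetadata stubbed out:

    #include <functional>
    #include <iostream>
    #include <string>
    #include <utility>
    #include <vector>

    struct ResourceMetadata { std::string name; };  // stand-in for the real type

    using UpdateCallback = std::function<void(std::vector<ResourceMetadata>&&)>;

    int main() {
      UpdateCallback cb = [](std::vector<ResourceMetadata>&& batch) {
        for (ResourceMetadata& r : batch) std::cout << r.name << "\n";
      };
      std::vector<ResourceMetadata> results{{"pod/a"}, {"node/b"}};
      cb(std::move(results));  // how the watch callbacks invoke it
    }
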