diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 4dc20397..025b271d 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -570,10 +570,10 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { Watcher(const std::string& endpoint, - std::function event_callback, + std::function body_callback, std::unique_lock&& completion, bool verbose) : name_("Watcher(" + endpoint + ")"), - completion_(std::move(completion)), event_callback_(event_callback), + completion_(std::move(completion)), body_callback_(body_callback), remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {} ~Watcher() {} // Unlocks the completion_ lock. @@ -616,7 +616,7 @@ struct Watcher { if (remaining_chunk_bytes_ == 0) { // Invoke the callback. - CompleteChunk(); + body_callback_(body_); // Process the next batch. while (remaining_chunk_bytes_ == 0 && @@ -729,51 +729,53 @@ struct Watcher { return boost::iterator_range(begin, end); } - void CompleteChunk() throw(WatcherException) { - if (body_.empty()) { + std::string name_; + std::unique_lock completion_; + std::function body_callback_; + std::string body_; + size_t remaining_chunk_bytes_; + bool verbose_; + std::string exception_message_; +}; + +void BodyCallback(const std::string& name, + std::function event_callback, + const std::string& body) { + if (body.empty()) { #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Skipping empty watch notification"; + LOG(DEBUG) << name << " => " + << "Skipping empty watch notification"; #endif - } else { - try { + } else { + try { //#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Invoking callbacks on '" << body_ << "'"; +// LOG(DEBUG) << name << " => " +// << "Invoking callbacks on '" << body << "'"; //#endif - std::vector events = json::Parser::AllFromString(body_); - for (json::value& event : events) { - std::string event_str = event->ToString(); + std::vector events = json::Parser::AllFromString(body); + for (json::value& event : events) { + std::string event_str = event->ToString(); #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Invoking callback('" << event_str << "')"; + LOG(DEBUG) << name << " => " + << "Invoking callback('" << event_str << "')"; #endif - event_callback_(std::move(event)); + event_callback(std::move(event)); #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "callback('" << event_str << "') completed"; + LOG(DEBUG) << name << " => " + << "callback('" << event_str << "') completed"; #endif - } + } //#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "All callbacks on '" << body_ << "' completed"; +// LOG(DEBUG) << name << " => " +// << "All callbacks on '" << body << "' completed"; //#endif - } catch (const json::Exception& e) { - throw WatcherException("JSON error: " + e.what()); - } + } catch (const json::Exception& e) { + LOG(ERROR) << "Unable to process events: " << e.what(); } } +} - std::string name_; - std::unique_lock completion_; - std::function event_callback_; - std::string body_; - size_t remaining_chunk_bytes_; - bool verbose_; - std::string exception_message_; -}; - -void EventCallback( +void WatchEventCallback( std::function callback, json::value raw_watch) throw(json::Exception) { Timestamp collected_at = std::chrono::system_clock::now(); @@ -792,6 +794,7 @@ void EventCallback( } void KubernetesReader::WatchMaster( + const std::string& name, const std::string& path, std::function callback) const throw(QueryException, json::Exception) { @@ -804,7 +807,7 @@ void KubernetesReader::WatchMaster( request << boost::network::header( "Authorization", "Bearer " + KubernetesApiToken()); if (config_.VerboseLogging()) { - LOG(INFO) << "WatchMaster: Contacting " << endpoint; + LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint; } try { if (config_.VerboseLogging()) { @@ -813,9 +816,13 @@ void KubernetesReader::WatchMaster( // A notification for watch completion. std::mutex completion_mutex; std::unique_lock watch_completion(completion_mutex); - Watcher watcher(endpoint, - std::bind(&EventCallback, callback, std::placeholders::_1), - std::move(watch_completion), config_.VerboseLogging()); + // Pull this out, as nested std::bind expressions are nasty. + std::function event_callback = + std::bind(&WatchEventCallback, callback, std::placeholders::_1); + Watcher watcher( + endpoint, + std::bind(&BodyCallback, name, event_callback, std::placeholders::_1), + std::move(watch_completion), config_.VerboseLogging()); http::client::response response = client.get(request, boost::ref(watcher)); if (config_.VerboseLogging()) { LOG(INFO) << "Waiting for completion"; @@ -827,7 +834,7 @@ void KubernetesReader::WatchMaster( throw QueryException(watcher.exception()); } if (config_.VerboseLogging()) { - LOG(INFO) << "WatchMaster completed " << body(response); + LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response); } std::string encoding; #ifdef VERBOSE @@ -1043,11 +1050,13 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) ? "" : "&" + config_.KubernetesPodLabelSelector()); try { - WatchMaster(std::string(kKubernetesEndpointPath) + "/pods" - + node_selector + pod_label_selector, - std::bind(&KubernetesReader::PodCallback, - this, callback, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + WatchMaster( + "Pods", + std::string(kKubernetesEndpointPath) + "/pods" + node_selector + + pod_label_selector, + std::bind(&KubernetesReader::PodCallback, + this, callback, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); LOG(ERROR) << "No more pod metadata will be collected"; @@ -1079,11 +1088,12 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) try { // TODO: There seems to be a Kubernetes API bug with watch=true. - WatchMaster(std::string(kKubernetesEndpointPath) + "/watch/nodes/" - + node_name, - std::bind(&KubernetesReader::NodeCallback, - this, callback, std::placeholders::_1, - std::placeholders::_2, std::placeholders::_3)); + WatchMaster( + "Node", + std::string(kKubernetesEndpointPath) + "/watch/nodes/" + node_name, + std::bind(&KubernetesReader::NodeCallback, + this, callback, std::placeholders::_1, + std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); LOG(ERROR) << "No more node metadata will be collected"; diff --git a/src/kubernetes.h b/src/kubernetes.h index 3ed88cdc..e6ec2940 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -62,7 +62,9 @@ class KubernetesReader { // Issues a Kubernetes master API query at a given path and // watches for parsed JSON responses. The path has to start with "/". // Invokes callback for every notification. + // The name is for logging purposes. void WatchMaster( + const std::string& name, const std::string& path, std::function callback) const throw(QueryException, json::Exception);