From 84457b7f140cae29b2a4011ca9a4e16b2e229c7d Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Mon, 18 Dec 2017 15:22:24 -0500 Subject: [PATCH 01/13] Initial watch implementation. --- src/kubernetes.cc | 71 +++++++++++++++++++++++++++++++++++++++++++++++ src/kubernetes.h | 28 +++++++++++++++---- src/updater.h | 1 + 3 files changed, 95 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 7c7c527b..63a52369 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -56,6 +57,8 @@ constexpr const char kK8sNodeNameResourcePrefix[] = "k8s_nodeName"; constexpr const char kNodeSelectorPrefix[] = "?fieldSelector=spec.nodeName%3D"; +constexpr const char kWatchParam[] = "watch=true"; + constexpr const char kDockerIdPrefix[] = "docker://"; constexpr const char kServiceAccountDirectory[] = @@ -551,6 +554,55 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const } } +namespace { +struct Watcher { + Watcher(std::function callback) : callback_(callback) {} + void operator()(const boost::iterator_range& range, + const boost::system::error_code& error) + throw(boost::system::system_error, json::Exception) { + if (!error) { + std::string body(std::begin(range), std::end(range)); +//#ifdef VERBOSE + LOG(DEBUG) << "Watch notification: " << body; +//#endif + callback_(json::Parser::FromString(body)); + } else { + if (error == boost::asio::error::eof) { + LOG(DEBUG) << "Watch callback: EOF"; + } else { + LOG(ERROR) << "Callback got error " << error; + throw boost::system::system_error(error); + } + } + } + private: + std::function callback_; +}; +} + +void KubernetesReader::WatchMaster( + const std::string& path, std::function callback) const + throw(QueryException, json::Exception) { + const std::string prefix((path.find('?') == std::string::npos) ? "?" : "&"); + const std::string watch_param(prefix + kWatchParam); + const std::string endpoint( + config_.KubernetesEndpointHost() + path + watch_param); + http::client client; + http::client::request request(endpoint); + request << boost::network::header( + "Authorization", "Bearer " + KubernetesApiToken()); +// if (config_.VerboseLogging()) { + LOG(INFO) << "WatchMaster: Contacting " << endpoint; +// } + try { + http::client::response response = client.get(request, Watcher(callback)); + LOG(DEBUG) << "WatchMaster: Response: " << body(response); + } catch (const boost::system::system_error& e) { + LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); + throw QueryException(endpoint + " -> " + e.what()); + } +} + const std::string& KubernetesReader::KubernetesApiToken() const { std::lock_guard lock(mutex_); if (kubernetes_api_token_.empty()) { @@ -719,4 +771,23 @@ json::value KubernetesReader::FindTopLevelOwner( return FindTopLevelOwner(ns, GetOwner(ns, ref->As())); } +namespace { +void WatchCallback(json::value result) throw(json::Exception) { + const json::Object* watch = result->As(); + const std::string type = watch->Get("type"); + const json::Object* object = watch->Get("object"); + LOG(ERROR) << "Watch type: " << type << " object: " << *object; +} +} + +void KubernetesReader::WatchPods() const { + try { + WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", WatchCallback); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + } catch (const KubernetesReader::QueryException& e) { + // Already logged. + } +} + } diff --git a/src/kubernetes.h b/src/kubernetes.h index 428f6058..ef9ccc2d 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -18,6 +18,7 @@ //#include "config.h" +#include #include #include #include @@ -37,6 +38,9 @@ class KubernetesReader { // A Kubernetes metadata query function. std::vector MetadataQuery() const; + // Pod watcher. + void WatchPods() const; + private: // A representation of all query-related errors. class QueryException { @@ -47,6 +51,18 @@ class KubernetesReader { std::string explanation_; }; + // Issues a Kubernetes master API query at a given path and + // returns a parsed JSON response. The path has to start with "/". + json::value QueryMaster(const std::string& path) const + throw(QueryException, json::Exception); + + // Issues a Kubernetes master API query at a given path and + // watches for parsed JSON responses. The path has to start with "/". + // Invokes callback for every notification. + void WatchMaster( + const std::string& path, std::function callback) const + throw(QueryException, json::Exception); + // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const throw(json::Exception); @@ -72,11 +88,6 @@ class KubernetesReader { const json::Object* pod, Timestamp collected_at) const throw(json::Exception); - // Issues a Kubernetes master API query at a given path and - // returns a parsed JSON response. The path has to start with "/". - json::value QueryMaster(const std::string& path) const - throw(QueryException, json::Exception); - // Gets the name of the node the agent is running on. // Returns an empty string if unable to find the current node. const std::string& CurrentNode() const; @@ -123,8 +134,15 @@ class KubernetesUpdater : public PollingMetadataUpdater { : reader_(server->config()), PollingMetadataUpdater( server, server->config().KubernetesUpdaterIntervalSeconds(), std::bind(&google::KubernetesReader::MetadataQuery, &reader_)) { } + + void start() { + PollingMetadataUpdater::start(); + watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_); + } + private: KubernetesReader reader_; + std::thread watch_thread_; }; } diff --git a/src/updater.h b/src/updater.h index 5bf2896a..f2b563ac 100644 --- a/src/updater.h +++ b/src/updater.h @@ -19,6 +19,7 @@ //#include "config.h" #include +#include #include #include #include From 5b451796768ccc995990b519aa9257d09b767005 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Mon, 13 Nov 2017 12:16:05 -0500 Subject: [PATCH 02/13] Add watch thread synchronization. --- src/kubernetes.cc | 39 +++++++++++++++++++++++++++++++-------- src/kubernetes.h | 5 +++++ 2 files changed, 36 insertions(+), 8 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 63a52369..e73b07d1 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -556,26 +556,39 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { - Watcher(std::function callback) : callback_(callback) {} - void operator()(const boost::iterator_range& range, - const boost::system::error_code& error) - throw(boost::system::system_error, json::Exception) { + Watcher(std::function callback, + std::unique_lock&& completion) + : completion_(std::move(completion)), callback_(callback) {} + ~Watcher() {} // Unlocks the completion_ lock. + void operator()(const boost::iterator_range& range, + const boost::system::error_code& error) { if (!error) { + if (std::begin(range) == std::end(range)) { + LOG(INFO) << "Skipping empty watch notification"; + return; + } std::string body(std::begin(range), std::end(range)); //#ifdef VERBOSE LOG(DEBUG) << "Watch notification: " << body; //#endif - callback_(json::Parser::FromString(body)); + try { + callback_(json::Parser::FromString(body)); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + } } else { if (error == boost::asio::error::eof) { LOG(DEBUG) << "Watch callback: EOF"; } else { LOG(ERROR) << "Callback got error " << error; - throw boost::system::system_error(error); } + LOG(ERROR) << "Unlocking completion mutex"; + completion_.unlock(); } } + private: + std::unique_lock completion_; std::function callback_; }; } @@ -595,8 +608,15 @@ void KubernetesReader::WatchMaster( LOG(INFO) << "WatchMaster: Contacting " << endpoint; // } try { - http::client::response response = client.get(request, Watcher(callback)); - LOG(DEBUG) << "WatchMaster: Response: " << body(response); + LOG(ERROR) << "Locking completion mutex"; + // A notification for watch completion. + std::mutex completion_mutex; + std::unique_lock watch_completion(completion_mutex); + Watcher watcher(callback, std::move(watch_completion)); + http::client::response response = client.get(request, boost::ref(watcher)); + LOG(ERROR) << "Waiting for completion"; + std::lock_guard await_completion(completion_mutex); + LOG(DEBUG) << "WatchMaster completed " << body(response); } catch (const boost::system::system_error& e) { LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); throw QueryException(endpoint + " -> " + e.what()); @@ -773,6 +793,7 @@ json::value KubernetesReader::FindTopLevelOwner( namespace { void WatchCallback(json::value result) throw(json::Exception) { + LOG(ERROR) << "Watch callback: " << *result; const json::Object* watch = result->As(); const std::string type = watch->Get("type"); const json::Object* object = watch->Get("object"); @@ -781,6 +802,7 @@ void WatchCallback(json::value result) throw(json::Exception) { } void KubernetesReader::WatchPods() const { + LOG(ERROR) << "Watch thread started"; try { WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", WatchCallback); } catch (const json::Exception& e) { @@ -788,6 +810,7 @@ void KubernetesReader::WatchPods() const { } catch (const KubernetesReader::QueryException& e) { // Already logged. } + LOG(ERROR) << "Watch thread exiting"; } } diff --git a/src/kubernetes.h b/src/kubernetes.h index ef9ccc2d..98efea98 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -134,6 +134,11 @@ class KubernetesUpdater : public PollingMetadataUpdater { : reader_(server->config()), PollingMetadataUpdater( server, server->config().KubernetesUpdaterIntervalSeconds(), std::bind(&google::KubernetesReader::MetadataQuery, &reader_)) { } + ~KubernetesUpdater() { + if (watch_thread_.joinable()) { + watch_thread_.join(); + } + } void start() { PollingMetadataUpdater::start(); From 21dc954fdd9f9187a862c433ca6476db14ee6aa7 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Wed, 15 Nov 2017 20:06:30 -0500 Subject: [PATCH 03/13] Parse all objects in the stream. --- src/kubernetes.cc | 132 +++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 120 insertions(+), 12 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index e73b07d1..53df9b73 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -26,6 +26,7 @@ #include #include +#include "http_common.h" #include "json.h" #include "logging.h" #include "resource.h" @@ -558,23 +559,40 @@ namespace { struct Watcher { Watcher(std::function callback, std::unique_lock&& completion) - : completion_(std::move(completion)), callback_(callback) {} + : completion_(std::move(completion)), callback_(callback), + remaining_bytes_(0) {} ~Watcher() {} // Unlocks the completion_ lock. void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { if (!error) { - if (std::begin(range) == std::end(range)) { - LOG(INFO) << "Skipping empty watch notification"; - return; - } - std::string body(std::begin(range), std::end(range)); //#ifdef VERBOSE - LOG(DEBUG) << "Watch notification: " << body; +// LOG(DEBUG) << "Watch notification: '" +// << std::string(std::begin(range), std::end(range)) +// << "'"; //#endif - try { - callback_(json::Parser::FromString(body)); - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); + boost::iterator_range pos = range; + if (remaining_bytes_ != 0) { + pos = ReadNextChunk(pos); +// LOG(ERROR) << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_bytes_ << " bytes remaining"; + } + + if (remaining_bytes_ == 0) { + // Invoke the callback. + CompleteChunk(); + + // Process the next batch. + while (remaining_bytes_ == 0 && std::begin(pos) != std::end(pos)) { + pos = StartNewChunk(pos); + } +// LOG(ERROR) << "Started new chunk; " << remaining_bytes_ +// << " bytes remaining"; + + if (remaining_bytes_ != 0) { + pos = ReadNextChunk(pos); +// LOG(ERROR) << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_bytes_ << " bytes remaining"; + } } } else { if (error == boost::asio::error::eof) { @@ -588,8 +606,86 @@ struct Watcher { } private: + boost::iterator_range + StartNewChunk(const boost::iterator_range& range) { + if (remaining_bytes_ != 0) { + LOG(ERROR) << "Starting new chunk with " << remaining_bytes_ + << " bytes remaining"; + } + + body_.clear(); + + const std::string crlf("\r\n"); + auto begin = std::begin(range); + auto end = std::end(range); + auto iter = std::search(begin, end, crlf.begin(), crlf.end()); + if (iter == end && iter != begin) { + LOG(ERROR) << "Invalid chunked encoding: '" + << std::string(begin, end) + << "'"; + return boost::iterator_range(begin, end); + } + std::string line(begin, iter); +// LOG(ERROR) << "Buffer before increment: '" << std::string(iter, end) << "'"; + iter = std::next(iter, crlf.size()); +// LOG(ERROR) << "Buffer: '" << std::string(iter, end) << "'"; + if (line.empty()) { + // Blank lines are fine, just skip them. + LOG(DEBUG) << "Skipping blank line within chunked encoding;" + << " remaining data '" << std::string(iter, end) << "'"; + } else { +// LOG(ERROR) << "Line: '" << line << "'"; + std::stringstream stream(line); + stream >> std::hex >> remaining_bytes_; + } + return boost::iterator_range(iter, end); + } + + boost::iterator_range + ReadNextChunk(const boost::iterator_range& range) { + if (remaining_bytes_ == 0) { + LOG(ERROR) << "Asked to read next chunk with no bytes remaining"; + return range; + } + + const std::string crlf("\r\n"); + auto begin = std::begin(range); + auto end = std::end(range); + const size_t available = std::distance(begin, end); + const size_t len = std::min(available, remaining_bytes_); + body_.insert(body_.end(), begin, begin + len); + remaining_bytes_ -= len; + begin = std::next(begin, len); +// if (remaining_bytes_ == 0) { +// begin = std::next(begin, crlf.size()); +// } + return boost::iterator_range(begin, end); + } + + void CompleteChunk() { + if (body_.empty()) { + LOG(INFO) << "Skipping empty watch notification"; + } else { + try { +// LOG(INFO) << "Invoking callbacks on '" << body_ << "'"; + std::vector events = json::Parser::AllFromString(body_); + for (json::value& event : events) { + std::string event_str = event->ToString(); +// LOG(INFO) << "Invoking callback('" << event_str << "')"; + callback_(std::move(event)); +// LOG(INFO) << "callback('" << event_str << "') completed"; + } +// LOG(INFO) << "All callbacks on '" << body_ << "' completed"; + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + } + } + } + std::unique_lock completion_; std::function callback_; + std::string body_; + size_t remaining_bytes_; }; } @@ -617,6 +713,18 @@ void KubernetesReader::WatchMaster( LOG(ERROR) << "Waiting for completion"; std::lock_guard await_completion(completion_mutex); LOG(DEBUG) << "WatchMaster completed " << body(response); + bool is_chunk_encoding = false; +#ifdef VERBOSE + LOG(DEBUG) << "response headers: " << response.headers(); +#endif + for (const auto& h : headers(response)) { + if (h.first == "Transfer-Encoding" && h.second == "chunked") { + is_chunk_encoding = true; + } + } + if (!is_chunk_encoding) { + LOG(ERROR) << "Expected chunked encoding"; + } } catch (const boost::system::system_error& e) { LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); throw QueryException(endpoint + " -> " + e.what()); @@ -793,7 +901,7 @@ json::value KubernetesReader::FindTopLevelOwner( namespace { void WatchCallback(json::value result) throw(json::Exception) { - LOG(ERROR) << "Watch callback: " << *result; + //LOG(ERROR) << "Watch callback: " << *result; const json::Object* watch = result->As(); const std::string type = watch->Get("type"); const json::Object* object = watch->Get("object"); From 1eccacc970b2dcca7525b23f135c5c3c99af3c3f Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Wed, 15 Nov 2017 21:50:34 -0500 Subject: [PATCH 04/13] Logging cleanup. --- src/kubernetes.cc | 90 ++++++++++++++++++++++++++++++----------------- 1 file changed, 58 insertions(+), 32 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 53df9b73..25f83da4 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -558,9 +558,9 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { Watcher(std::function callback, - std::unique_lock&& completion) + std::unique_lock&& completion, bool verbose) : completion_(std::move(completion)), callback_(callback), - remaining_bytes_(0) {} + remaining_bytes_(0), verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { @@ -573,8 +573,10 @@ struct Watcher { boost::iterator_range pos = range; if (remaining_bytes_ != 0) { pos = ReadNextChunk(pos); -// LOG(ERROR) << "Read another chunk; body now is '" << body_ << "'; " +//#ifdef VERBOSE +// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " // << remaining_bytes_ << " bytes remaining"; +//#endif } if (remaining_bytes_ == 0) { @@ -585,22 +587,30 @@ struct Watcher { while (remaining_bytes_ == 0 && std::begin(pos) != std::end(pos)) { pos = StartNewChunk(pos); } -// LOG(ERROR) << "Started new chunk; " << remaining_bytes_ +//#ifdef VERBOSE +// LOG(DEBUG) << "Started new chunk; " << remaining_bytes_ // << " bytes remaining"; +//#endif if (remaining_bytes_ != 0) { pos = ReadNextChunk(pos); -// LOG(ERROR) << "Read another chunk; body now is '" << body_ << "'; " +//#ifdef VERBOSE +// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " // << remaining_bytes_ << " bytes remaining"; +//#endif } } } else { if (error == boost::asio::error::eof) { +#ifdef VERBOSE LOG(DEBUG) << "Watch callback: EOF"; +#endif } else { LOG(ERROR) << "Callback got error " << error; } - LOG(ERROR) << "Unlocking completion mutex"; + if (verbose_) { + LOG(INFO) << "Unlocking completion mutex"; + } completion_.unlock(); } } @@ -626,15 +636,17 @@ struct Watcher { return boost::iterator_range(begin, end); } std::string line(begin, iter); -// LOG(ERROR) << "Buffer before increment: '" << std::string(iter, end) << "'"; iter = std::next(iter, crlf.size()); -// LOG(ERROR) << "Buffer: '" << std::string(iter, end) << "'"; if (line.empty()) { // Blank lines are fine, just skip them. +#ifdef VERBOSE LOG(DEBUG) << "Skipping blank line within chunked encoding;" << " remaining data '" << std::string(iter, end) << "'"; +#endif } else { -// LOG(ERROR) << "Line: '" << line << "'"; +//#ifdef VERBOSE +// LOG(DEBUG) << "Line: '" << line << "'"; +//#endif std::stringstream stream(line); stream >> std::hex >> remaining_bytes_; } @@ -656,26 +668,33 @@ struct Watcher { body_.insert(body_.end(), begin, begin + len); remaining_bytes_ -= len; begin = std::next(begin, len); -// if (remaining_bytes_ == 0) { -// begin = std::next(begin, crlf.size()); -// } return boost::iterator_range(begin, end); } void CompleteChunk() { if (body_.empty()) { - LOG(INFO) << "Skipping empty watch notification"; +#ifdef VERBOSE + LOG(DEBUG) << "Skipping empty watch notification"; +#endif } else { try { -// LOG(INFO) << "Invoking callbacks on '" << body_ << "'"; +//#ifdef VERBOSE +// LOG(DEBUG) << "Invoking callbacks on '" << body_ << "'"; +//#endif std::vector events = json::Parser::AllFromString(body_); for (json::value& event : events) { std::string event_str = event->ToString(); -// LOG(INFO) << "Invoking callback('" << event_str << "')"; +#ifdef VERBOSE + LOG(DEBUG) << "Invoking callback('" << event_str << "')"; +#endif callback_(std::move(event)); -// LOG(INFO) << "callback('" << event_str << "') completed"; +#ifdef VERBOSE + LOG(DEBUG) << "callback('" << event_str << "') completed"; +#endif } -// LOG(INFO) << "All callbacks on '" << body_ << "' completed"; +//#ifdef VERBOSE +// LOG(DEBUG) << "All callbacks on '" << body_ << "' completed"; +//#endif } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } @@ -686,6 +705,7 @@ struct Watcher { std::function callback_; std::string body_; size_t remaining_bytes_; + bool verbose_; }; } @@ -700,30 +720,36 @@ void KubernetesReader::WatchMaster( http::client::request request(endpoint); request << boost::network::header( "Authorization", "Bearer " + KubernetesApiToken()); -// if (config_.VerboseLogging()) { + if (config_.VerboseLogging()) { LOG(INFO) << "WatchMaster: Contacting " << endpoint; -// } + } try { - LOG(ERROR) << "Locking completion mutex"; + if (config_.VerboseLogging()) { + LOG(INFO) << "Locking completion mutex"; + } // A notification for watch completion. std::mutex completion_mutex; std::unique_lock watch_completion(completion_mutex); - Watcher watcher(callback, std::move(watch_completion)); + Watcher watcher(callback, std::move(watch_completion), + config_.VerboseLogging()); http::client::response response = client.get(request, boost::ref(watcher)); - LOG(ERROR) << "Waiting for completion"; + if (config_.VerboseLogging()) { + LOG(INFO) << "Waiting for completion"; + } std::lock_guard await_completion(completion_mutex); - LOG(DEBUG) << "WatchMaster completed " << body(response); - bool is_chunk_encoding = false; + if (config_.VerboseLogging()) { + LOG(INFO) << "WatchMaster completed " << body(response); + } + std::string encoding; #ifdef VERBOSE LOG(DEBUG) << "response headers: " << response.headers(); #endif - for (const auto& h : headers(response)) { - if (h.first == "Transfer-Encoding" && h.second == "chunked") { - is_chunk_encoding = true; - } + auto transfer_encoding_header = headers(response)["Transfer-Encoding"]; + if (!boost::empty(transfer_encoding_header)) { + encoding = boost::begin(transfer_encoding_header)->second; } - if (!is_chunk_encoding) { - LOG(ERROR) << "Expected chunked encoding"; + if (encoding != "chunked") { + LOG(ERROR) << "Expected chunked encoding; found '" << encoding << "'"; } } catch (const boost::system::system_error& e) { LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); @@ -910,7 +936,7 @@ void WatchCallback(json::value result) throw(json::Exception) { } void KubernetesReader::WatchPods() const { - LOG(ERROR) << "Watch thread started"; + LOG(INFO) << "Watch thread (pods) started"; try { WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", WatchCallback); } catch (const json::Exception& e) { @@ -918,7 +944,7 @@ void KubernetesReader::WatchPods() const { } catch (const KubernetesReader::QueryException& e) { // Already logged. } - LOG(ERROR) << "Watch thread exiting"; + LOG(INFO) << "Watch thread (pods) exiting"; } } From 70dc72f979e439952856b7f5ee8c3ea7cc7c105c Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 17 Nov 2017 17:23:50 -0500 Subject: [PATCH 05/13] Move watch callback into KubernetesReader. --- src/kubernetes.cc | 21 +++++++++++++++------ src/kubernetes.h | 3 +++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 25f83da4..531da83d 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -925,20 +925,29 @@ json::value KubernetesReader::FindTopLevelOwner( return FindTopLevelOwner(ns, GetOwner(ns, ref->As())); } -namespace { -void WatchCallback(json::value result) throw(json::Exception) { - //LOG(ERROR) << "Watch callback: " << *result; - const json::Object* watch = result->As(); +void KubernetesReader::PodCallback(json::value raw_watch) const + throw(json::Exception) { + Timestamp collected_at = std::chrono::system_clock::now(); + + //LOG(ERROR) << "Watch callback: " << *raw_watch; + const json::Object* watch = raw_watch->As(); const std::string type = watch->Get("type"); const json::Object* object = watch->Get("object"); LOG(ERROR) << "Watch type: " << type << " object: " << *object; -} +// if (type == "MODIFIED" || type == "ADDED") { +// MetadataUpdater::ResourceMetadata result = +// GetPodMetadata(object->Clone(), collected_at); +// UpdateResourceCallback(result); +// UpdateMetadataCallback(std::move(result)); +// } } void KubernetesReader::WatchPods() const { LOG(INFO) << "Watch thread (pods) started"; try { - WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", WatchCallback); + WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", + std::bind(&KubernetesReader::PodCallback, + this, std::placeholders::_1)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const KubernetesReader::QueryException& e) { diff --git a/src/kubernetes.h b/src/kubernetes.h index 98efea98..28a30eab 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -63,6 +63,9 @@ class KubernetesReader { const std::string& path, std::function callback) const throw(QueryException, json::Exception); + // Pod watcher callback. + void PodCallback(json::value result) const throw(json::Exception); + // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const throw(json::Exception); From 35ae959b560863d8ab95daae8d8458712058a69b Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Thu, 16 Nov 2017 18:14:20 -0500 Subject: [PATCH 06/13] Factor out the metadata update callback. --- src/kubernetes.cc | 35 ++++++++++++++++++++++++----------- src/kubernetes.h | 14 +++++++++++--- src/updater.h | 3 +++ 3 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 531da83d..5f97f1c5 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -925,29 +925,34 @@ json::value KubernetesReader::FindTopLevelOwner( return FindTopLevelOwner(ns, GetOwner(ns, ref->As())); } -void KubernetesReader::PodCallback(json::value raw_watch) const +void KubernetesReader::PodCallback(MetadataUpdater::UpdateCallback callback, + json::value raw_watch) const throw(json::Exception) { Timestamp collected_at = std::chrono::system_clock::now(); //LOG(ERROR) << "Watch callback: " << *raw_watch; const json::Object* watch = raw_watch->As(); const std::string type = watch->Get("type"); - const json::Object* object = watch->Get("object"); - LOG(ERROR) << "Watch type: " << type << " object: " << *object; -// if (type == "MODIFIED" || type == "ADDED") { -// MetadataUpdater::ResourceMetadata result = -// GetPodMetadata(object->Clone(), collected_at); -// UpdateResourceCallback(result); -// UpdateMetadataCallback(std::move(result)); -// } + const json::Object* pod = watch->Get("object"); + LOG(ERROR) << "Watch type: " << type << " object: " << *pod; + if (type == "MODIFIED" || type == "ADDED") { + json::value associations = ComputePodAssociations(pod); + std::vector result_vector; + result_vector.emplace_back(GetPodMetadata(pod->Clone(), + std::move(associations), + collected_at)); + callback(std::move(result_vector)); + } } -void KubernetesReader::WatchPods() const { +void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) + const { LOG(INFO) << "Watch thread (pods) started"; + try { WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", std::bind(&KubernetesReader::PodCallback, - this, std::placeholders::_1)); + this, callback, std::placeholders::_1)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); } catch (const KubernetesReader::QueryException& e) { @@ -956,4 +961,12 @@ void KubernetesReader::WatchPods() const { LOG(INFO) << "Watch thread (pods) exiting"; } +void KubernetesUpdater::MetadataCallback( + std::vector&& result_vector) { + for (MetadataUpdater::ResourceMetadata& result : result_vector) { + UpdateResourceCallback(result); + UpdateMetadataCallback(std::move(result)); + } +} + } diff --git a/src/kubernetes.h b/src/kubernetes.h index 28a30eab..a9cb36b3 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -39,7 +39,7 @@ class KubernetesReader { std::vector MetadataQuery() const; // Pod watcher. - void WatchPods() const; + void WatchPods(MetadataUpdater::UpdateCallback callback) const; private: // A representation of all query-related errors. @@ -64,7 +64,9 @@ class KubernetesReader { throw(QueryException, json::Exception); // Pod watcher callback. - void PodCallback(json::value result) const throw(json::Exception); + void PodCallback( + MetadataUpdater::UpdateCallback callback, json::value result) const + throw(json::Exception); // Compute the associations for a given pod. json::value ComputePodAssociations(const json::Object* pod) const @@ -145,10 +147,16 @@ class KubernetesUpdater : public PollingMetadataUpdater { void start() { PollingMetadataUpdater::start(); - watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_); + // Wrap the bind expression into a function to use as a bind argument. + UpdateCallback cb = std::bind(&KubernetesUpdater::MetadataCallback, this, + std::placeholders::_1); + watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_, cb); } private: + // Metadata watcher callback. + void MetadataCallback(std::vector&& result_vector); + KubernetesReader reader_; std::thread watch_thread_; }; diff --git a/src/updater.h b/src/updater.h index f2b563ac..d4878962 100644 --- a/src/updater.h +++ b/src/updater.h @@ -59,6 +59,9 @@ class MetadataUpdater { // Stops updating. virtual void stop() = 0; + using UpdateCallback = + std::function&&)>; + protected: // Updates the resource map in the store. void UpdateResourceCallback(const ResourceMetadata& result) { From 67d5f4dd7503b7ade1059f314eaf7ae32d093bfc Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Thu, 16 Nov 2017 15:33:54 -0500 Subject: [PATCH 07/13] Get both pod and container metadata from watch. --- src/kubernetes.cc | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 5f97f1c5..f9d11cd3 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -936,11 +936,8 @@ void KubernetesReader::PodCallback(MetadataUpdater::UpdateCallback callback, const json::Object* pod = watch->Get("object"); LOG(ERROR) << "Watch type: " << type << " object: " << *pod; if (type == "MODIFIED" || type == "ADDED") { - json::value associations = ComputePodAssociations(pod); - std::vector result_vector; - result_vector.emplace_back(GetPodMetadata(pod->Clone(), - std::move(associations), - collected_at)); + std::vector result_vector = + GetPodAndContainerMetadata(pod, collected_at); callback(std::move(result_vector)); } } From 9e96f69f93354a807cd5c7cb088a52d465e6a514 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Thu, 16 Nov 2017 18:54:37 -0500 Subject: [PATCH 08/13] Add a node watch. --- src/kubernetes.cc | 55 ++++++++++++++++++++++++++++++++++++++++++++++- src/kubernetes.h | 22 +++++++++++++++---- 2 files changed, 72 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index f9d11cd3..40f80f0f 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -946,8 +946,20 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) const { LOG(INFO) << "Watch thread (pods) started"; + const std::string node_name = CurrentNode(); + + if (config_.VerboseLogging()) { + LOG(INFO) << "Current node is " << node_name; + } + + const std::string node_selector(kNodeSelectorPrefix + node_name); + const std::string pod_label_selector( + config_.KubernetesPodLabelSelector().empty() + ? "" : "&" + config_.KubernetesPodLabelSelector()); + try { - WatchMaster(std::string(kKubernetesEndpointPath) + "/pods", + WatchMaster(std::string(kKubernetesEndpointPath) + "/pods" + + node_selector + pod_label_selector, std::bind(&KubernetesReader::PodCallback, this, callback, std::placeholders::_1)); } catch (const json::Exception& e) { @@ -958,6 +970,47 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) LOG(INFO) << "Watch thread (pods) exiting"; } +void KubernetesReader::NodeCallback(MetadataUpdater::UpdateCallback callback, + json::value raw_watch) const + throw(json::Exception) { + Timestamp collected_at = std::chrono::system_clock::now(); + + //LOG(ERROR) << "Watch callback: " << *raw_watch; + const json::Object* watch = raw_watch->As(); + const std::string type = watch->Get("type"); + const json::Object* node = watch->Get("object"); + LOG(ERROR) << "Watch type: " << type << " object: " << *node; + if (type == "MODIFIED" || type == "ADDED") { + std::vector result_vector; + result_vector.emplace_back(GetNodeMetadata(node->Clone(), collected_at)); + callback(std::move(result_vector)); + } +} + +void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) + const { + LOG(INFO) << "Watch thread (node) started"; + + const std::string node_name = CurrentNode(); + + if (config_.VerboseLogging()) { + LOG(INFO) << "Current node is " << node_name; + } + + try { + // TODO: There seems to be a Kubernetes API bug with watch=true. + WatchMaster(std::string(kKubernetesEndpointPath) + "/watch/nodes/" + + node_name, + std::bind(&KubernetesReader::NodeCallback, + this, callback, std::placeholders::_1)); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + } catch (const KubernetesReader::QueryException& e) { + // Already logged. + } + LOG(INFO) << "Watch thread (node) exiting"; +} + void KubernetesUpdater::MetadataCallback( std::vector&& result_vector) { for (MetadataUpdater::ResourceMetadata& result : result_vector) { diff --git a/src/kubernetes.h b/src/kubernetes.h index a9cb36b3..123bfa68 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -38,6 +38,9 @@ class KubernetesReader { // A Kubernetes metadata query function. std::vector MetadataQuery() const; + // Node watcher. + void WatchNode(MetadataUpdater::UpdateCallback callback) const; + // Pod watcher. void WatchPods(MetadataUpdater::UpdateCallback callback) const; @@ -63,6 +66,11 @@ class KubernetesReader { const std::string& path, std::function callback) const throw(QueryException, json::Exception); + // Node watcher callback. + void NodeCallback( + MetadataUpdater::UpdateCallback callback, json::value result) const + throw(json::Exception); + // Pod watcher callback. void PodCallback( MetadataUpdater::UpdateCallback callback, json::value result) const @@ -140,8 +148,11 @@ class KubernetesUpdater : public PollingMetadataUpdater { server, server->config().KubernetesUpdaterIntervalSeconds(), std::bind(&google::KubernetesReader::MetadataQuery, &reader_)) { } ~KubernetesUpdater() { - if (watch_thread_.joinable()) { - watch_thread_.join(); + if (node_watch_thread_.joinable()) { + node_watch_thread_.join(); + } + if (pod_watch_thread_.joinable()) { + pod_watch_thread_.join(); } } @@ -150,7 +161,9 @@ class KubernetesUpdater : public PollingMetadataUpdater { // Wrap the bind expression into a function to use as a bind argument. UpdateCallback cb = std::bind(&KubernetesUpdater::MetadataCallback, this, std::placeholders::_1); - watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_, cb); + node_watch_thread_ = std::thread( + &KubernetesReader::WatchNode, &reader_, cb); + pod_watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_, cb); } private: @@ -158,7 +171,8 @@ class KubernetesUpdater : public PollingMetadataUpdater { void MetadataCallback(std::vector&& result_vector); KubernetesReader reader_; - std::thread watch_thread_; + std::thread node_watch_thread_; + std::thread pod_watch_thread_; }; } From d484ecf6b02880bb2defdf8c06b94156406e324e Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 17 Nov 2017 12:51:59 -0500 Subject: [PATCH 09/13] Guard watch uses by a config option; allow overriding current node. --- src/configuration.cc | 8 ++++++++ src/configuration.h | 10 ++++++++++ src/kubernetes.cc | 47 ++++++++++++++++++++++++++++++-------------- src/kubernetes.h | 10 +--------- 4 files changed, 51 insertions(+), 24 deletions(-) diff --git a/src/configuration.cc b/src/configuration.cc index 364ff951..37d29448 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -46,6 +46,8 @@ constexpr const int kKubernetesUpdaterDefaultIntervalSeconds = 60; constexpr const char kKubernetesDefaultEndpointHost[] = "https://kubernetes"; constexpr const char kKubernetesDefaultPodLabelSelector[] = ""; constexpr const char kKubernetesDefaultClusterName[] = ""; +constexpr const char kKubernetesDefaultNodeName[] = ""; +constexpr const bool kKubernetesDefaultUseWatch = true; constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; @@ -72,6 +74,8 @@ MetadataAgentConfiguration::MetadataAgentConfiguration() kubernetes_endpoint_host_(kKubernetesDefaultEndpointHost), kubernetes_pod_label_selector_(kKubernetesDefaultPodLabelSelector), kubernetes_cluster_name_(kKubernetesDefaultClusterName), + kubernetes_node_name_(kKubernetesDefaultNodeName), + kubernetes_use_watch_(kKubernetesDefaultUseWatch), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone) {} @@ -154,6 +158,10 @@ void MetadataAgentConfiguration::ParseConfigFile(const std::string& filename) { kubernetes_cluster_name_ = config["KubernetesClusterName"].as( kKubernetesDefaultClusterName); + kubernetes_node_name_ = + config["KubernetesNodeName"].as(kKubernetesDefaultNodeName); + kubernetes_use_watch_ = + config["KubernetesUseWatch"].as(kKubernetesDefaultUseWatch); instance_id_ = config["InstanceId"].as(kDefaultInstanceId); instance_zone_ = diff --git a/src/configuration.h b/src/configuration.h index 64e18e6f..c38da470 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -95,6 +95,14 @@ class MetadataAgentConfiguration { std::lock_guard lock(mutex_); return kubernetes_cluster_name_; } + const std::string& KubernetesNodeName() const { + std::lock_guard lock(mutex_); + return kubernetes_node_name_; + } + bool KubernetesUseWatch() const { + std::lock_guard lock(mutex_); + return kubernetes_use_watch_; + } // Common metadata updater options. const std::string& InstanceId() const { std::lock_guard lock(mutex_); @@ -125,6 +133,8 @@ class MetadataAgentConfiguration { std::string kubernetes_endpoint_host_; std::string kubernetes_pod_label_selector_; std::string kubernetes_cluster_name_; + std::string kubernetes_node_name_; + bool kubernetes_use_watch_; std::string instance_id_; std::string instance_zone_; }; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 40f80f0f..28efd196 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -782,21 +782,25 @@ const std::string& KubernetesReader::KubernetesNamespace() const { const std::string& KubernetesReader::CurrentNode() const { std::lock_guard lock(mutex_); if (current_node_.empty()) { - const std::string& ns = KubernetesNamespace(); - // TODO: This is unreliable, see - // https://github.com/kubernetes/kubernetes/issues/52162. - const std::string pod_name = boost::asio::ip::host_name(); - try { - json::value pod_response = QueryMaster( - std::string(kKubernetesEndpointPath) + - "/namespaces/" + ns + "/pods/" + pod_name); - const json::Object* pod = pod_response->As(); - const json::Object* spec = pod->Get("spec"); - current_node_ = spec->Get("nodeName"); - } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); - } catch (const QueryException& e) { - // Already logged. + if (!config_.KubernetesNodeName().empty()) { + current_node_ = config_.KubernetesNodeName(); + } else { + const std::string& ns = KubernetesNamespace(); + // TODO: This is unreliable, see + // https://github.com/kubernetes/kubernetes/issues/52162. + const std::string pod_name = boost::asio::ip::host_name(); + try { + json::value pod_response = QueryMaster( + std::string(kKubernetesEndpointPath) + + "/namespaces/" + ns + "/pods/" + pod_name); + const json::Object* pod = pod_response->As(); + const json::Object* spec = pod->Get("spec"); + current_node_ = spec->Get("nodeName"); + } catch (const json::Exception& e) { + LOG(ERROR) << e.what(); + } catch (const QueryException& e) { + // Already logged. + } } } return current_node_; @@ -1011,6 +1015,19 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) LOG(INFO) << "Watch thread (node) exiting"; } +void KubernetesUpdater::start() { + PollingMetadataUpdater::start(); + if (config().KubernetesUseWatch()) { + // Wrap the bind expression into a function to use as a bind argument. + UpdateCallback cb = std::bind(&KubernetesUpdater::MetadataCallback, this, + std::placeholders::_1); + node_watch_thread_ = + std::thread(&KubernetesReader::WatchNode, &reader_, cb); + pod_watch_thread_ = + std::thread(&KubernetesReader::WatchPods, &reader_, cb); + } +} + void KubernetesUpdater::MetadataCallback( std::vector&& result_vector) { for (MetadataUpdater::ResourceMetadata& result : result_vector) { diff --git a/src/kubernetes.h b/src/kubernetes.h index 123bfa68..d669be92 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -156,15 +156,7 @@ class KubernetesUpdater : public PollingMetadataUpdater { } } - void start() { - PollingMetadataUpdater::start(); - // Wrap the bind expression into a function to use as a bind argument. - UpdateCallback cb = std::bind(&KubernetesUpdater::MetadataCallback, this, - std::placeholders::_1); - node_watch_thread_ = std::thread( - &KubernetesReader::WatchNode, &reader_, cb); - pod_watch_thread_ = std::thread(&KubernetesReader::WatchPods, &reader_, cb); - } + void start(); private: // Metadata watcher callback. From 0e4bc9f9ecc0f1a4b54ae3af35bb974b52e2b469 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Mon, 18 Dec 2017 18:51:36 -0500 Subject: [PATCH 10/13] Bump version to 0.0.14.k8s.watch-1. --- src/Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Makefile b/src/Makefile index 44cdcc88..8d336bde 100644 --- a/src/Makefile +++ b/src/Makefile @@ -63,8 +63,8 @@ install: metadatad export DISTRO PKG_NAME=stackdriver-metadata -PKG_VERSION=0.0.13 -PKG_RELEASE=5 +PKG_VERSION=0.0.14.k8s.watch +PKG_RELEASE=1 PKG_MAINTAINER=Stackdriver Engineering DOCKER_VERSION=0.2 From 40fc0cf489d7b169b514b35d88de553e9aed3b59 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Wed, 20 Dec 2017 14:25:00 -0500 Subject: [PATCH 11/13] Clarify remaining chunk bytes vs available request bytes. --- src/kubernetes.cc | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 28efd196..ea15261a 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -560,7 +560,7 @@ struct Watcher { Watcher(std::function callback, std::unique_lock&& completion, bool verbose) : completion_(std::move(completion)), callback_(callback), - remaining_bytes_(0), verbose_(verbose) {} + remaining_chunk_bytes_(0), verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { @@ -571,32 +571,33 @@ struct Watcher { // << "'"; //#endif boost::iterator_range pos = range; - if (remaining_bytes_ != 0) { + if (remaining_chunk_bytes_ != 0) { pos = ReadNextChunk(pos); //#ifdef VERBOSE // LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_bytes_ << " bytes remaining"; +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif } - if (remaining_bytes_ == 0) { + if (remaining_chunk_bytes_ == 0) { // Invoke the callback. CompleteChunk(); // Process the next batch. - while (remaining_bytes_ == 0 && std::begin(pos) != std::end(pos)) { + while (remaining_chunk_bytes_ == 0 && + std::begin(pos) != std::end(pos)) { pos = StartNewChunk(pos); } //#ifdef VERBOSE -// LOG(DEBUG) << "Started new chunk; " << remaining_bytes_ +// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_ // << " bytes remaining"; //#endif - if (remaining_bytes_ != 0) { + if (remaining_chunk_bytes_ != 0) { pos = ReadNextChunk(pos); //#ifdef VERBOSE // LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_bytes_ << " bytes remaining"; +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif } } @@ -618,8 +619,8 @@ struct Watcher { private: boost::iterator_range StartNewChunk(const boost::iterator_range& range) { - if (remaining_bytes_ != 0) { - LOG(ERROR) << "Starting new chunk with " << remaining_bytes_ + if (remaining_chunk_bytes_ != 0) { + LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_ << " bytes remaining"; } @@ -648,14 +649,14 @@ struct Watcher { // LOG(DEBUG) << "Line: '" << line << "'"; //#endif std::stringstream stream(line); - stream >> std::hex >> remaining_bytes_; + stream >> std::hex >> remaining_chunk_bytes_; } return boost::iterator_range(iter, end); } boost::iterator_range ReadNextChunk(const boost::iterator_range& range) { - if (remaining_bytes_ == 0) { + if (remaining_chunk_bytes_ == 0) { LOG(ERROR) << "Asked to read next chunk with no bytes remaining"; return range; } @@ -663,10 +664,12 @@ struct Watcher { const std::string crlf("\r\n"); auto begin = std::begin(range); auto end = std::end(range); + // The available bytes in the current notification, which may include the + // remainder of this chunk and the start of the next one. const size_t available = std::distance(begin, end); - const size_t len = std::min(available, remaining_bytes_); + const size_t len = std::min(available, remaining_chunk_bytes_); body_.insert(body_.end(), begin, begin + len); - remaining_bytes_ -= len; + remaining_chunk_bytes_ -= len; begin = std::next(begin, len); return boost::iterator_range(begin, end); } @@ -704,7 +707,7 @@ struct Watcher { std::unique_lock completion_; std::function callback_; std::string body_; - size_t remaining_bytes_; + size_t remaining_chunk_bytes_; bool verbose_; }; } From 2297cf80861336adb6974c562fd5a9aa19fc1613 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Wed, 20 Dec 2017 14:38:25 -0500 Subject: [PATCH 12/13] Handle blank lines in chunked encoding early. --- src/kubernetes.cc | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index ea15261a..04125295 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -630,7 +630,15 @@ struct Watcher { auto begin = std::begin(range); auto end = std::end(range); auto iter = std::search(begin, end, crlf.begin(), crlf.end()); - if (iter == end && iter != begin) { + if (iter == begin) { + // Blank lines are fine, just skip them. + iter = std::next(iter, crlf.size()); +#ifdef VERBOSE + LOG(DEBUG) << "Skipping blank line within chunked encoding;" + << " remaining data '" << std::string(iter, end) << "'"; +#endif + return boost::iterator_range(iter, end); + } else if (iter == end) { LOG(ERROR) << "Invalid chunked encoding: '" << std::string(begin, end) << "'"; @@ -638,19 +646,11 @@ struct Watcher { } std::string line(begin, iter); iter = std::next(iter, crlf.size()); - if (line.empty()) { - // Blank lines are fine, just skip them. -#ifdef VERBOSE - LOG(DEBUG) << "Skipping blank line within chunked encoding;" - << " remaining data '" << std::string(iter, end) << "'"; -#endif - } else { //#ifdef VERBOSE -// LOG(DEBUG) << "Line: '" << line << "'"; +// LOG(DEBUG) << "Line: '" << line << "'"; //#endif - std::stringstream stream(line); - stream >> std::hex >> remaining_chunk_bytes_; - } + std::stringstream stream(line); + stream >> std::hex >> remaining_chunk_bytes_; return boost::iterator_range(iter, end); } From 815f41719d7a66e89b9505278c25d7f2f086a79b Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Wed, 20 Dec 2017 19:05:42 -0500 Subject: [PATCH 13/13] Bump version to 0.0.14-1. --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 8d336bde..edf6298c 100644 --- a/src/Makefile +++ b/src/Makefile @@ -63,7 +63,7 @@ install: metadatad export DISTRO PKG_NAME=stackdriver-metadata -PKG_VERSION=0.0.14.k8s.watch +PKG_VERSION=0.0.14 PKG_RELEASE=1 PKG_MAINTAINER=Stackdriver Engineering