From bb232ec85188623485e3e718252c828dad6292b6 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 23 Feb 2018 17:12:51 -0500 Subject: [PATCH 1/7] When a chunk does not end in CRLF, assume a CRLF at the end. --- src/kubernetes.cc | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 6205d8bc..f5f96c54 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "format.h" @@ -650,17 +651,18 @@ struct Watcher { #endif return boost::iterator_range(iter, end); } else if (iter == end) { - LOG(ERROR) << "Invalid chunked encoding: '" - << std::string(begin, end) - << "'"; - return boost::iterator_range(begin, end); + std::string line(begin, end); + LOG(ERROR) << "Invalid chunked encoding: '" << line << "'; assuming crlf"; + std::istringstream stream(line); + stream >> std::hex >> remaining_chunk_bytes_; + return boost::iterator_range(end, end); } std::string line(begin, iter); iter = std::next(iter, crlf.size()); //#ifdef VERBOSE // LOG(DEBUG) << "Line: '" << line << "'"; //#endif - std::stringstream stream(line); + std::istringstream stream(line); stream >> std::hex >> remaining_chunk_bytes_; return boost::iterator_range(iter, end); } From b67448254f2c4ea5c315e9a5a5bb10ba321785fb Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 23 Feb 2018 19:44:14 -0500 Subject: [PATCH 2/7] On second thought, just exit the watch on an invalid chunk. --- src/kubernetes.cc | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index f5f96c54..bb2e1ff4 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -652,9 +652,13 @@ struct Watcher { return boost::iterator_range(iter, end); } else if (iter == end) { std::string line(begin, end); - LOG(ERROR) << "Invalid chunked encoding: '" << line << "'; assuming crlf"; - std::istringstream stream(line); - stream >> std::hex >> remaining_chunk_bytes_; + LOG(ERROR) << "Invalid chunked encoding: '" + << std::string(begin, end) + << "'; exiting"; + if (verbose_) { + LOG(INFO) << "Unlocking completion mutex"; + } + completion_.unlock(); return boost::iterator_range(end, end); } std::string line(begin, iter); From c94ebcd49a54114456fd6e0557eaf9363a43f784 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Mon, 26 Feb 2018 16:45:22 -0500 Subject: [PATCH 3/7] Ignore the rest of the streaming input when an error is encountered. --- src/kubernetes.cc | 114 ++++++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 40 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index bb2e1ff4..87b4dacf 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -572,46 +572,69 @@ struct Watcher { Watcher(std::function event_callback, std::unique_lock&& completion, bool verbose) : completion_(std::move(completion)), event_callback_(event_callback), - remaining_chunk_bytes_(0), verbose_(verbose) {} + remaining_chunk_bytes_(0), verbose_(verbose), exception_() {} ~Watcher() {} // Unlocks the completion_ lock. + + private: + // A representation of all watch-related errors. + class WatcherException { + public: + WatcherException(const std::string& what) : explanation_(what) {} + const std::string& what() const { return explanation_; } + private: + std::string explanation_; + }; + + public: void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { if (!error) { + if (!exception_.empty()) { + // We've encountered an unrecoverable error -- just ignore the rest of + // the input. + return; + } + + try { //#ifdef VERBOSE -// LOG(DEBUG) << "Watch notification: '" -// << std::string(std::begin(range), std::end(range)) -// << "'"; +// LOG(DEBUG) << "Watch notification: '" +// << std::string(std::begin(range), std::end(range)) +// << "'"; //#endif - boost::iterator_range pos = range; - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); + boost::iterator_range pos = range; + if (remaining_chunk_bytes_ != 0) { + pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif - } + } - if (remaining_chunk_bytes_ == 0) { - // Invoke the callback. - CompleteChunk(); + if (remaining_chunk_bytes_ == 0) { + // Invoke the callback. + CompleteChunk(); - // Process the next batch. - while (remaining_chunk_bytes_ == 0 && - std::begin(pos) != std::end(pos)) { - pos = StartNewChunk(pos); - } + // Process the next batch. + while (remaining_chunk_bytes_ == 0 && + std::begin(pos) != std::end(pos)) { + pos = StartNewChunk(pos); + } //#ifdef VERBOSE -// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_ -// << " bytes remaining"; +// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_ +// << " bytes remaining"; //#endif - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); + if (remaining_chunk_bytes_ != 0) { + pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif + } } + } catch (const WatcherException& e) { + LOG(ERROR) << "Callback got exception " << e.what(); + exception_ = e.what(); } } else { if (error == boost::asio::error::eof) { @@ -628,9 +651,16 @@ struct Watcher { } } + // The watch callback may throw an exception. Because the watcher runs in a + // separate thread, the exception will be preserved when the watcher exits, + // and can be accessed via this function. An empty string is returned when + // there was no exception. + const std::string& exception() const { return exception_; } + private: boost::iterator_range - StartNewChunk(const boost::iterator_range& range) { + StartNewChunk(const boost::iterator_range& range) + throw(WatcherException) { if (remaining_chunk_bytes_ != 0) { LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_ << " bytes remaining"; @@ -651,15 +681,12 @@ struct Watcher { #endif return boost::iterator_range(iter, end); } else if (iter == end) { - std::string line(begin, end); - LOG(ERROR) << "Invalid chunked encoding: '" - << std::string(begin, end) - << "'; exiting"; - if (verbose_) { - LOG(INFO) << "Unlocking completion mutex"; - } - completion_.unlock(); - return boost::iterator_range(end, end); + throw WatcherException(boost::algorithm::join( + std::vector{ + "Invalid chunked encoding: '", + std::string(begin, end), + "'; exiting" + }, "")); } std::string line(begin, iter); iter = std::next(iter, crlf.size()); @@ -672,10 +699,11 @@ struct Watcher { } boost::iterator_range - ReadNextChunk(const boost::iterator_range& range) { + ReadNextChunk(const boost::iterator_range& range) + throw(WatcherException) { if (remaining_chunk_bytes_ == 0) { LOG(ERROR) << "Asked to read next chunk with no bytes remaining"; - return range; + return range; // TODO: should this throw an exception instead? } const std::string crlf("\r\n"); @@ -691,7 +719,7 @@ struct Watcher { return boost::iterator_range(begin, end); } - void CompleteChunk() { + void CompleteChunk() throw(WatcherException) { if (body_.empty()) { #ifdef VERBOSE LOG(DEBUG) << "Skipping empty watch notification"; @@ -716,7 +744,7 @@ struct Watcher { // LOG(DEBUG) << "All callbacks on '" << body_ << "' completed"; //#endif } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); + throw WatcherException("JSON error: " + e.what()); } } } @@ -726,6 +754,7 @@ struct Watcher { std::string body_; size_t remaining_chunk_bytes_; bool verbose_; + std::string exception_; }; void EventCallback( @@ -775,6 +804,9 @@ void KubernetesReader::WatchMaster( LOG(INFO) << "Waiting for completion"; } std::lock_guard await_completion(completion_mutex); + if (!watcher.exception().empty()) { + throw QueryException(watcher.exception()); + } if (config_.VerboseLogging()) { LOG(INFO) << "WatchMaster completed " << body(response); } @@ -999,8 +1031,9 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); + LOG(ERROR) << "No more pod metadata will be collected"; } catch (const KubernetesReader::QueryException& e) { - // Already logged. + LOG(ERROR) << "No more pod metadata will be collected"; } LOG(INFO) << "Watch thread (pods) exiting"; } @@ -1034,8 +1067,9 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); + LOG(ERROR) << "No more node metadata will be collected"; } catch (const KubernetesReader::QueryException& e) { - // Already logged. + LOG(ERROR) << "No more node metadata will be collected"; } LOG(INFO) << "Watch thread (node) exiting"; } From d62b48dfced6a2c238b63ccbcb7da4a120ee3118 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Mon, 26 Feb 2018 18:43:51 -0500 Subject: [PATCH 4/7] Identify watchers by the endpoint they're watching, to distinguish logs. --- src/kubernetes.cc | 61 +++++++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 87b4dacf..ecb07f65 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -569,9 +569,11 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { - Watcher(std::function event_callback, + Watcher(const std::string& endpoint, + std::function event_callback, std::unique_lock&& completion, bool verbose) - : completion_(std::move(completion)), event_callback_(event_callback), + : name_("Watcher(" + endpoint + ")"), + completion_(std::move(completion)), event_callback_(event_callback), remaining_chunk_bytes_(0), verbose_(verbose), exception_() {} ~Watcher() {} // Unlocks the completion_ lock. @@ -597,7 +599,8 @@ struct Watcher { try { //#ifdef VERBOSE -// LOG(DEBUG) << "Watch notification: '" +// LOG(DEBUG) << name_ << " => " +// << "Watch notification: '" // << std::string(std::begin(range), std::end(range)) // << "'"; //#endif @@ -605,7 +608,8 @@ struct Watcher { if (remaining_chunk_bytes_ != 0) { pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " +// LOG(DEBUG) << name_ << " => " +// << "Read another chunk; body now is '" << body_ << "'; " // << remaining_chunk_bytes_ << " bytes remaining"; //#endif } @@ -620,32 +624,38 @@ struct Watcher { pos = StartNewChunk(pos); } //#ifdef VERBOSE -// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_ +// LOG(DEBUG) << name_ << " => " +// << "Started new chunk; " << remaining_chunk_bytes_ // << " bytes remaining"; //#endif if (remaining_chunk_bytes_ != 0) { pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " +// LOG(DEBUG) << name_ << " => " +// << "Read another chunk; body now is '" << body_ << "'; " // << remaining_chunk_bytes_ << " bytes remaining"; //#endif } } } catch (const WatcherException& e) { - LOG(ERROR) << "Callback got exception " << e.what(); + LOG(ERROR) << name_ << " => " + << "Callback got exception " << e.what(); exception_ = e.what(); } } else { if (error == boost::asio::error::eof) { #ifdef VERBOSE - LOG(DEBUG) << "Watch callback: EOF"; + LOG(DEBUG) << name_ << " => " + << "Watch callback: EOF"; #endif } else { - LOG(ERROR) << "Callback got error " << error; + LOG(ERROR) << name_ << " => " + << "Callback got error " << error; } if (verbose_) { - LOG(INFO) << "Unlocking completion mutex"; + LOG(ERROR) << name_ << " => " + << "Unlocking completion mutex"; } completion_.unlock(); } @@ -662,7 +672,8 @@ struct Watcher { StartNewChunk(const boost::iterator_range& range) throw(WatcherException) { if (remaining_chunk_bytes_ != 0) { - LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_ + LOG(ERROR) << name_ << " => " + << "Starting new chunk with " << remaining_chunk_bytes_ << " bytes remaining"; } @@ -676,7 +687,8 @@ struct Watcher { // Blank lines are fine, just skip them. iter = std::next(iter, crlf.size()); #ifdef VERBOSE - LOG(DEBUG) << "Skipping blank line within chunked encoding;" + LOG(DEBUG) << name_ << " => " + << "Skipping blank line within chunked encoding;" << " remaining data '" << std::string(iter, end) << "'"; #endif return boost::iterator_range(iter, end); @@ -691,7 +703,8 @@ struct Watcher { std::string line(begin, iter); iter = std::next(iter, crlf.size()); //#ifdef VERBOSE -// LOG(DEBUG) << "Line: '" << line << "'"; +// LOG(DEBUG) << name_ << " => " +// << "Line: '" << line << "'"; //#endif std::istringstream stream(line); stream >> std::hex >> remaining_chunk_bytes_; @@ -702,7 +715,8 @@ struct Watcher { ReadNextChunk(const boost::iterator_range& range) throw(WatcherException) { if (remaining_chunk_bytes_ == 0) { - LOG(ERROR) << "Asked to read next chunk with no bytes remaining"; + LOG(ERROR) << name_ << " => " + << "Asked to read next chunk with no bytes remaining"; return range; // TODO: should this throw an exception instead? } @@ -722,26 +736,31 @@ struct Watcher { void CompleteChunk() throw(WatcherException) { if (body_.empty()) { #ifdef VERBOSE - LOG(DEBUG) << "Skipping empty watch notification"; + LOG(DEBUG) << name_ << " => " + << "Skipping empty watch notification"; #endif } else { try { //#ifdef VERBOSE -// LOG(DEBUG) << "Invoking callbacks on '" << body_ << "'"; +// LOG(DEBUG) << name_ << " => " +// << "Invoking callbacks on '" << body_ << "'"; //#endif std::vector events = json::Parser::AllFromString(body_); for (json::value& event : events) { std::string event_str = event->ToString(); #ifdef VERBOSE - LOG(DEBUG) << "Invoking callback('" << event_str << "')"; + LOG(DEBUG) << name_ << " => " + << "Invoking callback('" << event_str << "')"; #endif event_callback_(std::move(event)); #ifdef VERBOSE - LOG(DEBUG) << "callback('" << event_str << "') completed"; + LOG(DEBUG) << name_ << " => " + << "callback('" << event_str << "') completed"; #endif } //#ifdef VERBOSE -// LOG(DEBUG) << "All callbacks on '" << body_ << "' completed"; +// LOG(DEBUG) << name_ << " => " +// << "All callbacks on '" << body_ << "' completed"; //#endif } catch (const json::Exception& e) { throw WatcherException("JSON error: " + e.what()); @@ -749,6 +768,7 @@ struct Watcher { } } + std::string name_; std::unique_lock completion_; std::function event_callback_; std::string body_; @@ -797,7 +817,8 @@ void KubernetesReader::WatchMaster( // A notification for watch completion. std::mutex completion_mutex; std::unique_lock watch_completion(completion_mutex); - Watcher watcher(std::bind(&EventCallback, callback, std::placeholders::_1), + Watcher watcher(endpoint, + std::bind(&EventCallback, callback, std::placeholders::_1), std::move(watch_completion), config_.VerboseLogging()); http::client::response response = client.get(request, boost::ref(watcher)); if (config_.VerboseLogging()) { From ce35ca4a590e04c25b9b71448cba73f2cac59f3d Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Tue, 27 Feb 2018 11:33:55 -0500 Subject: [PATCH 5/7] Turn a return into an exception; minor tweaks. --- src/kubernetes.cc | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index ecb07f65..dfe78c88 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -640,7 +640,7 @@ struct Watcher { } } catch (const WatcherException& e) { LOG(ERROR) << name_ << " => " - << "Callback got exception " << e.what(); + << "Callback got exception: " << e.what(); exception_ = e.what(); } } else { @@ -693,12 +693,9 @@ struct Watcher { #endif return boost::iterator_range(iter, end); } else if (iter == end) { - throw WatcherException(boost::algorithm::join( - std::vector{ - "Invalid chunked encoding: '", - std::string(begin, end), - "'; exiting" - }, "")); + throw WatcherException("Invalid chunked encoding: '" + + std::string(begin, end) + + "'; exiting"); } std::string line(begin, iter); iter = std::next(iter, crlf.size()); @@ -715,9 +712,8 @@ struct Watcher { ReadNextChunk(const boost::iterator_range& range) throw(WatcherException) { if (remaining_chunk_bytes_ == 0) { - LOG(ERROR) << name_ << " => " - << "Asked to read next chunk with no bytes remaining"; - return range; // TODO: should this throw an exception instead? + throw WatcherException( + "Asked to read next chunk with no bytes remaining"); } const std::string crlf("\r\n"); From 44eadf779df558f35548cc2cba19ef70317f68e1 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Tue, 27 Feb 2018 14:03:07 -0500 Subject: [PATCH 6/7] Log before throwing an exception. --- src/kubernetes.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index dfe78c88..67dc5574 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -822,6 +822,8 @@ void KubernetesReader::WatchMaster( } std::lock_guard await_completion(completion_mutex); if (!watcher.exception().empty()) { + LOG(ERROR) << "Exception from the watcher thread: " + << watcher.exception(); throw QueryException(watcher.exception()); } if (config_.VerboseLogging()) { From 64d335c7a17a7c10b487292adb3077b9ac9cdaf8 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Tue, 27 Feb 2018 16:39:55 -0500 Subject: [PATCH 7/7] exception_ -> exception_message_. --- src/kubernetes.cc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 67dc5574..4dc20397 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -574,7 +574,7 @@ struct Watcher { std::unique_lock&& completion, bool verbose) : name_("Watcher(" + endpoint + ")"), completion_(std::move(completion)), event_callback_(event_callback), - remaining_chunk_bytes_(0), verbose_(verbose), exception_() {} + remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {} ~Watcher() {} // Unlocks the completion_ lock. private: @@ -591,7 +591,7 @@ struct Watcher { void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { if (!error) { - if (!exception_.empty()) { + if (!exception_message_.empty()) { // We've encountered an unrecoverable error -- just ignore the rest of // the input. return; @@ -641,7 +641,7 @@ struct Watcher { } catch (const WatcherException& e) { LOG(ERROR) << name_ << " => " << "Callback got exception: " << e.what(); - exception_ = e.what(); + exception_message_ = e.what(); } } else { if (error == boost::asio::error::eof) { @@ -665,7 +665,7 @@ struct Watcher { // separate thread, the exception will be preserved when the watcher exits, // and can be accessed via this function. An empty string is returned when // there was no exception. - const std::string& exception() const { return exception_; } + const std::string& exception() const { return exception_message_; } private: boost::iterator_range @@ -770,7 +770,7 @@ struct Watcher { std::string body_; size_t remaining_chunk_bytes_; bool verbose_; - std::string exception_; + std::string exception_message_; }; void EventCallback(