diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 6205d8bc..4dc20397 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -24,6 +24,7 @@ #include #include #include +#include #include #include "format.h" @@ -568,70 +569,111 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { - Watcher(std::function event_callback, + Watcher(const std::string& endpoint, + std::function event_callback, std::unique_lock&& completion, bool verbose) - : completion_(std::move(completion)), event_callback_(event_callback), - remaining_chunk_bytes_(0), verbose_(verbose) {} + : name_("Watcher(" + endpoint + ")"), + completion_(std::move(completion)), event_callback_(event_callback), + remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {} ~Watcher() {} // Unlocks the completion_ lock. + + private: + // A representation of all watch-related errors. + class WatcherException { + public: + WatcherException(const std::string& what) : explanation_(what) {} + const std::string& what() const { return explanation_; } + private: + std::string explanation_; + }; + + public: void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { if (!error) { + if (!exception_message_.empty()) { + // We've encountered an unrecoverable error -- just ignore the rest of + // the input. + return; + } + + try { //#ifdef VERBOSE -// LOG(DEBUG) << "Watch notification: '" -// << std::string(std::begin(range), std::end(range)) -// << "'"; +// LOG(DEBUG) << name_ << " => " +// << "Watch notification: '" +// << std::string(std::begin(range), std::end(range)) +// << "'"; //#endif - boost::iterator_range pos = range; - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); + boost::iterator_range pos = range; + if (remaining_chunk_bytes_ != 0) { + pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// LOG(DEBUG) << name_ << " => " +// << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif - } + } - if (remaining_chunk_bytes_ == 0) { - // Invoke the callback. - CompleteChunk(); + if (remaining_chunk_bytes_ == 0) { + // Invoke the callback. + CompleteChunk(); - // Process the next batch. - while (remaining_chunk_bytes_ == 0 && - std::begin(pos) != std::end(pos)) { - pos = StartNewChunk(pos); - } + // Process the next batch. + while (remaining_chunk_bytes_ == 0 && + std::begin(pos) != std::end(pos)) { + pos = StartNewChunk(pos); + } //#ifdef VERBOSE -// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_ -// << " bytes remaining"; +// LOG(DEBUG) << name_ << " => " +// << "Started new chunk; " << remaining_chunk_bytes_ +// << " bytes remaining"; //#endif - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); + if (remaining_chunk_bytes_ != 0) { + pos = ReadNextChunk(pos); //#ifdef VERBOSE -// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// LOG(DEBUG) << name_ << " => " +// << "Read another chunk; body now is '" << body_ << "'; " +// << remaining_chunk_bytes_ << " bytes remaining"; //#endif + } } + } catch (const WatcherException& e) { + LOG(ERROR) << name_ << " => " + << "Callback got exception: " << e.what(); + exception_message_ = e.what(); } } else { if (error == boost::asio::error::eof) { #ifdef VERBOSE - LOG(DEBUG) << "Watch callback: EOF"; + LOG(DEBUG) << name_ << " => " + << "Watch callback: EOF"; #endif } else { - LOG(ERROR) << "Callback got error " << error; + LOG(ERROR) << name_ << " => " + << "Callback got error " << error; } if (verbose_) { - LOG(INFO) << "Unlocking completion mutex"; + LOG(ERROR) << name_ << " => " + << "Unlocking completion mutex"; } completion_.unlock(); } } + // The watch callback may throw an exception. Because the watcher runs in a + // separate thread, the exception will be preserved when the watcher exits, + // and can be accessed via this function. An empty string is returned when + // there was no exception. + const std::string& exception() const { return exception_message_; } + private: boost::iterator_range - StartNewChunk(const boost::iterator_range& range) { + StartNewChunk(const boost::iterator_range& range) + throw(WatcherException) { if (remaining_chunk_bytes_ != 0) { - LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_ + LOG(ERROR) << name_ << " => " + << "Starting new chunk with " << remaining_chunk_bytes_ << " bytes remaining"; } @@ -645,31 +687,33 @@ struct Watcher { // Blank lines are fine, just skip them. iter = std::next(iter, crlf.size()); #ifdef VERBOSE - LOG(DEBUG) << "Skipping blank line within chunked encoding;" + LOG(DEBUG) << name_ << " => " + << "Skipping blank line within chunked encoding;" << " remaining data '" << std::string(iter, end) << "'"; #endif return boost::iterator_range(iter, end); } else if (iter == end) { - LOG(ERROR) << "Invalid chunked encoding: '" - << std::string(begin, end) - << "'"; - return boost::iterator_range(begin, end); + throw WatcherException("Invalid chunked encoding: '" + + std::string(begin, end) + + "'; exiting"); } std::string line(begin, iter); iter = std::next(iter, crlf.size()); //#ifdef VERBOSE -// LOG(DEBUG) << "Line: '" << line << "'"; +// LOG(DEBUG) << name_ << " => " +// << "Line: '" << line << "'"; //#endif - std::stringstream stream(line); + std::istringstream stream(line); stream >> std::hex >> remaining_chunk_bytes_; return boost::iterator_range(iter, end); } boost::iterator_range - ReadNextChunk(const boost::iterator_range& range) { + ReadNextChunk(const boost::iterator_range& range) + throw(WatcherException) { if (remaining_chunk_bytes_ == 0) { - LOG(ERROR) << "Asked to read next chunk with no bytes remaining"; - return range; + throw WatcherException( + "Asked to read next chunk with no bytes remaining"); } const std::string crlf("\r\n"); @@ -685,41 +729,48 @@ struct Watcher { return boost::iterator_range(begin, end); } - void CompleteChunk() { + void CompleteChunk() throw(WatcherException) { if (body_.empty()) { #ifdef VERBOSE - LOG(DEBUG) << "Skipping empty watch notification"; + LOG(DEBUG) << name_ << " => " + << "Skipping empty watch notification"; #endif } else { try { //#ifdef VERBOSE -// LOG(DEBUG) << "Invoking callbacks on '" << body_ << "'"; +// LOG(DEBUG) << name_ << " => " +// << "Invoking callbacks on '" << body_ << "'"; //#endif std::vector events = json::Parser::AllFromString(body_); for (json::value& event : events) { std::string event_str = event->ToString(); #ifdef VERBOSE - LOG(DEBUG) << "Invoking callback('" << event_str << "')"; + LOG(DEBUG) << name_ << " => " + << "Invoking callback('" << event_str << "')"; #endif event_callback_(std::move(event)); #ifdef VERBOSE - LOG(DEBUG) << "callback('" << event_str << "') completed"; + LOG(DEBUG) << name_ << " => " + << "callback('" << event_str << "') completed"; #endif } //#ifdef VERBOSE -// LOG(DEBUG) << "All callbacks on '" << body_ << "' completed"; +// LOG(DEBUG) << name_ << " => " +// << "All callbacks on '" << body_ << "' completed"; //#endif } catch (const json::Exception& e) { - LOG(ERROR) << e.what(); + throw WatcherException("JSON error: " + e.what()); } } } + std::string name_; std::unique_lock completion_; std::function event_callback_; std::string body_; size_t remaining_chunk_bytes_; bool verbose_; + std::string exception_message_; }; void EventCallback( @@ -762,13 +813,19 @@ void KubernetesReader::WatchMaster( // A notification for watch completion. std::mutex completion_mutex; std::unique_lock watch_completion(completion_mutex); - Watcher watcher(std::bind(&EventCallback, callback, std::placeholders::_1), + Watcher watcher(endpoint, + std::bind(&EventCallback, callback, std::placeholders::_1), std::move(watch_completion), config_.VerboseLogging()); http::client::response response = client.get(request, boost::ref(watcher)); if (config_.VerboseLogging()) { LOG(INFO) << "Waiting for completion"; } std::lock_guard await_completion(completion_mutex); + if (!watcher.exception().empty()) { + LOG(ERROR) << "Exception from the watcher thread: " + << watcher.exception(); + throw QueryException(watcher.exception()); + } if (config_.VerboseLogging()) { LOG(INFO) << "WatchMaster completed " << body(response); } @@ -993,8 +1050,9 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback) std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); + LOG(ERROR) << "No more pod metadata will be collected"; } catch (const KubernetesReader::QueryException& e) { - // Already logged. + LOG(ERROR) << "No more pod metadata will be collected"; } LOG(INFO) << "Watch thread (pods) exiting"; } @@ -1028,8 +1086,9 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback) std::placeholders::_2, std::placeholders::_3)); } catch (const json::Exception& e) { LOG(ERROR) << e.what(); + LOG(ERROR) << "No more node metadata will be collected"; } catch (const KubernetesReader::QueryException& e) { - // Already logged. + LOG(ERROR) << "No more node metadata will be collected"; } LOG(INFO) << "Watch thread (node) exiting"; }