diff --git a/src/kubernetes.cc b/src/kubernetes.cc index cde233b3..e1fe00bb 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -605,217 +605,62 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { Watcher(const std::string& endpoint, - std::function body_callback, + std::function event_callback, std::unique_lock&& completion, bool verbose) : name_("Watcher(" + endpoint + ")"), - completion_(std::move(completion)), body_callback_(body_callback), - remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {} + completion_(std::move(completion)), event_parser_(event_callback), + verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. - private: - // A representation of all watch-related errors. - class WatcherException { - public: - WatcherException(const std::string& what) : explanation_(what) {} - const std::string& what() const { return explanation_; } - private: - std::string explanation_; - }; - public: void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { - if (!error) { - if (!exception_message_.empty()) { - // We've encountered an unrecoverable error -- just ignore the rest of - // the input. - return; - } - + const std::string body(std::begin(range), std::end(range)); + if (!body.empty()) { try { -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Watch notification: '" -// << std::string(std::begin(range), std::end(range)) -// << "'"; -//#endif - boost::iterator_range pos = range; - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; -//#endif - } - - if (remaining_chunk_bytes_ == 0) { - // Invoke the callback. - body_callback_(body_); - - // Process the next batch. - while (remaining_chunk_bytes_ == 0 && - std::begin(pos) != std::end(pos)) { - pos = StartNewChunk(pos); - } -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Started new chunk; " << remaining_chunk_bytes_ -// << " bytes remaining"; -//#endif - - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; -//#endif - } - } - } catch (const WatcherException& e) { - LOG(ERROR) << name_ << " => " - << "Callback got exception: " << e.what(); - exception_message_ = e.what(); +#ifdef VERBOSE + LOG(DEBUG) << name_ << " => Parsing '" << body << "'"; +#endif + std::istringstream input(body); + event_parser_.ParseStream(input); + } catch (const json::Exception& e) { + LOG(ERROR) << "Unable to process events: " << e.what(); } - } else { + } else if (!error) { +#ifdef VERBOSE + LOG(DEBUG) << name_ << " => Skipping empty watch notification"; +#endif + } + if (error) { if (error == boost::asio::error::eof) { #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Watch callback: EOF"; + LOG(DEBUG) << name_ << " => Watch callback: EOF"; #endif } else { - LOG(ERROR) << name_ << " => " - << "Callback got error " << error; + LOG(ERROR) << name_ << " => Callback got error " << error; } if (verbose_) { - LOG(ERROR) << name_ << " => " - << "Unlocking completion mutex"; + LOG(ERROR) << name_ << " => Unlocking completion mutex"; } completion_.unlock(); } } - // The watch callback may throw an exception. Because the watcher runs in a - // separate thread, the exception will be preserved when the watcher exits, - // and can be accessed via this function. An empty string is returned when - // there was no exception. - const std::string& exception() const { return exception_message_; } - private: - boost::iterator_range - StartNewChunk(const boost::iterator_range& range) - throw(WatcherException) { - if (remaining_chunk_bytes_ != 0) { - LOG(ERROR) << name_ << " => " - << "Starting new chunk with " << remaining_chunk_bytes_ - << " bytes remaining"; - } - - body_.clear(); - - const std::string crlf("\r\n"); - auto begin = std::begin(range); - auto end = std::end(range); - auto iter = std::search(begin, end, crlf.begin(), crlf.end()); - if (iter == begin) { - // Blank lines are fine, just skip them. - iter = std::next(iter, crlf.size()); -#ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Skipping blank line within chunked encoding;" - << " remaining data '" << std::string(iter, end) << "'"; -#endif - return boost::iterator_range(iter, end); - } else if (iter == end) { - throw WatcherException("Invalid chunked encoding: '" + - std::string(begin, end) + - "'; exiting"); - } - std::string line(begin, iter); - iter = std::next(iter, crlf.size()); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Line: '" << line << "'"; -//#endif - std::istringstream stream(line); - stream >> std::hex >> remaining_chunk_bytes_; - return boost::iterator_range(iter, end); - } - - boost::iterator_range - ReadNextChunk(const boost::iterator_range& range) - throw(WatcherException) { - if (remaining_chunk_bytes_ == 0) { - throw WatcherException( - "Asked to read next chunk with no bytes remaining"); - } - - const std::string crlf("\r\n"); - auto begin = std::begin(range); - auto end = std::end(range); - // The available bytes in the current notification, which may include the - // remainder of this chunk and the start of the next one. - const size_t available = std::distance(begin, end); - const size_t len = std::min(available, remaining_chunk_bytes_); - body_.insert(body_.end(), begin, begin + len); - remaining_chunk_bytes_ -= len; - begin = std::next(begin, len); - return boost::iterator_range(begin, end); - } - std::string name_; std::unique_lock completion_; - std::function body_callback_; - std::string body_; - size_t remaining_chunk_bytes_; + json::Parser event_parser_; bool verbose_; - std::string exception_message_; }; -void BodyCallback(const std::string& name, - std::function event_callback, - const std::string& body) { - if (body.empty()) { -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "Skipping empty watch notification"; -#endif - } else { - try { -//#ifdef VERBOSE -// LOG(DEBUG) << name << " => " -// << "Invoking callbacks on '" << body << "'"; -//#endif - std::vector events = json::Parser::AllFromString(body); - for (json::value& event : events) { - std::string event_str = event->ToString(); -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "Invoking callback('" << event_str << "')"; -#endif - event_callback(std::move(event)); -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "callback('" << event_str << "') completed"; -#endif - } -//#ifdef VERBOSE -// LOG(DEBUG) << name << " => " -// << "All callbacks on '" << body << "' completed"; -//#endif - } catch (const json::Exception& e) { - LOG(ERROR) << "Unable to process events: " << e.what(); - } - } -} - void WatchEventCallback( std::function callback, - json::value raw_watch) throw(json::Exception) { + const std::string& name, json::value raw_watch) throw(json::Exception) { Timestamp collected_at = std::chrono::system_clock::now(); - //LOG(ERROR) << "Watch callback: " << *raw_watch; +#ifdef VERBOSE + LOG(DEBUG) << name << " => WatchEventCallback('" << *raw_watch << "')"; +#endif const json::Object* watch = raw_watch->As(); const std::string type = watch->Get("type"); const json::Object* object = watch->Get("object"); @@ -839,16 +684,16 @@ void KubernetesReader::WatchMaster( config_.KubernetesEndpointHost() + path + watch_param); http::client client( http::client::options() - .remove_chunk_markers(false) .openssl_certificate(SecretPath("ca.crt"))); http::client::request request(endpoint); request << boost::network::header( "Authorization", "Bearer " + KubernetesApiToken()); - if (config_.VerboseLogging()) { + const bool verbose = config_.VerboseLogging(); + if (verbose) { LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint; } try { - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "Locking completion mutex"; } // A notification for watch completion. @@ -856,38 +701,18 @@ void KubernetesReader::WatchMaster( std::unique_lock watch_completion(completion_mutex); Watcher watcher( endpoint, - [=](const std::string& body) { - BodyCallback(name, - [=](json::value raw_watch) { - WatchEventCallback(callback, std::move(raw_watch)); - }, - body); + [=](json::value raw_watch) { + WatchEventCallback(callback, name, std::move(raw_watch)); }, - std::move(watch_completion), config_.VerboseLogging()); + std::move(watch_completion), verbose); http::client::response response = client.get(request, std::ref(watcher)); - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "Waiting for completion"; } std::lock_guard await_completion(completion_mutex); - if (!watcher.exception().empty()) { - LOG(ERROR) << "Exception from the watcher thread: " - << watcher.exception(); - throw QueryException(watcher.exception()); - } - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response); } - std::string encoding; -#ifdef VERBOSE - LOG(DEBUG) << "response headers: " << response.headers(); -#endif - auto transfer_encoding_header = headers(response)["Transfer-Encoding"]; - if (!boost::empty(transfer_encoding_header)) { - encoding = boost::begin(transfer_encoding_header)->second; - } - if (encoding != "chunked") { - LOG(ERROR) << "Expected chunked encoding; found '" << encoding << "'"; - } } catch (const boost::system::system_error& e) { LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); throw QueryException(endpoint + " -> " + e.what());