From 74cb4c73624bea850c0d11a47d954073e36ecd24 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Sat, 17 Mar 2018 17:55:47 -0400 Subject: [PATCH 1/3] Use upstream chunked encoding processing. --- src/kubernetes.cc | 211 +++++++--------------------------------------- 1 file changed, 32 insertions(+), 179 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index cde233b3..c63fb0a8 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -605,80 +605,50 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const namespace { struct Watcher { Watcher(const std::string& endpoint, - std::function body_callback, + std::function event_callback, std::unique_lock&& completion, bool verbose) : name_("Watcher(" + endpoint + ")"), - completion_(std::move(completion)), body_callback_(body_callback), - remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {} + completion_(std::move(completion)), event_callback_(event_callback), + verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. - private: - // A representation of all watch-related errors. - class WatcherException { - public: - WatcherException(const std::string& what) : explanation_(what) {} - const std::string& what() const { return explanation_; } - private: - std::string explanation_; - }; - public: void operator()(const boost::iterator_range& range, const boost::system::error_code& error) { - if (!error) { - if (!exception_message_.empty()) { - // We've encountered an unrecoverable error -- just ignore the rest of - // the input. - return; - } - + const std::string body(std::begin(range), std::end(range)); + if (!body.empty()) { try { //#ifdef VERBOSE // LOG(DEBUG) << name_ << " => " -// << "Watch notification: '" -// << std::string(std::begin(range), std::end(range)) -// << "'"; -//#endif - boost::iterator_range pos = range; - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// << "Invoking callbacks on '" << body << "'"; //#endif + std::vector events = json::Parser::AllFromString(body); + for (json::value& event : events) { + std::string event_str = event->ToString(); +#ifdef VERBOSE + LOG(DEBUG) << name_ << " => " + << "Invoking callback('" << event_str << "')"; +#endif + event_callback_(std::move(event)); +#ifdef VERBOSE + LOG(DEBUG) << name_ << " => " + << "callback('" << event_str << "') completed"; +#endif } - - if (remaining_chunk_bytes_ == 0) { - // Invoke the callback. - body_callback_(body_); - - // Process the next batch. - while (remaining_chunk_bytes_ == 0 && - std::begin(pos) != std::end(pos)) { - pos = StartNewChunk(pos); - } //#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Started new chunk; " << remaining_chunk_bytes_ -// << " bytes remaining"; -//#endif - - if (remaining_chunk_bytes_ != 0) { - pos = ReadNextChunk(pos); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Read another chunk; body now is '" << body_ << "'; " -// << remaining_chunk_bytes_ << " bytes remaining"; +// LOG(DEBUG) << name_ << " => " +// << "All callbacks on '" << body << "' completed"; //#endif - } - } - } catch (const WatcherException& e) { - LOG(ERROR) << name_ << " => " - << "Callback got exception: " << e.what(); - exception_message_ = e.what(); + } catch (const json::Exception& e) { + LOG(ERROR) << "Unable to process events: " << e.what(); } - } else { + } else if (!error) { +#ifdef VERBOSE + LOG(DEBUG) << name_ << " => " + << "Skipping empty watch notification"; +#endif + } + if (error) { if (error == boost::asio::error::eof) { #ifdef VERBOSE LOG(DEBUG) << name_ << " => " @@ -696,120 +666,13 @@ struct Watcher { } } - // The watch callback may throw an exception. Because the watcher runs in a - // separate thread, the exception will be preserved when the watcher exits, - // and can be accessed via this function. An empty string is returned when - // there was no exception. - const std::string& exception() const { return exception_message_; } - private: - boost::iterator_range - StartNewChunk(const boost::iterator_range& range) - throw(WatcherException) { - if (remaining_chunk_bytes_ != 0) { - LOG(ERROR) << name_ << " => " - << "Starting new chunk with " << remaining_chunk_bytes_ - << " bytes remaining"; - } - - body_.clear(); - - const std::string crlf("\r\n"); - auto begin = std::begin(range); - auto end = std::end(range); - auto iter = std::search(begin, end, crlf.begin(), crlf.end()); - if (iter == begin) { - // Blank lines are fine, just skip them. - iter = std::next(iter, crlf.size()); -#ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Skipping blank line within chunked encoding;" - << " remaining data '" << std::string(iter, end) << "'"; -#endif - return boost::iterator_range(iter, end); - } else if (iter == end) { - throw WatcherException("Invalid chunked encoding: '" + - std::string(begin, end) + - "'; exiting"); - } - std::string line(begin, iter); - iter = std::next(iter, crlf.size()); -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Line: '" << line << "'"; -//#endif - std::istringstream stream(line); - stream >> std::hex >> remaining_chunk_bytes_; - return boost::iterator_range(iter, end); - } - - boost::iterator_range - ReadNextChunk(const boost::iterator_range& range) - throw(WatcherException) { - if (remaining_chunk_bytes_ == 0) { - throw WatcherException( - "Asked to read next chunk with no bytes remaining"); - } - - const std::string crlf("\r\n"); - auto begin = std::begin(range); - auto end = std::end(range); - // The available bytes in the current notification, which may include the - // remainder of this chunk and the start of the next one. - const size_t available = std::distance(begin, end); - const size_t len = std::min(available, remaining_chunk_bytes_); - body_.insert(body_.end(), begin, begin + len); - remaining_chunk_bytes_ -= len; - begin = std::next(begin, len); - return boost::iterator_range(begin, end); - } - std::string name_; std::unique_lock completion_; - std::function body_callback_; - std::string body_; - size_t remaining_chunk_bytes_; + std::function event_callback_; bool verbose_; - std::string exception_message_; }; -void BodyCallback(const std::string& name, - std::function event_callback, - const std::string& body) { - if (body.empty()) { -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "Skipping empty watch notification"; -#endif - } else { - try { -//#ifdef VERBOSE -// LOG(DEBUG) << name << " => " -// << "Invoking callbacks on '" << body << "'"; -//#endif - std::vector events = json::Parser::AllFromString(body); - for (json::value& event : events) { - std::string event_str = event->ToString(); -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "Invoking callback('" << event_str << "')"; -#endif - event_callback(std::move(event)); -#ifdef VERBOSE - LOG(DEBUG) << name << " => " - << "callback('" << event_str << "') completed"; -#endif - } -//#ifdef VERBOSE -// LOG(DEBUG) << name << " => " -// << "All callbacks on '" << body << "' completed"; -//#endif - } catch (const json::Exception& e) { - LOG(ERROR) << "Unable to process events: " << e.what(); - } - } -} - void WatchEventCallback( std::function callback, json::value raw_watch) throw(json::Exception) { @@ -839,7 +702,6 @@ void KubernetesReader::WatchMaster( config_.KubernetesEndpointHost() + path + watch_param); http::client client( http::client::options() - .remove_chunk_markers(false) .openssl_certificate(SecretPath("ca.crt"))); http::client::request request(endpoint); request << boost::network::header( @@ -856,12 +718,8 @@ void KubernetesReader::WatchMaster( std::unique_lock watch_completion(completion_mutex); Watcher watcher( endpoint, - [=](const std::string& body) { - BodyCallback(name, - [=](json::value raw_watch) { - WatchEventCallback(callback, std::move(raw_watch)); - }, - body); + [=](json::value raw_watch) { + WatchEventCallback(callback, std::move(raw_watch)); }, std::move(watch_completion), config_.VerboseLogging()); http::client::response response = client.get(request, std::ref(watcher)); @@ -869,11 +727,6 @@ void KubernetesReader::WatchMaster( LOG(INFO) << "Waiting for completion"; } std::lock_guard await_completion(completion_mutex); - if (!watcher.exception().empty()) { - LOG(ERROR) << "Exception from the watcher thread: " - << watcher.exception(); - throw QueryException(watcher.exception()); - } if (config_.VerboseLogging()) { LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response); } From 9856fd9e48c15b8833324294ff81702fbb0c554d Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 30 Mar 2018 17:13:04 -0400 Subject: [PATCH 2/3] Use the streaming JSON parser. --- src/kubernetes.cc | 69 +++++++++++++++-------------------------------- 1 file changed, 21 insertions(+), 48 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index c63fb0a8..323bc3cd 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -608,7 +608,7 @@ struct Watcher { std::function event_callback, std::unique_lock&& completion, bool verbose) : name_("Watcher(" + endpoint + ")"), - completion_(std::move(completion)), event_callback_(event_callback), + completion_(std::move(completion)), event_parser_(event_callback), verbose_(verbose) {} ~Watcher() {} // Unlocks the completion_ lock. @@ -618,49 +618,29 @@ struct Watcher { const std::string body(std::begin(range), std::end(range)); if (!body.empty()) { try { -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "Invoking callbacks on '" << body << "'"; -//#endif - std::vector events = json::Parser::AllFromString(body); - for (json::value& event : events) { - std::string event_str = event->ToString(); #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Invoking callback('" << event_str << "')"; + LOG(DEBUG) << name_ << " => Parsing '" << body << "'"; #endif - event_callback_(std::move(event)); -#ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "callback('" << event_str << "') completed"; -#endif - } -//#ifdef VERBOSE -// LOG(DEBUG) << name_ << " => " -// << "All callbacks on '" << body << "' completed"; -//#endif + std::istringstream input(body); + event_parser_.ParseStream(input); } catch (const json::Exception& e) { LOG(ERROR) << "Unable to process events: " << e.what(); } } else if (!error) { #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Skipping empty watch notification"; + LOG(DEBUG) << name_ << " => Skipping empty watch notification"; #endif } if (error) { if (error == boost::asio::error::eof) { #ifdef VERBOSE - LOG(DEBUG) << name_ << " => " - << "Watch callback: EOF"; + LOG(DEBUG) << name_ << " => Watch callback: EOF"; #endif } else { - LOG(ERROR) << name_ << " => " - << "Callback got error " << error; + LOG(ERROR) << name_ << " => Callback got error " << error; } if (verbose_) { - LOG(ERROR) << name_ << " => " - << "Unlocking completion mutex"; + LOG(ERROR) << name_ << " => Unlocking completion mutex"; } completion_.unlock(); } @@ -669,16 +649,19 @@ struct Watcher { private: std::string name_; std::unique_lock completion_; - std::function event_callback_; + json::Parser event_parser_; bool verbose_; }; void WatchEventCallback( std::function callback, - json::value raw_watch) throw(json::Exception) { + const std::string& name, json::value raw_watch) + throw(json::Exception) { Timestamp collected_at = std::chrono::system_clock::now(); - //LOG(ERROR) << "Watch callback: " << *raw_watch; +#ifdef VERBOSE + LOG(DEBUG) << name << " => WatchEventCallback('" << *raw_watch << "')"; +#endif const json::Object* watch = raw_watch->As(); const std::string type = watch->Get("type"); const json::Object* object = watch->Get("object"); @@ -706,11 +689,12 @@ void KubernetesReader::WatchMaster( http::client::request request(endpoint); request << boost::network::header( "Authorization", "Bearer " + KubernetesApiToken()); - if (config_.VerboseLogging()) { + const bool verbose = config_.VerboseLogging(); + if (verbose) { LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint; } try { - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "Locking completion mutex"; } // A notification for watch completion. @@ -719,28 +703,17 @@ void KubernetesReader::WatchMaster( Watcher watcher( endpoint, [=](json::value raw_watch) { - WatchEventCallback(callback, std::move(raw_watch)); + WatchEventCallback(callback, name, std::move(raw_watch)); }, - std::move(watch_completion), config_.VerboseLogging()); + std::move(watch_completion), verbose); http::client::response response = client.get(request, std::ref(watcher)); - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "Waiting for completion"; } std::lock_guard await_completion(completion_mutex); - if (config_.VerboseLogging()) { + if (verbose) { LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response); } - std::string encoding; -#ifdef VERBOSE - LOG(DEBUG) << "response headers: " << response.headers(); -#endif - auto transfer_encoding_header = headers(response)["Transfer-Encoding"]; - if (!boost::empty(transfer_encoding_header)) { - encoding = boost::begin(transfer_encoding_header)->second; - } - if (encoding != "chunked") { - LOG(ERROR) << "Expected chunked encoding; found '" << encoding << "'"; - } } catch (const boost::system::system_error& e) { LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what(); throw QueryException(endpoint + " -> " + e.what()); From 53f9b62b40a91dd7dbd8e5be90f73d2713085286 Mon Sep 17 00:00:00 2001 From: Igor Peshansky Date: Fri, 30 Mar 2018 17:42:17 -0400 Subject: [PATCH 3/3] Minor formatting fix. --- src/kubernetes.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 323bc3cd..e1fe00bb 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -655,8 +655,7 @@ struct Watcher { void WatchEventCallback( std::function callback, - const std::string& name, json::value raw_watch) - throw(json::Exception) { + const std::string& name, json::value raw_watch) throw(json::Exception) { Timestamp collected_at = std::chrono::system_clock::now(); #ifdef VERBOSE