Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
161 changes: 110 additions & 51 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <chrono>
#include <cstddef>
#include <fstream>
#include <sstream>
#include <tuple>

#include "format.h"
Expand Down Expand Up @@ -568,70 +569,111 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const

namespace {
struct Watcher {
Watcher(std::function<void(json::value)> event_callback,
Watcher(const std::string& endpoint,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea adding a watching name to make the error log more informative.

std::function<void(json::value)> event_callback,
std::unique_lock<std::mutex>&& completion, bool verbose)
: completion_(std::move(completion)), event_callback_(event_callback),
remaining_chunk_bytes_(0), verbose_(verbose) {}
: name_("Watcher(" + endpoint + ")"),
completion_(std::move(completion)), event_callback_(event_callback),
remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {}
~Watcher() {} // Unlocks the completion_ lock.

private:
// A representation of all watch-related errors.
class WatcherException {
public:
WatcherException(const std::string& what) : explanation_(what) {}
const std::string& what() const { return explanation_; }
private:
std::string explanation_;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called explanation_ instead of what_?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency with all other exceptions. :-) If we choose to change it, we should do it in one fell swoop, but we could also keep it this way.
e.what() is fairly standard in Boost and others.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with e.what(), I just got confused why the local variable is called explanation_, not critical in either case so happy to just approve.

};

public:
void operator()(const boost::iterator_range<const char*>& range,
const boost::system::error_code& error) {
if (!error) {
if (!exception_message_.empty()) {
// We've encountered an unrecoverable error -- just ignore the rest of
// the input.
return;
}

try {
//#ifdef VERBOSE

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seem to be many commented-out lines like this. Should we clean them up?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not in this PR. I have a bug open to factor out the chunked encoding handler — will clean this up when I do that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good.

// LOG(DEBUG) << "Watch notification: '"
// << std::string(std::begin(range), std::end(range))
// << "'";
// LOG(DEBUG) << name_ << " => "
// << "Watch notification: '"
// << std::string(std::begin(range), std::end(range))
// << "'";
//#endif
boost::iterator_range<const char*> pos = range;
if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
boost::iterator_range<const char*> pos = range;
if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
//#ifdef VERBOSE
// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
// LOG(DEBUG) << name_ << " => "
// << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
//#endif
}
}

if (remaining_chunk_bytes_ == 0) {
// Invoke the callback.
CompleteChunk();
if (remaining_chunk_bytes_ == 0) {
// Invoke the callback.
CompleteChunk();

// Process the next batch.
while (remaining_chunk_bytes_ == 0 &&
std::begin(pos) != std::end(pos)) {
pos = StartNewChunk(pos);
}
// Process the next batch.
while (remaining_chunk_bytes_ == 0 &&
std::begin(pos) != std::end(pos)) {
pos = StartNewChunk(pos);
}
//#ifdef VERBOSE
// LOG(DEBUG) << "Started new chunk; " << remaining_chunk_bytes_
// << " bytes remaining";
// LOG(DEBUG) << name_ << " => "
// << "Started new chunk; " << remaining_chunk_bytes_
// << " bytes remaining";
//#endif

if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
//#ifdef VERBOSE
// LOG(DEBUG) << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
// LOG(DEBUG) << name_ << " => "
// << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
//#endif
}
}
} catch (const WatcherException& e) {
LOG(ERROR) << name_ << " => "
<< "Callback got exception: " << e.what();
exception_message_ = e.what();
}
} else {
if (error == boost::asio::error::eof) {
#ifdef VERBOSE
LOG(DEBUG) << "Watch callback: EOF";
LOG(DEBUG) << name_ << " => "
<< "Watch callback: EOF";
#endif
} else {
LOG(ERROR) << "Callback got error " << error;
LOG(ERROR) << name_ << " => "
<< "Callback got error " << error;
}
if (verbose_) {
LOG(INFO) << "Unlocking completion mutex";
LOG(ERROR) << name_ << " => "
<< "Unlocking completion mutex";
}
completion_.unlock();
}
}

// The watch callback may throw an exception. Because the watcher runs in a
// separate thread, the exception will be preserved when the watcher exits,
// and can be accessed via this function. An empty string is returned when
// there was no exception.
const std::string& exception() const { return exception_message_; }

private:
boost::iterator_range<const char*>
StartNewChunk(const boost::iterator_range<const char*>& range) {
StartNewChunk(const boost::iterator_range<const char*>& range)
throw(WatcherException) {
if (remaining_chunk_bytes_ != 0) {
LOG(ERROR) << "Starting new chunk with " << remaining_chunk_bytes_
LOG(ERROR) << name_ << " => "
<< "Starting new chunk with " << remaining_chunk_bytes_
<< " bytes remaining";
}

Expand All @@ -645,31 +687,33 @@ struct Watcher {
// Blank lines are fine, just skip them.
iter = std::next(iter, crlf.size());
#ifdef VERBOSE
LOG(DEBUG) << "Skipping blank line within chunked encoding;"
LOG(DEBUG) << name_ << " => "
<< "Skipping blank line within chunked encoding;"
<< " remaining data '" << std::string(iter, end) << "'";
#endif
return boost::iterator_range<const char*>(iter, end);
} else if (iter == end) {
LOG(ERROR) << "Invalid chunked encoding: '"
<< std::string(begin, end)
<< "'";
return boost::iterator_range<const char*>(begin, end);
throw WatcherException("Invalid chunked encoding: '" +
std::string(begin, end) +
"'; exiting");
}
std::string line(begin, iter);
iter = std::next(iter, crlf.size());
//#ifdef VERBOSE
// LOG(DEBUG) << "Line: '" << line << "'";
// LOG(DEBUG) << name_ << " => "
// << "Line: '" << line << "'";
//#endif
std::stringstream stream(line);
std::istringstream stream(line);
stream >> std::hex >> remaining_chunk_bytes_;
return boost::iterator_range<const char*>(iter, end);
}

boost::iterator_range<const char*>
ReadNextChunk(const boost::iterator_range<const char*>& range) {
ReadNextChunk(const boost::iterator_range<const char*>& range)
throw(WatcherException) {
if (remaining_chunk_bytes_ == 0) {
LOG(ERROR) << "Asked to read next chunk with no bytes remaining";
return range;
throw WatcherException(
"Asked to read next chunk with no bytes remaining");
}

const std::string crlf("\r\n");
Expand All @@ -685,41 +729,48 @@ struct Watcher {
return boost::iterator_range<const char*>(begin, end);
}

void CompleteChunk() {
void CompleteChunk() throw(WatcherException) {
if (body_.empty()) {
#ifdef VERBOSE
LOG(DEBUG) << "Skipping empty watch notification";
LOG(DEBUG) << name_ << " => "
<< "Skipping empty watch notification";
#endif
} else {
try {
//#ifdef VERBOSE
// LOG(DEBUG) << "Invoking callbacks on '" << body_ << "'";
// LOG(DEBUG) << name_ << " => "
// << "Invoking callbacks on '" << body_ << "'";
//#endif
std::vector<json::value> events = json::Parser::AllFromString(body_);
for (json::value& event : events) {
std::string event_str = event->ToString();
#ifdef VERBOSE
LOG(DEBUG) << "Invoking callback('" << event_str << "')";
LOG(DEBUG) << name_ << " => "
<< "Invoking callback('" << event_str << "')";
#endif
event_callback_(std::move(event));
#ifdef VERBOSE
LOG(DEBUG) << "callback('" << event_str << "') completed";
LOG(DEBUG) << name_ << " => "
<< "callback('" << event_str << "') completed";
#endif
}
//#ifdef VERBOSE
// LOG(DEBUG) << "All callbacks on '" << body_ << "' completed";
// LOG(DEBUG) << name_ << " => "
// << "All callbacks on '" << body_ << "' completed";
//#endif
} catch (const json::Exception& e) {
LOG(ERROR) << e.what();
throw WatcherException("JSON error: " + e.what());
}
}
}

std::string name_;
std::unique_lock<std::mutex> completion_;
std::function<void(json::value)> event_callback_;
std::string body_;
size_t remaining_chunk_bytes_;
bool verbose_;
std::string exception_message_;
};

void EventCallback(
Expand Down Expand Up @@ -762,13 +813,19 @@ void KubernetesReader::WatchMaster(
// A notification for watch completion.
std::mutex completion_mutex;
std::unique_lock<std::mutex> watch_completion(completion_mutex);
Watcher watcher(std::bind(&EventCallback, callback, std::placeholders::_1),
Watcher watcher(endpoint,
std::bind(&EventCallback, callback, std::placeholders::_1),
std::move(watch_completion), config_.VerboseLogging());
http::client::response response = client.get(request, boost::ref(watcher));
if (config_.VerboseLogging()) {
LOG(INFO) << "Waiting for completion";
}
std::lock_guard<std::mutex> await_completion(completion_mutex);
if (!watcher.exception().empty()) {
LOG(ERROR) << "Exception from the watcher thread: "
<< watcher.exception();
throw QueryException(watcher.exception());
}
if (config_.VerboseLogging()) {
LOG(INFO) << "WatchMaster completed " << body(response);
}
Expand Down Expand Up @@ -993,8 +1050,9 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback)
std::placeholders::_2, std::placeholders::_3));
} catch (const json::Exception& e) {
LOG(ERROR) << e.what();
LOG(ERROR) << "No more pod metadata will be collected";
} catch (const KubernetesReader::QueryException& e) {
// Already logged.
LOG(ERROR) << "No more pod metadata will be collected";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Distinguish the log of these two errors for debugging purpose? Same below.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They would each be preceded by another log message that details the error. That should be enough to distinguish them.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Then we should be fine.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're not logging the e.what() here as we do above, is that because it's logged elsewhere? Can we juggle things around for consistency?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's because it's logged elsewhere. The code is fairly consistent about logging any time a QueryException is thrown, so we don't need to log when catching it.
I've just added one missing log statement above.

}
LOG(INFO) << "Watch thread (pods) exiting";
}
Expand Down Expand Up @@ -1028,8 +1086,9 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback)
std::placeholders::_2, std::placeholders::_3));
} catch (const json::Exception& e) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to handle the WatcherException here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No -- it will never be thrown in this thread, and exceptions don't propagate across threads.

LOG(ERROR) << e.what();
LOG(ERROR) << "No more node metadata will be collected";
} catch (const KubernetesReader::QueryException& e) {
// Already logged.
LOG(ERROR) << "No more node metadata will be collected";
}
LOG(INFO) << "Watch thread (node) exiting";
}
Expand Down