Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 34 additions & 209 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -605,217 +605,62 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const
namespace {
struct Watcher {
Watcher(const std::string& endpoint,
std::function<void(const std::string&)> body_callback,
std::function<void(json::value)> event_callback,
std::unique_lock<std::mutex>&& completion, bool verbose)
: name_("Watcher(" + endpoint + ")"),
completion_(std::move(completion)), body_callback_(body_callback),
remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {}
completion_(std::move(completion)), event_parser_(event_callback),
verbose_(verbose) {}
~Watcher() {} // Unlocks the completion_ lock.

private:
// A representation of all watch-related errors.
class WatcherException {
public:
WatcherException(const std::string& what) : explanation_(what) {}
const std::string& what() const { return explanation_; }
private:
std::string explanation_;
};

public:
void operator()(const boost::iterator_range<const char*>& range,
const boost::system::error_code& error) {
if (!error) {
if (!exception_message_.empty()) {
// We've encountered an unrecoverable error -- just ignore the rest of
// the input.
return;
}

const std::string body(std::begin(range), std::end(range));
if (!body.empty()) {
try {
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Watch notification: '"
// << std::string(std::begin(range), std::end(range))
// << "'";
//#endif
boost::iterator_range<const char*> pos = range;
if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
//#endif
}

if (remaining_chunk_bytes_ == 0) {
// Invoke the callback.
body_callback_(body_);

// Process the next batch.
while (remaining_chunk_bytes_ == 0 &&
std::begin(pos) != std::end(pos)) {
pos = StartNewChunk(pos);
}
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Started new chunk; " << remaining_chunk_bytes_
// << " bytes remaining";
//#endif

if (remaining_chunk_bytes_ != 0) {
pos = ReadNextChunk(pos);
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Read another chunk; body now is '" << body_ << "'; "
// << remaining_chunk_bytes_ << " bytes remaining";
//#endif
}
}
} catch (const WatcherException& e) {
LOG(ERROR) << name_ << " => "
<< "Callback got exception: " << e.what();
exception_message_ = e.what();
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => Parsing '" << body << "'";
#endif
std::istringstream input(body);
event_parser_.ParseStream(input);
} catch (const json::Exception& e) {
LOG(ERROR) << "Unable to process events: " << e.what();
}
} else {
} else if (!error) {
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => Skipping empty watch notification";
#endif
}
if (error) {
if (error == boost::asio::error::eof) {
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => "
<< "Watch callback: EOF";
LOG(DEBUG) << name_ << " => Watch callback: EOF";
#endif
} else {
LOG(ERROR) << name_ << " => "
<< "Callback got error " << error;
LOG(ERROR) << name_ << " => Callback got error " << error;
}
if (verbose_) {
LOG(ERROR) << name_ << " => "
<< "Unlocking completion mutex";
LOG(ERROR) << name_ << " => Unlocking completion mutex";
}
completion_.unlock();
}
}

// The watch callback may throw an exception. Because the watcher runs in a
// separate thread, the exception will be preserved when the watcher exits,
// and can be accessed via this function. An empty string is returned when
// there was no exception.
const std::string& exception() const { return exception_message_; }

private:
boost::iterator_range<const char*>
StartNewChunk(const boost::iterator_range<const char*>& range)
throw(WatcherException) {
if (remaining_chunk_bytes_ != 0) {
LOG(ERROR) << name_ << " => "
<< "Starting new chunk with " << remaining_chunk_bytes_
<< " bytes remaining";
}

body_.clear();

const std::string crlf("\r\n");
auto begin = std::begin(range);
auto end = std::end(range);
auto iter = std::search(begin, end, crlf.begin(), crlf.end());
if (iter == begin) {
// Blank lines are fine, just skip them.
iter = std::next(iter, crlf.size());
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => "
<< "Skipping blank line within chunked encoding;"
<< " remaining data '" << std::string(iter, end) << "'";
#endif
return boost::iterator_range<const char*>(iter, end);
} else if (iter == end) {
throw WatcherException("Invalid chunked encoding: '" +
std::string(begin, end) +
"'; exiting");
}
std::string line(begin, iter);
iter = std::next(iter, crlf.size());
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Line: '" << line << "'";
//#endif
std::istringstream stream(line);
stream >> std::hex >> remaining_chunk_bytes_;
return boost::iterator_range<const char*>(iter, end);
}

boost::iterator_range<const char*>
ReadNextChunk(const boost::iterator_range<const char*>& range)
throw(WatcherException) {
if (remaining_chunk_bytes_ == 0) {
throw WatcherException(
"Asked to read next chunk with no bytes remaining");
}

const std::string crlf("\r\n");
auto begin = std::begin(range);
auto end = std::end(range);
// The available bytes in the current notification, which may include the
// remainder of this chunk and the start of the next one.
const size_t available = std::distance(begin, end);
const size_t len = std::min(available, remaining_chunk_bytes_);
body_.insert(body_.end(), begin, begin + len);
remaining_chunk_bytes_ -= len;
begin = std::next(begin, len);
return boost::iterator_range<const char*>(begin, end);
}

std::string name_;
std::unique_lock<std::mutex> completion_;
std::function<void(const std::string&)> body_callback_;
std::string body_;
size_t remaining_chunk_bytes_;
json::Parser event_parser_;
bool verbose_;
std::string exception_message_;
};

void BodyCallback(const std::string& name,
std::function<void(json::value)> event_callback,
const std::string& body) {
if (body.empty()) {
#ifdef VERBOSE
LOG(DEBUG) << name << " => "
<< "Skipping empty watch notification";
#endif
} else {
try {
//#ifdef VERBOSE
// LOG(DEBUG) << name << " => "
// << "Invoking callbacks on '" << body << "'";
//#endif
std::vector<json::value> events = json::Parser::AllFromString(body);
for (json::value& event : events) {
std::string event_str = event->ToString();
#ifdef VERBOSE
LOG(DEBUG) << name << " => "
<< "Invoking callback('" << event_str << "')";
#endif
event_callback(std::move(event));
#ifdef VERBOSE
LOG(DEBUG) << name << " => "
<< "callback('" << event_str << "') completed";
#endif
}
//#ifdef VERBOSE
// LOG(DEBUG) << name << " => "
// << "All callbacks on '" << body << "' completed";
//#endif
} catch (const json::Exception& e) {
LOG(ERROR) << "Unable to process events: " << e.what();
}
}
}

void WatchEventCallback(
std::function<void(const json::Object*, Timestamp, bool)> callback,
json::value raw_watch) throw(json::Exception) {
const std::string& name, json::value raw_watch) throw(json::Exception) {
Timestamp collected_at = std::chrono::system_clock::now();

//LOG(ERROR) << "Watch callback: " << *raw_watch;
#ifdef VERBOSE
LOG(DEBUG) << name << " => WatchEventCallback('" << *raw_watch << "')";
#endif
const json::Object* watch = raw_watch->As<json::Object>();
const std::string type = watch->Get<json::String>("type");
const json::Object* object = watch->Get<json::Object>("object");
Expand All @@ -839,55 +684,35 @@ void KubernetesReader::WatchMaster(
config_.KubernetesEndpointHost() + path + watch_param);
http::client client(
http::client::options()
.remove_chunk_markers(false)
.openssl_certificate(SecretPath("ca.crt")));
http::client::request request(endpoint);
request << boost::network::header(
"Authorization", "Bearer " + KubernetesApiToken());
if (config_.VerboseLogging()) {
const bool verbose = config_.VerboseLogging();
if (verbose) {
LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint;
}
try {
if (config_.VerboseLogging()) {
if (verbose) {
LOG(INFO) << "Locking completion mutex";
}
// A notification for watch completion.
std::mutex completion_mutex;
std::unique_lock<std::mutex> watch_completion(completion_mutex);
Watcher watcher(
endpoint,
[=](const std::string& body) {
BodyCallback(name,
[=](json::value raw_watch) {
WatchEventCallback(callback, std::move(raw_watch));
},
body);
[=](json::value raw_watch) {
WatchEventCallback(callback, name, std::move(raw_watch));
},
std::move(watch_completion), config_.VerboseLogging());
std::move(watch_completion), verbose);
http::client::response response = client.get(request, std::ref(watcher));
if (config_.VerboseLogging()) {
if (verbose) {
LOG(INFO) << "Waiting for completion";
}
std::lock_guard<std::mutex> await_completion(completion_mutex);
if (!watcher.exception().empty()) {
LOG(ERROR) << "Exception from the watcher thread: "
<< watcher.exception();
throw QueryException(watcher.exception());
}
if (config_.VerboseLogging()) {
if (verbose) {
LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response);
}
std::string encoding;
#ifdef VERBOSE
LOG(DEBUG) << "response headers: " << response.headers();
#endif
auto transfer_encoding_header = headers(response)["Transfer-Encoding"];
if (!boost::empty(transfer_encoding_header)) {
encoding = boost::begin(transfer_encoding_header)->second;
}
if (encoding != "chunked") {
LOG(ERROR) << "Expected chunked encoding; found '" << encoding << "'";
}
} catch (const boost::system::system_error& e) {
LOG(ERROR) << "Failed to query " << endpoint << ": " << e.what();
throw QueryException(endpoint + " -> " + e.what());
Expand Down