Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 60 additions & 50 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -570,10 +570,10 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const
namespace {
struct Watcher {
Watcher(const std::string& endpoint,
std::function<void(json::value)> event_callback,
std::function<void(const std::string&)> body_callback,
std::unique_lock<std::mutex>&& completion, bool verbose)
: name_("Watcher(" + endpoint + ")"),
completion_(std::move(completion)), event_callback_(event_callback),
completion_(std::move(completion)), body_callback_(body_callback),
remaining_chunk_bytes_(0), verbose_(verbose), exception_message_() {}
~Watcher() {} // Unlocks the completion_ lock.

Expand Down Expand Up @@ -616,7 +616,7 @@ struct Watcher {

if (remaining_chunk_bytes_ == 0) {
// Invoke the callback.
CompleteChunk();
body_callback_(body_);

// Process the next batch.
while (remaining_chunk_bytes_ == 0 &&
Expand Down Expand Up @@ -729,51 +729,53 @@ struct Watcher {
return boost::iterator_range<const char*>(begin, end);
}

void CompleteChunk() throw(WatcherException) {
if (body_.empty()) {
std::string name_;
std::unique_lock<std::mutex> completion_;
std::function<void(const std::string&)> body_callback_;
std::string body_;
size_t remaining_chunk_bytes_;
bool verbose_;
std::string exception_message_;
};

void BodyCallback(const std::string& name,
std::function<void(json::value)> event_callback,
const std::string& body) {
if (body.empty()) {
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => "
<< "Skipping empty watch notification";
LOG(DEBUG) << name << " => "
<< "Skipping empty watch notification";
#endif
} else {
try {
} else {
try {
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "Invoking callbacks on '" << body_ << "'";
// LOG(DEBUG) << name << " => "
// << "Invoking callbacks on '" << body << "'";
//#endif
std::vector<json::value> events = json::Parser::AllFromString(body_);
for (json::value& event : events) {
std::string event_str = event->ToString();
std::vector<json::value> events = json::Parser::AllFromString(body);
for (json::value& event : events) {
std::string event_str = event->ToString();
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => "
<< "Invoking callback('" << event_str << "')";
LOG(DEBUG) << name << " => "
<< "Invoking callback('" << event_str << "')";
#endif
event_callback_(std::move(event));
event_callback(std::move(event));
#ifdef VERBOSE
LOG(DEBUG) << name_ << " => "
<< "callback('" << event_str << "') completed";
LOG(DEBUG) << name << " => "
<< "callback('" << event_str << "') completed";
#endif
}
}
//#ifdef VERBOSE
// LOG(DEBUG) << name_ << " => "
// << "All callbacks on '" << body_ << "' completed";
// LOG(DEBUG) << name << " => "
// << "All callbacks on '" << body << "' completed";
//#endif
} catch (const json::Exception& e) {
throw WatcherException("JSON error: " + e.what());
}
} catch (const json::Exception& e) {
LOG(ERROR) << "Unable to process events: " << e.what();
}
}
}

std::string name_;
std::unique_lock<std::mutex> completion_;
std::function<void(json::value)> event_callback_;
std::string body_;
size_t remaining_chunk_bytes_;
bool verbose_;
std::string exception_message_;
};

void EventCallback(
void WatchEventCallback(
std::function<void(const json::Object*, Timestamp, bool)> callback,
json::value raw_watch) throw(json::Exception) {
Timestamp collected_at = std::chrono::system_clock::now();
Expand All @@ -792,6 +794,7 @@ void EventCallback(
}

void KubernetesReader::WatchMaster(
const std::string& name,
const std::string& path,
std::function<void(const json::Object*, Timestamp, bool)> callback) const
throw(QueryException, json::Exception) {
Expand All @@ -804,7 +807,7 @@ void KubernetesReader::WatchMaster(
request << boost::network::header(
"Authorization", "Bearer " + KubernetesApiToken());
if (config_.VerboseLogging()) {
LOG(INFO) << "WatchMaster: Contacting " << endpoint;
LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint;
}
try {
if (config_.VerboseLogging()) {
Expand All @@ -813,9 +816,13 @@ void KubernetesReader::WatchMaster(
// A notification for watch completion.
std::mutex completion_mutex;
std::unique_lock<std::mutex> watch_completion(completion_mutex);
Watcher watcher(endpoint,
std::bind(&EventCallback, callback, std::placeholders::_1),
std::move(watch_completion), config_.VerboseLogging());
// Pull this out, as nested std::bind expressions are nasty.
std::function<void(json::value)> event_callback =
std::bind(&WatchEventCallback, callback, std::placeholders::_1);
Watcher watcher(
endpoint,
std::bind(&BodyCallback, name, event_callback, std::placeholders::_1),
std::move(watch_completion), config_.VerboseLogging());
http::client::response response = client.get(request, boost::ref(watcher));
if (config_.VerboseLogging()) {
LOG(INFO) << "Waiting for completion";
Expand All @@ -827,7 +834,7 @@ void KubernetesReader::WatchMaster(
throw QueryException(watcher.exception());
}
if (config_.VerboseLogging()) {
LOG(INFO) << "WatchMaster completed " << body(response);
LOG(INFO) << "WatchMaster(" << name << ") completed " << body(response);
}
std::string encoding;
#ifdef VERBOSE
Expand Down Expand Up @@ -1043,11 +1050,13 @@ void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback)
? "" : "&" + config_.KubernetesPodLabelSelector());

try {
WatchMaster(std::string(kKubernetesEndpointPath) + "/pods"
+ node_selector + pod_label_selector,
std::bind(&KubernetesReader::PodCallback,
this, callback, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
WatchMaster(
"Pods",
std::string(kKubernetesEndpointPath) + "/pods" + node_selector
+ pod_label_selector,
std::bind(&KubernetesReader::PodCallback,
this, callback, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
} catch (const json::Exception& e) {
LOG(ERROR) << e.what();
LOG(ERROR) << "No more pod metadata will be collected";
Expand Down Expand Up @@ -1079,11 +1088,12 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback)

try {
// TODO: There seems to be a Kubernetes API bug with watch=true.
WatchMaster(std::string(kKubernetesEndpointPath) + "/watch/nodes/"
+ node_name,
std::bind(&KubernetesReader::NodeCallback,
this, callback, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
WatchMaster(
"Node",
std::string(kKubernetesEndpointPath) + "/watch/nodes/" + node_name,
std::bind(&KubernetesReader::NodeCallback,
this, callback, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
} catch (const json::Exception& e) {
LOG(ERROR) << e.what();
LOG(ERROR) << "No more node metadata will be collected";
Expand Down
2 changes: 2 additions & 0 deletions src/kubernetes.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ class KubernetesReader {
// Issues a Kubernetes master API query at a given path and
// watches for parsed JSON responses. The path has to start with "/".
// Invokes callback for every notification.
// The name is for logging purposes.
void WatchMaster(
const std::string& name,
const std::string& path,
std::function<void(const json::Object*, Timestamp, bool)> callback) const
throw(QueryException, json::Exception);
Expand Down