Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ constexpr const char kKubernetesDefaultClusterName[] = "";
constexpr const char kKubernetesDefaultClusterLocation[] = "";
constexpr const char kKubernetesDefaultNodeName[] = "";
constexpr const bool kKubernetesDefaultUseWatch = true;
constexpr const bool kKubernetesDefaultClusterLevelMetadata = false;
constexpr const char kDefaultInstanceId[] = "";
constexpr const char kDefaultInstanceZone[] = "";

Expand Down Expand Up @@ -95,6 +96,8 @@ MetadataAgentConfiguration::MetadataAgentConfiguration()
kubernetes_cluster_location_(kKubernetesDefaultClusterLocation),
kubernetes_node_name_(kKubernetesDefaultNodeName),
kubernetes_use_watch_(kKubernetesDefaultUseWatch),
kubernetes_cluster_level_metadata_(
kKubernetesDefaultClusterLevelMetadata),
instance_id_(kDefaultInstanceId),
instance_zone_(kDefaultInstanceZone) {}

Expand Down Expand Up @@ -204,6 +207,9 @@ void MetadataAgentConfiguration::ParseConfiguration(std::istream& input) {
config["KubernetesNodeName"].as<std::string>(kKubernetesDefaultNodeName);
kubernetes_use_watch_ =
config["KubernetesUseWatch"].as<bool>(kKubernetesDefaultUseWatch);
kubernetes_cluster_level_metadata_ =
config["KubernetesClusterLevelMetadata"].as<bool>(
kKubernetesDefaultClusterLevelMetadata);
instance_id_ =
config["InstanceId"].as<std::string>(kDefaultInstanceId);
instance_zone_ =
Expand Down
5 changes: 5 additions & 0 deletions src/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class MetadataAgentConfiguration {
std::lock_guard<std::mutex> lock(mutex_);
return kubernetes_use_watch_;
}
// Returns whether the agent should collect cluster-level metadata
// (watching all nodes/pods) instead of only node-level metadata for the
// current node.  Thread-safe: reads the flag under the configuration mutex.
bool KubernetesClusterLevelMetadata() const {
std::lock_guard<std::mutex> lock(mutex_);
return kubernetes_cluster_level_metadata_;
}
// Common metadata updater options.
const std::string& InstanceId() const {
std::lock_guard<std::mutex> lock(mutex_);
Expand Down Expand Up @@ -169,6 +173,7 @@ class MetadataAgentConfiguration {
std::string kubernetes_cluster_location_;
std::string kubernetes_node_name_;
bool kubernetes_use_watch_;
bool kubernetes_cluster_level_metadata_;
std::string instance_id_;
std::string instance_zone_;
};
Expand Down
138 changes: 87 additions & 51 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,11 @@ KubernetesReader::KubernetesReader(const MetadataAgentConfiguration& config)
: config_(config), environment_(config) {}

MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata(
json::value raw_node, Timestamp collected_at, bool is_deleted) const
const json::Object* node, Timestamp collected_at, bool is_deleted) const
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice change!

throw(json::Exception) {
const std::string cluster_name = environment_.KubernetesClusterName();
const std::string location = environment_.KubernetesClusterLocation();

const json::Object* node = raw_node->As<json::Object>();
const json::Object* metadata = node->Get<json::Object>("metadata");
const std::string node_name = metadata->Get<json::String>("name");
const std::string created_str =
Expand All @@ -122,7 +121,7 @@ MetadataUpdater::ResourceMetadata KubernetesReader::GetNodeMetadata(
})},
{"api", json::object({
{"version", json::string(kKubernetesApiVersion)},
{"raw", std::move(raw_node)},
{"raw", node->Clone()},
})},
})},
});
Expand Down Expand Up @@ -169,34 +168,38 @@ json::value KubernetesReader::ComputePodAssociations(const json::Object* pod)
? top_level_controller->Get<json::String>("kind")
: "Pod";

// TODO: What about pods that are not scheduled yet?
const json::Object* spec = pod->Get<json::Object>("spec");
const std::string node_name = spec->Get<json::String>("nodeName");

json::value instance_resource =
InstanceReader::InstanceResource(environment_).ToJSON();

std::unique_ptr<json::Object> raw_associations(new json::Object({
{"infrastructureResource", std::move(instance_resource)},
{"controllers", json::object({
{"topLevelControllerType", json::string(top_level_kind)},
{"topLevelControllerName", json::string(top_level_name)},
})},
}));

const json::Object* spec = pod->Get<json::Object>("spec");
if (spec->Has("nodeName")) {
// Pods that have been scheduled will have a nodeName.
raw_associations->emplace(std::make_pair(
"nodeName",
json::string(spec->Get<json::String>("nodeName"))
));
}

return json::object({
{"version", json::string(config_.MetadataIngestionRawContentVersion())},
{"raw", json::object({
{"infrastructureResource", std::move(instance_resource)},
{"controllers", json::object({
{"topLevelControllerType", json::string(top_level_kind)},
{"topLevelControllerName", json::string(top_level_name)},
})},
{"nodeName", json::string(node_name)},
})},
{"raw", json::value(std::move(raw_associations))},
});
}

MetadataUpdater::ResourceMetadata KubernetesReader::GetPodMetadata(
json::value raw_pod, json::value associations, Timestamp collected_at,
const json::Object* pod, json::value associations, Timestamp collected_at,
bool is_deleted) const throw(json::Exception) {
const std::string cluster_name = environment_.KubernetesClusterName();
const std::string location = environment_.KubernetesClusterLocation();

const json::Object* pod = raw_pod->As<json::Object>();

const json::Object* metadata = pod->Get<json::Object>("metadata");
const std::string namespace_name = metadata->Get<json::String>("namespace");
const std::string pod_name = metadata->Get<json::String>("name");
Expand Down Expand Up @@ -442,8 +445,7 @@ KubernetesReader::GetPodAndContainerMetadata(
}

result.emplace_back(
GetPodMetadata(pod->Clone(), std::move(associations), collected_at,
is_deleted));
GetPodMetadata(pod, std::move(associations), collected_at, is_deleted));
return std::move(result);
}

Expand All @@ -454,19 +456,34 @@ std::vector<MetadataUpdater::ResourceMetadata>
}
std::vector<MetadataUpdater::ResourceMetadata> result;

const std::string node_name = CurrentNode();
const std::string current_node = CurrentNode();

if (config_.VerboseLogging()) {
LOG(INFO) << "Current node is " << node_name;
LOG(INFO) << "Current node is " << current_node;
}

const std::string node_name(
config_.KubernetesClusterLevelMetadata() ? "" : current_node);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned that the responses are very different here. When a node name is provided, a single node is returned. Is there any logic that distinguishes between a list / single node?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. I forgot that in one case you get a Node and in another a NodeList. I've updated the code to be more robust.


try {
json::value raw_node = QueryMaster(
json::value raw_nodes = QueryMaster(
std::string(kKubernetesEndpointPath) + "/nodes/" + node_name);
Timestamp collected_at = std::chrono::system_clock::now();

result.emplace_back(GetNodeMetadata(std::move(raw_node), collected_at,
/*is_deleted=*/false));
if (!node_name.empty()) {
// It's a single node object -- fake a NodeList.
raw_nodes.reset(json::object({
{"items", json::array({std::move(raw_nodes)})},
}).release());
}

const json::Object* nodelist = raw_nodes->As<json::Object>();
const json::Array* nodes_array = nodelist->Get<json::Array>("items");
for (const json::value& raw_node : *nodes_array) {
const json::Object* node = raw_node->As<json::Object>();
result.emplace_back(
GetNodeMetadata(node, collected_at, /*is_deleted=*/false));
}
} catch (const json::Exception& e) {
LOG(ERROR) << e.what();
} catch (const QueryException& e) {
Expand Down Expand Up @@ -502,10 +519,13 @@ std::vector<MetadataUpdater::ResourceMetadata>
const std::string pod_id = metadata->Get<json::String>("uid");

const json::Object* spec = pod->Get<json::Object>("spec");
const std::string pod_node_name = spec->Get<json::String>("nodeName");
if (pod_node_name != node_name) {
LOG(ERROR) << "Internal error; pod's node " << pod_node_name
<< " not the same as agent node " << node_name;

if (!node_name.empty()) {
const std::string pod_node_name = spec->Get<json::String>("nodeName");
if (pod_node_name != node_name) {
LOG(ERROR) << "Internal error; pod's node " << pod_node_name
<< " not the same as agent node " << node_name;
}
}

const json::Object* status = pod->Get<json::Object>("status");
Expand Down Expand Up @@ -1078,15 +1098,11 @@ void KubernetesReader::PodCallback(
callback(std::move(result_vector));
}

void KubernetesReader::WatchPods(MetadataUpdater::UpdateCallback callback)
const {
LOG(INFO) << "Watch thread (pods) started";

const std::string node_name = CurrentNode();

if (config_.VerboseLogging()) {
LOG(INFO) << "Current node is " << node_name;
}
void KubernetesReader::WatchPods(
const std::string& node_name,
MetadataUpdater::UpdateCallback callback) const {
LOG(INFO) << "Watch thread (pods) started for node "
<< (node_name.empty() ? "<unscheduled>" : node_name);

const std::string node_selector(kNodeSelectorPrefix + node_name);
const std::string pod_label_selector(
Expand Down Expand Up @@ -1115,20 +1131,15 @@ void KubernetesReader::NodeCallback(
const json::Object* node, Timestamp collected_at, bool is_deleted) const
throw(json::Exception) {
std::vector<MetadataUpdater::ResourceMetadata> result_vector;
result_vector.emplace_back(
GetNodeMetadata(node->Clone(), collected_at, is_deleted));
result_vector.emplace_back(GetNodeMetadata(node, collected_at, is_deleted));
callback(std::move(result_vector));
}

void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback)
const {
LOG(INFO) << "Watch thread (node) started";

const std::string node_name = CurrentNode();

if (config_.VerboseLogging()) {
LOG(INFO) << "Current node is " << node_name;
}
void KubernetesReader::WatchNodes(
const std::string& node_name,
MetadataUpdater::UpdateCallback callback) const {
LOG(INFO) << "Watch thread (node) started for node "
<< (node_name.empty() ? "<all>" : node_name);

try {
// TODO: There seems to be a Kubernetes API bug with watch=true.
Expand All @@ -1147,6 +1158,12 @@ void KubernetesReader::WatchNode(MetadataUpdater::UpdateCallback callback)
LOG(INFO) << "Watch thread (node) exiting";
}

// Constructs a KubernetesUpdater bound to the given agent.
//
// The base class is listed first in the initializer list: base classes are
// always initialized before non-static members regardless of the order
// written here, so listing reader_ first (as before) triggered -Wreorder
// and misled readers about the actual initialization order.  Passing the
// [=] lambda to the base constructor is safe even though reader_ is not
// yet initialized at that point -- the lambda only captures `this` (via
// [=]) and is invoked later, when the polling loop runs.
KubernetesUpdater::KubernetesUpdater(MetadataAgent* server)
    : PollingMetadataUpdater(
          server, "KubernetesUpdater",
          server->config().KubernetesUpdaterIntervalSeconds(),
          [=]() { return reader_.MetadataQuery(); }),
      reader_(server->config()) {}

bool KubernetesUpdater::ValidateConfiguration() const {
if (!PollingMetadataUpdater::ValidateConfiguration()) {
return false;
Expand All @@ -1157,11 +1174,30 @@ bool KubernetesUpdater::ValidateConfiguration() const {

void KubernetesUpdater::StartUpdater() {
if (config().KubernetesUseWatch()) {
const std::string current_node = reader_.CurrentNode();

if (config().VerboseLogging()) {
LOG(INFO) << "Current node is " << current_node;
}

if (config().KubernetesClusterLevelMetadata()) {
LOG(INFO) << "Watching for cluster-level metadata";
} else {
LOG(INFO) << "Watching for node-level metadata";
}

const std::string watched_node(
config().KubernetesClusterLevelMetadata() ? "" : current_node);

auto cb = [=](std::vector<MetadataUpdater::ResourceMetadata>&& results) {
MetadataCallback(std::move(results));
};
node_watch_thread_ = std::thread([=]() { reader_.WatchNode(cb); });
pod_watch_thread_ = std::thread([=]() { reader_.WatchPods(cb); });
node_watch_thread_ = std::thread([=]() {
reader_.WatchNodes(watched_node, cb);
});
pod_watch_thread_ = std::thread([=]() {
reader_.WatchPods(watched_node, cb);
});
} else {
// Only try to poll if watch is disabled.
PollingMetadataUpdater::StartUpdater();
Expand Down
25 changes: 11 additions & 14 deletions src/kubernetes.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,16 @@ class KubernetesReader {
bool ValidateConfiguration() const;

// Node watcher.
void WatchNode(MetadataUpdater::UpdateCallback callback) const;
void WatchNodes(const std::string& node_name,
MetadataUpdater::UpdateCallback callback) const;

// Pod watcher.
void WatchPods(MetadataUpdater::UpdateCallback callback) const;
void WatchPods(const std::string& node_name,
MetadataUpdater::UpdateCallback callback) const;

// Gets the name of the node the agent is running on.
// Returns an empty string if unable to find the current node.
const std::string& CurrentNode() const;

private:
// A representation of all query-related errors.
Expand Down Expand Up @@ -88,11 +94,11 @@ class KubernetesReader {
throw(json::Exception);
// Given a node object, return the associated metadata.
MetadataUpdater::ResourceMetadata GetNodeMetadata(
json::value raw_node, Timestamp collected_at, bool is_deleted) const
const json::Object* node, Timestamp collected_at, bool is_deleted) const
throw(json::Exception);
// Given a pod object, return the associated metadata.
MetadataUpdater::ResourceMetadata GetPodMetadata(
json::value raw_pod, json::value associations, Timestamp collected_at,
const json::Object* pod, json::value associations, Timestamp collected_at,
bool is_deleted) const throw(json::Exception);
// Given a pod object and container info, return the container metadata.
MetadataUpdater::ResourceMetadata GetContainerMetadata(
Expand All @@ -109,10 +115,6 @@ class KubernetesReader {
const json::Object* pod, Timestamp collected_at, bool is_deleted) const
throw(json::Exception);

// Gets the name of the node the agent is running on.
// Returns an empty string if unable to find the current node.
const std::string& CurrentNode() const;

// Gets the Kubernetes master API token.
// Returns an empty string if unable to find the token.
const std::string& KubernetesApiToken() const;
Expand Down Expand Up @@ -151,12 +153,7 @@ class KubernetesReader {

class KubernetesUpdater : public PollingMetadataUpdater {
public:
KubernetesUpdater(MetadataAgent* server)
: reader_(server->config()), PollingMetadataUpdater(
server, "KubernetesUpdater",
server->config().KubernetesUpdaterIntervalSeconds(),
[=]() { return reader_.MetadataQuery(); }) { }

KubernetesUpdater(MetadataAgent* server);
~KubernetesUpdater() {
if (node_watch_thread_.joinable()) {
node_watch_thread_.join();
Expand Down