Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ constexpr const char kKubernetesDefaultClusterLocation[] = "";
constexpr const char kKubernetesDefaultNodeName[] = "";
constexpr const bool kKubernetesDefaultUseWatch = false;
constexpr const bool kKubernetesDefaultClusterLevelMetadata = false;
constexpr const bool kKubernetesDefaultServiceMetadata = true;
constexpr const char kDefaultInstanceId[] = "";
constexpr const char kDefaultInstanceZone[] = "";
constexpr const char kDefaultHealthCheckFile[] =
Expand Down Expand Up @@ -113,6 +114,7 @@ Configuration::Configuration()
kubernetes_use_watch_(kKubernetesDefaultUseWatch),
kubernetes_cluster_level_metadata_(
kKubernetesDefaultClusterLevelMetadata),
kubernetes_service_metadata_(kKubernetesDefaultServiceMetadata),
instance_id_(kDefaultInstanceId),
instance_zone_(kDefaultInstanceZone),
health_check_file_(kDefaultHealthCheckFile) {}
Expand Down Expand Up @@ -270,7 +272,10 @@ void Configuration::ParseConfiguration(std::istream& input) {
config["KubernetesUseWatch"].as<bool>(kubernetes_use_watch_);
kubernetes_cluster_level_metadata_ =
config["KubernetesClusterLevelMetadata"].as<bool>(
kubernetes_cluster_level_metadata_);
kKubernetesDefaultClusterLevelMetadata);
kubernetes_service_metadata_ =
config["KubernetesServiceMetadata"].as<bool>(
kKubernetesDefaultServiceMetadata);
instance_id_ =
config["InstanceId"].as<std::string>(instance_id_);
instance_zone_ =
Expand Down
5 changes: 5 additions & 0 deletions src/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ class Configuration {
std::lock_guard<std::mutex> lock(mutex_);
return kubernetes_cluster_level_metadata_;
}
// Whether Kubernetes service metadata collection is enabled.
// NOTE(review): together with KubernetesClusterLevelMetadata() this appears
// to gate the service/endpoints watch threads — confirm against the updater.
// Thread-safe: reads the flag under mutex_.
bool KubernetesServiceMetadata() const {
  std::lock_guard<std::mutex> lock(mutex_);
  return kubernetes_service_metadata_;
}
// Common metadata updater options.
const std::string& InstanceId() const {
std::lock_guard<std::mutex> lock(mutex_);
Expand Down Expand Up @@ -194,6 +198,7 @@ class Configuration {
std::string kubernetes_node_name_;
bool kubernetes_use_watch_;
bool kubernetes_cluster_level_metadata_;
bool kubernetes_service_metadata_;
std::string instance_id_;
std::string instance_zone_;
std::string health_check_file_;
Expand Down
222 changes: 222 additions & 0 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,75 @@ KubernetesReader::GetPodAndContainerMetadata(
return std::move(result);
}

// Builds the JSON description of every known service: its raw API object
// plus the monitored-resource records of the pods currently backing it.
// Reads the service_to_metadata_ and service_to_pods_ caches under
// service_mutex_. Throws json::Exception on malformed cached metadata.
std::vector<json::value> KubernetesReader::GetServiceList(
    const std::string& cluster_name, const std::string& location) const
    throw(json::Exception) {
  std::lock_guard<std::mutex> lock(service_mutex_);
  std::vector<json::value> service_list;
  service_list.reserve(service_to_metadata_.size());
  for (const auto& metadata_it : service_to_metadata_) {
    // A service key consists of a namespace name and a service name.
    const ServiceKey& service_key = metadata_it.first;
    // Bind by reference — the original copied the string every iteration.
    const std::string& namespace_name = service_key.first;
    const json::value& service_metadata = metadata_it.second;
    // A service with no endpoints yet (or none recorded) maps to kNoPods.
    auto pods_it = service_to_pods_.find(service_key);
    const std::vector<std::string>& pod_names =
        (pods_it != service_to_pods_.end()) ? pods_it->second : kNoPods;
    std::vector<json::value> pod_resources;
    pod_resources.reserve(pod_names.size());
    for (const std::string& pod_name : pod_names) {
      const MonitoredResource k8s_pod("k8s_pod", {
        {"cluster_name", cluster_name},
        {"namespace_name", namespace_name},
        {"pod_name", pod_name},
        {"location", location},
      });
      pod_resources.emplace_back(k8s_pod.ToJSON());
    }
    service_list.emplace_back(json::object({
      {"api", json::object({
        {"version", json::string(kKubernetesApiVersion)},
        {"raw", service_metadata->Clone()},
        {"pods", json::array(std::move(pod_resources))},
      })},
    }));
  }
  return service_list;
}

// Produces a single ResourceMetadata record for the k8s_cluster resource,
// whose raw blob embeds ALL currently known services (see GetServiceList).
// Throws json::Exception on malformed cached metadata.
MetadataUpdater::ResourceMetadata KubernetesReader::GetClusterMetadata(
    Timestamp collected_at) const throw(json::Exception) {
  const std::string cluster = environment_.KubernetesClusterName();
  const std::string cluster_location = environment_.KubernetesClusterLocation();

  // Snapshot the cached services (with their backing pods) into the blob.
  std::vector<json::value> services =
      GetServiceList(cluster, cluster_location);
  json::value raw_metadata = json::object({
    {"blobs", json::object({
      {"services", json::array(std::move(services))},
    })},
  });
  if (config_.VerboseLogging()) {
    LOG(INFO) << "Raw cluster metadata: " << *raw_metadata;
  }

  const MonitoredResource k8s_cluster("k8s_cluster", {
    {"cluster_name", cluster},
    {"location", cluster_location},
  });

  // There is no created_at for the cluster since the metadata contains
  // ALL current services.
  Timestamp created_at = time_point();
  return MetadataUpdater::ResourceMetadata(
      std::vector<std::string>{},
      k8s_cluster,
#ifdef ENABLE_KUBERNETES_METADATA
      MetadataStore::Metadata(config_.MetadataIngestionRawContentVersion(),
                              /*is_deleted=*/false, created_at, collected_at,
                              std::move(raw_metadata))
#else
      MetadataStore::Metadata::IGNORED()
#endif
  );
}

std::vector<MetadataUpdater::ResourceMetadata>
KubernetesReader::MetadataQuery() const {
if (config_.VerboseLogging()) {
Expand Down Expand Up @@ -911,6 +980,80 @@ json::value KubernetesReader::FindTopLevelController(
return FindTopLevelController(ns, GetOwner(ns, controller_ref));
}

// Records (or, on deletion, drops) the raw API object for one service in
// the service_to_metadata_ cache, keyed by (namespace, service name).
// Throws json::Exception if the object lacks the expected metadata fields.
void KubernetesReader::UpdateServiceToMetadataCache(
    const json::Object* service, bool is_deleted) throw(json::Exception) {
#ifdef VERBOSE
  LOG(DEBUG) << "UpdateServiceToMetadataCache(" << *service << ")";
#endif
  const json::Object* metadata = service->Get<json::Object>("metadata");
  const std::string namespace_name = metadata->Get<json::String>("namespace");
  const std::string service_name = metadata->Get<json::String>("name");
  const ServiceKey service_key(namespace_name, service_name);

  std::lock_guard<std::mutex> lock(service_mutex_);
  if (is_deleted) {
    service_to_metadata_.erase(service_key);
    return;
  }
  // operator[] default-constructs the slot on first insert; either way the
  // slot then receives a fresh clone of the service object.
  service_to_metadata_[service_key] = service->Clone();
}

void KubernetesReader::UpdateServiceToPodsCache(
Copy link
Contributor

@bmoyles0117 bmoyles0117 Apr 1, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little concerned calling this UpdateServiceToPodsCache, a service itself doesn't have pods, the endpoint does. I may have missed a lot of the conversation, but I'm of the opinion that we should address endpoints as endpoints internally, even if we end up mapping it to a service at the end.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It really is the service that has the pods - Endpoints is just an API that provides the mapping. According to the docs:
Kubernetes offers a simple Endpoints API that is updated whenever the set of Pods in a Service changes.

const json::Object* endpoints, bool is_deleted) throw(json::Exception) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Endpoints represents a single endpoint, could we rename this to be singular throughout?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.10/#endpoints-v1-core

The resource is called Endpoints, not Endpoint, since it lists all endpoints used by a single service.

Added comments to this method in the header file.

#ifdef VERBOSE
LOG(DEBUG) << "UpdateServiceToPodsCache(" << *endpoints << ")";
#endif
const json::Object* metadata = endpoints->Get<json::Object>("metadata");
const std::string namespace_name = metadata->Get<json::String>("namespace");
// Endpoints name is the same as the matching service name.
const std::string service_name = metadata->Get<json::String>("name");
const ServiceKey service_key(namespace_name, service_name);

std::vector<std::string> pod_names;
// Only extract the pod names when this is not a deletion. In the case of
// a deletion, we delete the mapping below.
if (!is_deleted && endpoints->Has("subsets") &&
!endpoints->at("subsets")->Is<json::Null>()) {
const json::Array* subsets = endpoints->Get<json::Array>("subsets");
for (const json::value& subset : *subsets) {
const json::Object* subset_obj = subset->As<json::Object>();
if (!subset_obj->Has("addresses")) {
continue;
}
const json::Array* addresses = subset_obj->Get<json::Array>("addresses");
for (const json::value& address : *addresses) {
const json::Object* address_obj = address->As<json::Object>();
if (!address_obj->Has("targetRef")) {
continue;
}
const json::Object* ref = address_obj->Get<json::Object>("targetRef");
if (!(ref->Has("kind") && ref->Has("name"))) {
continue;
}
const std::string target_kind = ref->Get<json::String>("kind");
if (target_kind != "Pod") {
LOG(INFO) << "Found a resource other than a pod in Endpoint "
<< service_name << "'s targetRef: " << target_kind;
continue;
}
const std::string pod_name = ref->Get<json::String>("name");
pod_names.push_back(pod_name);
}
}
}

std::lock_guard<std::mutex> lock(service_mutex_);
if (is_deleted) {
service_to_pods_.erase(service_key);
} else {
auto it_inserted =
service_to_pods_.emplace(service_key, std::vector<std::string>());
it_inserted.first->second = pod_names;
}
}

bool KubernetesReader::ValidateConfiguration() const {
try {
(void) QueryMaster(std::string(kKubernetesEndpointPath) + "/nodes?limit=1");
Expand Down Expand Up @@ -1013,6 +1156,73 @@ void KubernetesReader::WatchNodes(
LOG(INFO) << "Watch thread (node) exiting";
}

// Handles one watched service event: refresh the per-service metadata
// cache, then publish a recomputed cluster-level metadata record.
void KubernetesReader::ServiceCallback(
    MetadataUpdater::UpdateCallback callback,
    const json::Object* service, Timestamp collected_at, bool is_deleted)
    throw(json::Exception) {
  UpdateServiceToMetadataCache(service, is_deleted);

  // TODO: using a temporary did not work here.
  std::vector<MetadataUpdater::ResourceMetadata> updates;
  updates.emplace_back(GetClusterMetadata(collected_at));
  callback(std::move(updates));
}

// Watch-thread body for services: blocks in WatchMaster, dispatching each
// event to ServiceCallback. On any terminal error, logs, marks the
// kubernetes_service_thread unhealthy, and returns.
void KubernetesReader::WatchServices(MetadataUpdater::UpdateCallback callback) {
  LOG(INFO) << "Watch thread started for services";

  const std::string watch_path =
      std::string(kKubernetesEndpointPath) + "/watch/services/";
  auto handle_event = [=](const json::Object* service, Timestamp collected_at,
                          bool is_deleted) {
    ServiceCallback(callback, service, collected_at, is_deleted);
  };
  try {
    WatchMaster("Service", watch_path, handle_event);
  } catch (const json::Exception& e) {
    LOG(ERROR) << e.what();
    LOG(ERROR) << "No more service metadata will be collected";
  } catch (const KubernetesReader::QueryException& e) {
    LOG(ERROR) << "No more service metadata will be collected";
  }
  health_checker_->SetUnhealthy("kubernetes_service_thread");
  LOG(INFO) << "Watch thread (service) exiting";
}

// Handles one watched Endpoints event: refresh the service->pods cache,
// then publish a recomputed cluster-level metadata record.
// NOTE(review): per the PR discussion, an Endpoints deletion is correlated
// with the deletion of its service — confirm this invariant holds upstream.
void KubernetesReader::EndpointsCallback(
    MetadataUpdater::UpdateCallback callback,
    const json::Object* endpoints, Timestamp collected_at, bool is_deleted)
    throw(json::Exception) {
  UpdateServiceToPodsCache(endpoints, is_deleted);

  // TODO: using a temporary did not work here.
  std::vector<MetadataUpdater::ResourceMetadata> result_vector;
  result_vector.emplace_back(GetClusterMetadata(collected_at));
  callback(std::move(result_vector));
}

// Watch-thread body for Endpoints objects: blocks in WatchMaster,
// dispatching each event to EndpointsCallback. On any terminal error,
// logs, marks kubernetes_endpoints_thread unhealthy, and returns.
void KubernetesReader::WatchEndpoints(
    MetadataUpdater::UpdateCallback callback) {
  LOG(INFO) << "Watch thread started for endpoints";

  const std::string watch_path =
      std::string(kKubernetesEndpointPath) + "/watch/endpoints/";
  try {
    WatchMaster(
        "Endpoints", watch_path,
        [=](const json::Object* endpoints, Timestamp collected_at,
            bool is_deleted) {
          EndpointsCallback(callback, endpoints, collected_at, is_deleted);
        });
  } catch (const json::Exception& e) {
    LOG(ERROR) << e.what();
    LOG(ERROR) << "No more endpoints metadata will be collected";
  } catch (const KubernetesReader::QueryException& e) {
    LOG(ERROR) << "No more endpoints metadata will be collected";
  }
  health_checker_->SetUnhealthy("kubernetes_endpoints_thread");
  LOG(INFO) << "Watch thread (endpoints) exiting";
}

KubernetesUpdater::KubernetesUpdater(const Configuration& config,
HealthChecker* health_checker,
MetadataStore* store)
Expand Down Expand Up @@ -1055,6 +1265,15 @@ void KubernetesUpdater::StartUpdater() {
pod_watch_thread_ = std::thread([=]() {
reader_.WatchPods(watched_node, cb);
});
if (config().KubernetesClusterLevelMetadata() &&
config().KubernetesServiceMetadata()) {
service_watch_thread_ = std::thread([=]() {
reader_.WatchServices(cb);
});
endpoints_watch_thread_ = std::thread([=]() {
reader_.WatchEndpoints(cb);
});
}
} else {
// Only try to poll if watch is disabled.
PollingMetadataUpdater::StartUpdater();
Expand All @@ -1064,6 +1283,9 @@ void KubernetesUpdater::StartUpdater() {
void KubernetesUpdater::MetadataCallback(
std::vector<MetadataUpdater::ResourceMetadata>&& result_vector) {
for (MetadataUpdater::ResourceMetadata& result : result_vector) {
#ifdef VERBOSE
LOG(DEBUG) << "MetadataCallback (" << result << ")";
#endif
UpdateResourceCallback(result);
UpdateMetadataCallback(std::move(result));
}
Expand Down
Loading