Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ MetadataAgent::~MetadataAgent() {}

void MetadataAgent::Start() {
metadata_api_server_.reset(new MetadataApiServer(
config_, store_, config_.MetadataApiNumThreads(), "0.0.0.0",
config_, &health_checker_, store_, config_.MetadataApiNumThreads(), "0.0.0.0",
config_.MetadataApiPort()));
reporter_.reset(new MetadataReporter(
config_, &store_, config_.MetadataReporterIntervalSeconds()));
Expand Down
40 changes: 39 additions & 1 deletion src/api_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <boost/range/irange.hpp>

#include "configuration.h"
#include "health_checker.h"
#include "http_common.h"
#include "logging.h"
#include "store.h"
Expand Down Expand Up @@ -67,15 +68,22 @@ void MetadataApiServer::Dispatcher::log(const HttpServer::string_type& info) con


MetadataApiServer::MetadataApiServer(const Configuration& config,
const HealthChecker* health_checker,
const MetadataStore& store,
int server_threads,
const std::string& host, int port)
: config_(config), store_(store), dispatcher_({
: config_(config), health_checker_(health_checker), store_(store),
dispatcher_({
{{"GET", "/monitoredResource/"},
[=](const HttpServer::request& request,
std::shared_ptr<HttpServer::connection> conn) {
HandleMonitoredResource(request, conn);
}},
{{"GET", "/healthz"},
[=](const HttpServer::request& request,
std::shared_ptr<HttpServer::connection> conn) {
HandleHealthz(request, conn);
}},
}, config_.VerboseLogging()),
server_(
HttpServer::options(dispatcher_)
Expand Down Expand Up @@ -139,4 +147,34 @@ void MetadataApiServer::HandleMonitoredResource(
}
}

void MetadataApiServer::HandleHealthz(
const HttpServer::request& request,
std::shared_ptr<HttpServer::connection> conn) {
std::set<std::string> unhealthy_components;
if (health_checker_ != nullptr) {
unhealthy_components = health_checker_->UnhealthyComponents();
}
if (unhealthy_components.empty()) {
if (config_.VerboseLogging()) {
LOG(INFO) << "/healthz returning 200";
}
conn->set_status(HttpServer::connection::ok);
conn->set_headers(std::map<std::string, std::string>({
{"Content-Type", "text/plain"},
}));
conn->write("healthy");
} else {
LOG(WARNING) << "/healthz returning 500; unhealthy components: "
<< boost::algorithm::join(unhealthy_components, ", ");
conn->set_status(HttpServer::connection::internal_server_error);
conn->set_headers(std::map<std::string, std::string>({
{"Content-Type", "text/plain"},
}));
conn->write("unhealthy components:\n");
for (const auto& component : unhealthy_components) {
conn->write(component + "\n");
}
}
}

}
11 changes: 9 additions & 2 deletions src/api_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ namespace google {
// Configuration object.
class Configuration;

class HealthChecker;

// Storage for the metadata mapping.
class MetadataStore;

// A server that implements the metadata agent API.
class MetadataApiServer {
public:
MetadataApiServer(const Configuration& config, const MetadataStore& store,
int server_threads, const std::string& host, int port);
MetadataApiServer(const Configuration& config,
const HealthChecker* health_checker,
const MetadataStore& store, int server_threads,
const std::string& host, int port);
~MetadataApiServer();

// Stops the server and closes the listening socket.
Expand Down Expand Up @@ -71,8 +75,11 @@ class MetadataApiServer {
// Handler functions.
void HandleMonitoredResource(const HttpServer::request& request,
std::shared_ptr<HttpServer::connection> conn);
void HandleHealthz(const HttpServer::request& request,
std::shared_ptr<HttpServer::connection> conn);

const Configuration& config_;
const HealthChecker* health_checker_;
const MetadataStore& store_;
Dispatcher dispatcher_;
HttpServer server_;
Expand Down
8 changes: 7 additions & 1 deletion src/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ constexpr const char kDefaultInstanceId[] = "";
constexpr const char kDefaultInstanceZone[] = "";
constexpr const char kDefaultHealthCheckFile[] =
"/var/run/metadata-agent/health/unhealthy";
constexpr const int kDefaultHealthCheckMaxDataAgeSeconds = 5*60;

}

Expand Down Expand Up @@ -120,7 +121,9 @@ Configuration::Configuration()
kubernetes_service_metadata_(kKubernetesDefaultServiceMetadata),
instance_id_(kDefaultInstanceId),
instance_zone_(kDefaultInstanceZone),
health_check_file_(kDefaultHealthCheckFile) {}
health_check_file_(kDefaultHealthCheckFile),
health_check_max_data_age_seconds_(
kDefaultHealthCheckMaxDataAgeSeconds) {}

Configuration::Configuration(std::istream& input) : Configuration() {
ParseConfiguration(input);
Expand Down Expand Up @@ -289,6 +292,9 @@ void Configuration::ParseConfiguration(std::istream& input) {
config["InstanceZone"].as<std::string>(instance_zone_);
health_check_file_ =
config["HealthCheckFile"].as<std::string>(health_check_file_);
health_check_max_data_age_seconds_ =
config["HealthCheckMaxDataAgeSeconds"].as<int>(
health_check_max_data_age_seconds_);
}

} // google
Expand Down
5 changes: 5 additions & 0 deletions src/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,10 @@ class Configuration {
std::lock_guard<std::mutex> lock(mutex_);
return health_check_file_;
}
int HealthCheckMaxDataAgeSeconds() const {
std::lock_guard<std::mutex> lock(mutex_);
return health_check_max_data_age_seconds_;
}

private:
friend class ConfigurationArgumentParserTest;
Expand Down Expand Up @@ -208,6 +212,7 @@ class Configuration {
std::string instance_id_;
std::string instance_zone_;
std::string health_check_file_;
int health_check_max_data_age_seconds_;
};

}
Expand Down
22 changes: 22 additions & 0 deletions src/health_checker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,26 @@ void HealthChecker::CleanupForTest() {
config_.HealthCheckFile()).parent_path());
}

std::set<std::string> HealthChecker::UnhealthyComponents() const {
std::lock_guard<std::mutex> lock(mutex_);
std::set<std::string> result(unhealthy_components_);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought we were dropping these?..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I forgot what we said -- still use the unhealthy_components_ for IsHealthy()? (The current unit test wants that.)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whoops, they are still needed.

for (auto& c : component_callbacks_) {
if (c.second != nullptr && !c.second()) {
result.insert(c.first);
}
}
return result;
}

void HealthChecker::RegisterComponent(const std::string& component,
std::function<bool()> callback) {
std::lock_guard<std::mutex> lock(mutex_);
component_callbacks_[component] = callback;
}

void HealthChecker::UnregisterComponent(const std::string& component) {
std::lock_guard<std::mutex> lock(mutex_);
component_callbacks_.erase(component);
}

} // namespace google
29 changes: 28 additions & 1 deletion src/health_checker.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
#ifndef HEALTH_CHECKER_H_
#define HEALTH_CHECKER_H_

#include <string>
#include <map>
#include <mutex>
#include <set>
#include <string>

#include "configuration.h"

Expand All @@ -29,6 +30,10 @@ class HealthChecker {
public:
HealthChecker(const Configuration& config);
void SetUnhealthy(const std::string& component);
std::set<std::string> UnhealthyComponents() const;
void RegisterComponent(const std::string& component,
std::function<bool()> callback);
void UnregisterComponent(const std::string& component);

private:
friend class HealthCheckerUnittest;
Expand All @@ -38,9 +43,31 @@ class HealthChecker {

const Configuration& config_;
std::set<std::string> unhealthy_components_;
std::map<std::string, std::function<bool()>> component_callbacks_;
mutable std::mutex mutex_;
};

// Registers a component and then unregisters when it goes out of
// scope.
class CheckHealth {
public:
CheckHealth(HealthChecker* health_checker, const std::string& component,
std::function<bool()> callback)
: health_checker_(health_checker), component_(component) {
if (health_checker_ != nullptr) {
health_checker_->RegisterComponent(component_, callback);
}
}
~CheckHealth() {
if (health_checker_ != nullptr) {
health_checker_->UnregisterComponent(component_);
}
}
private:
HealthChecker* health_checker_;
const std::string component_;
};

} // namespace google

#endif // HEALTH_CHECKER_H_
22 changes: 21 additions & 1 deletion src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,21 @@ void KubernetesReader::WatchMaster(
if (verbose) {
LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint;
}
// Initialize the expiration time. This is the time by when the
// watcher has to receive some data to be considered healthy. Each
// receipt of a new message bumps the expiration forward.
//
// TODO: Cache the expiration (or timestamp of last receipt) in the
// store instead of here.
const int max_data_age = config_.HealthCheckMaxDataAgeSeconds();
std::mutex expiration_mutex;
auto expiration =
std::chrono::steady_clock::now() + time::seconds(max_data_age);
CheckHealth check_health(
health_checker_, name, [&expiration_mutex, &expiration]{
std::lock_guard<std::mutex> expiration_lock(expiration_mutex);
return std::chrono::steady_clock::now() < expiration;
});
try {
if (verbose) {
LOG(INFO) << "Locking completion mutex";
Expand All @@ -805,7 +820,12 @@ void KubernetesReader::WatchMaster(
std::unique_lock<std::mutex> watch_completion(completion_mutex);
Watcher watcher(
endpoint,
[=](json::value raw_watch) {
[=, &expiration_mutex, &expiration](json::value raw_watch) {
{
std::lock_guard<std::mutex> expiration_lock(expiration_mutex);
expiration =
std::chrono::steady_clock::now() + time::seconds(max_data_age);
}
WatchEventCallback(callback, name, std::move(raw_watch));
},
std::move(watch_completion), verbose);
Expand Down
2 changes: 1 addition & 1 deletion test/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ $(TESTS): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)
# Some headers need CPP_NETLIB_LIBS and YAML_CPP_LIBS.
$(TESTS:%=%.o): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS)

api_server_unittest: api_server_unittest.o $(SRC_DIR)/api_server.o $(SRC_DIR)/configuration.o $(SRC_DIR)/store.o $(SRC_DIR)/json.o $(SRC_DIR)/resource.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o
api_server_unittest: api_server_unittest.o $(SRC_DIR)/api_server.o $(SRC_DIR)/configuration.o $(SRC_DIR)/store.o $(SRC_DIR)/json.o $(SRC_DIR)/resource.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o $(SRC_DIR)/health_checker.o
$(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@
base64_unittest: base64_unittest.o $(SRC_DIR)/base64.o
$(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@
Expand Down