diff --git a/src/agent.cc b/src/agent.cc index 77f9717b..14fdae8e 100644 --- a/src/agent.cc +++ b/src/agent.cc @@ -30,7 +30,7 @@ MetadataAgent::~MetadataAgent() {} void MetadataAgent::Start() { metadata_api_server_.reset(new MetadataApiServer( - config_, store_, config_.MetadataApiNumThreads(), "0.0.0.0", + config_, &health_checker_, store_, config_.MetadataApiNumThreads(), "0.0.0.0", config_.MetadataApiPort())); reporter_.reset(new MetadataReporter( config_, &store_, config_.MetadataReporterIntervalSeconds())); diff --git a/src/api_server.cc b/src/api_server.cc index 84036935..275eddaf 100644 --- a/src/api_server.cc +++ b/src/api_server.cc @@ -19,6 +19,7 @@ #include #include "configuration.h" +#include "health_checker.h" #include "http_common.h" #include "logging.h" #include "store.h" @@ -67,15 +68,22 @@ void MetadataApiServer::Dispatcher::log(const HttpServer::string_type& info) con MetadataApiServer::MetadataApiServer(const Configuration& config, + const HealthChecker* health_checker, const MetadataStore& store, int server_threads, const std::string& host, int port) - : config_(config), store_(store), dispatcher_({ + : config_(config), health_checker_(health_checker), store_(store), + dispatcher_({ {{"GET", "/monitoredResource/"}, [=](const HttpServer::request& request, std::shared_ptr conn) { HandleMonitoredResource(request, conn); }}, + {{"GET", "/healthz"}, + [=](const HttpServer::request& request, + std::shared_ptr conn) { + HandleHealthz(request, conn); + }}, }, config_.VerboseLogging()), server_( HttpServer::options(dispatcher_) @@ -139,4 +147,34 @@ void MetadataApiServer::HandleMonitoredResource( } } +void MetadataApiServer::HandleHealthz( + const HttpServer::request& request, + std::shared_ptr conn) { + std::set unhealthy_components; + if (health_checker_ != nullptr) { + unhealthy_components = health_checker_->UnhealthyComponents(); + } + if (unhealthy_components.empty()) { + if (config_.VerboseLogging()) { + LOG(INFO) << "/healthz returning 200"; + } + conn->set_status(HttpServer::connection::ok); + conn->set_headers(std::map({ + {"Content-Type", "text/plain"}, + })); + conn->write("healthy"); + } else { + LOG(WARNING) << "/healthz returning 500; unhealthy components: " + << boost::algorithm::join(unhealthy_components, ", "); + conn->set_status(HttpServer::connection::internal_server_error); + conn->set_headers(std::map({ + {"Content-Type", "text/plain"}, + })); + conn->write("unhealthy components:\n"); + for (const auto& component : unhealthy_components) { + conn->write(component + "\n"); + } + } +} + } diff --git a/src/api_server.h b/src/api_server.h index b700a27b..687e8e8b 100644 --- a/src/api_server.h +++ b/src/api_server.h @@ -33,14 +33,18 @@ namespace google { // Configuration object. class Configuration; +class HealthChecker; + // Storage for the metadata mapping. class MetadataStore; // A server that implements the metadata agent API. class MetadataApiServer { public: - MetadataApiServer(const Configuration& config, const MetadataStore& store, - int server_threads, const std::string& host, int port); + MetadataApiServer(const Configuration& config, + const HealthChecker* health_checker, + const MetadataStore& store, int server_threads, + const std::string& host, int port); ~MetadataApiServer(); // Stops the server and closes the listening socket. @@ -71,8 +75,11 @@ class MetadataApiServer { // Handler functions. void HandleMonitoredResource(const HttpServer::request& request, std::shared_ptr conn); + void HandleHealthz(const HttpServer::request& request, + std::shared_ptr conn); const Configuration& config_; + const HealthChecker* health_checker_; const MetadataStore& store_; Dispatcher dispatcher_; HttpServer server_; diff --git a/src/configuration.cc b/src/configuration.cc index 41c56731..e6f37a54 100644 --- a/src/configuration.cc +++ b/src/configuration.cc @@ -75,6 +75,7 @@ constexpr const char kDefaultInstanceId[] = ""; constexpr const char kDefaultInstanceZone[] = ""; constexpr const char kDefaultHealthCheckFile[] = "/var/run/metadata-agent/health/unhealthy"; +constexpr const int kDefaultHealthCheckMaxDataAgeSeconds = 5*60; } @@ -120,7 +121,9 @@ Configuration::Configuration() kubernetes_service_metadata_(kKubernetesDefaultServiceMetadata), instance_id_(kDefaultInstanceId), instance_zone_(kDefaultInstanceZone), - health_check_file_(kDefaultHealthCheckFile) {} + health_check_file_(kDefaultHealthCheckFile), + health_check_max_data_age_seconds_( + kDefaultHealthCheckMaxDataAgeSeconds) {} Configuration::Configuration(std::istream& input) : Configuration() { ParseConfiguration(input); @@ -289,6 +292,9 @@ void Configuration::ParseConfiguration(std::istream& input) { config["InstanceZone"].as(instance_zone_); health_check_file_ = config["HealthCheckFile"].as(health_check_file_); + health_check_max_data_age_seconds_ = + config["HealthCheckMaxDataAgeSeconds"].as( + health_check_max_data_age_seconds_); } } // google diff --git a/src/configuration.h b/src/configuration.h index 2f90a3b6..099d767b 100644 --- a/src/configuration.h +++ b/src/configuration.h @@ -162,6 +162,10 @@ class Configuration { std::lock_guard lock(mutex_); return health_check_file_; } + int HealthCheckMaxDataAgeSeconds() const { + std::lock_guard lock(mutex_); + return health_check_max_data_age_seconds_; + } private: friend class ConfigurationArgumentParserTest; @@ -208,6 +212,7 @@ class Configuration { std::string instance_id_; std::string instance_zone_; std::string health_check_file_; + int health_check_max_data_age_seconds_; }; } diff --git a/src/health_checker.cc b/src/health_checker.cc index 2b783964..9f5adb90 100644 --- a/src/health_checker.cc +++ b/src/health_checker.cc @@ -47,4 +47,26 @@ void HealthChecker::CleanupForTest() { config_.HealthCheckFile()).parent_path()); } +std::set HealthChecker::UnhealthyComponents() const { + std::lock_guard lock(mutex_); + std::set result(unhealthy_components_); + for (auto& c : component_callbacks_) { + if (c.second != nullptr && !c.second()) { + result.insert(c.first); + } + } + return result; +} + +void HealthChecker::RegisterComponent(const std::string& component, + std::function callback) { + std::lock_guard lock(mutex_); + component_callbacks_[component] = callback; +} + +void HealthChecker::UnregisterComponent(const std::string& component) { + std::lock_guard lock(mutex_); + component_callbacks_.erase(component); +} + } // namespace google diff --git a/src/health_checker.h b/src/health_checker.h index 0afb107b..ade8401d 100644 --- a/src/health_checker.h +++ b/src/health_checker.h @@ -16,9 +16,10 @@ #ifndef HEALTH_CHECKER_H_ #define HEALTH_CHECKER_H_ -#include +#include #include #include +#include #include "configuration.h" @@ -29,6 +30,10 @@ class HealthChecker { public: HealthChecker(const Configuration& config); void SetUnhealthy(const std::string& component); + std::set UnhealthyComponents() const; + void RegisterComponent(const std::string& component, + std::function callback); + void UnregisterComponent(const std::string& component); private: friend class HealthCheckerUnittest; @@ -38,9 +43,31 @@ class HealthChecker { const Configuration& config_; std::set unhealthy_components_; + std::map> component_callbacks_; mutable std::mutex mutex_; }; +// Registers a component and then unregisters when it goes out of +// scope. +class CheckHealth { + public: + CheckHealth(HealthChecker* health_checker, const std::string& component, + std::function callback) + : health_checker_(health_checker), component_(component) { + if (health_checker_ != nullptr) { + health_checker_->RegisterComponent(component_, callback); + } + } + ~CheckHealth() { + if (health_checker_ != nullptr) { + health_checker_->UnregisterComponent(component_); + } + } + private: + HealthChecker* health_checker_; + const std::string component_; +}; + } // namespace google #endif // HEALTH_CHECKER_H_ diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 5433dd30..f20f40ca 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -796,6 +796,21 @@ void KubernetesReader::WatchMaster( if (verbose) { LOG(INFO) << "WatchMaster(" << name << "): Contacting " << endpoint; } + // Initialize the expiration time. This is the time by when the + // watcher has to receive some data to be considered healthy. Each + // receipt of a new message bumps the expiration forward. + // + // TODO: Cache the expiration (or timestamp of last receipt) in the + // store instead of here. + const int max_data_age = config_.HealthCheckMaxDataAgeSeconds(); + std::mutex expiration_mutex; + auto expiration = + std::chrono::steady_clock::now() + time::seconds(max_data_age); + CheckHealth check_health( + health_checker_, name, [&expiration_mutex, &expiration]{ + std::lock_guard expiration_lock(expiration_mutex); + return std::chrono::steady_clock::now() < expiration; + }); try { if (verbose) { LOG(INFO) << "Locking completion mutex"; @@ -805,7 +820,12 @@ void KubernetesReader::WatchMaster( std::unique_lock watch_completion(completion_mutex); Watcher watcher( endpoint, - [=](json::value raw_watch) { + [=, &expiration_mutex, &expiration](json::value raw_watch) { + { + std::lock_guard expiration_lock(expiration_mutex); + expiration = + std::chrono::steady_clock::now() + time::seconds(max_data_age); + } WatchEventCallback(callback, name, std::move(raw_watch)); }, std::move(watch_completion), verbose); diff --git a/test/Makefile b/test/Makefile index 3ad7a3bc..fb8d0709 100644 --- a/test/Makefile +++ b/test/Makefile @@ -115,7 +115,7 @@ $(TESTS): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS) # Some headers need CPP_NETLIB_LIBS and YAML_CPP_LIBS. $(TESTS:%=%.o): $(GTEST_LIB) $(CPP_NETLIB_LIBS) $(YAML_CPP_LIBS) -api_server_unittest: api_server_unittest.o $(SRC_DIR)/api_server.o $(SRC_DIR)/configuration.o $(SRC_DIR)/store.o $(SRC_DIR)/json.o $(SRC_DIR)/resource.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o +api_server_unittest: api_server_unittest.o $(SRC_DIR)/api_server.o $(SRC_DIR)/configuration.o $(SRC_DIR)/store.o $(SRC_DIR)/json.o $(SRC_DIR)/resource.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o $(SRC_DIR)/health_checker.o $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@ base64_unittest: base64_unittest.o $(SRC_DIR)/base64.o $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@