diff --git a/src/agent.cc b/src/agent.cc index 57507b1a..77f9717b 100644 --- a/src/agent.cc +++ b/src/agent.cc @@ -28,7 +28,7 @@ MetadataAgent::MetadataAgent(const Configuration& config) MetadataAgent::~MetadataAgent() {} -void MetadataAgent::start() { +void MetadataAgent::Start() { metadata_api_server_.reset(new MetadataApiServer( config_, store_, config_.MetadataApiNumThreads(), "0.0.0.0", config_.MetadataApiPort())); @@ -36,4 +36,9 @@ void MetadataAgent::start() { config_, &store_, config_.MetadataReporterIntervalSeconds())); } +void MetadataAgent::Stop() { + metadata_api_server_->Stop(); + // TODO: Notify the metadata reporter as well. +} + } diff --git a/src/agent.h b/src/agent.h index 05364671..8f4669e5 100644 --- a/src/agent.h +++ b/src/agent.h @@ -39,7 +39,10 @@ class MetadataAgent { ~MetadataAgent(); // Starts serving. - void start(); + void Start(); + + // Stops serving. + void Stop(); const Configuration& config() const { return config_; diff --git a/src/api_server.cc b/src/api_server.cc index 6768b9a4..84036935 100644 --- a/src/api_server.cc +++ b/src/api_server.cc @@ -84,11 +84,17 @@ MetadataApiServer::MetadataApiServer(const Configuration& config, server_pool_() { for (int i : boost::irange(0, server_threads)) { - server_pool_.emplace_back(&HttpServer::run, &server_); + server_pool_.emplace_back([=]() { server_.run(); }); } } +void MetadataApiServer::Stop() { + server_.stop(); + LOG(INFO) << "API server stopped"; +} + MetadataApiServer::~MetadataApiServer() { + Stop(); for (auto& thread : server_pool_) { thread.join(); } diff --git a/src/api_server.h b/src/api_server.h index 850eb8be..b700a27b 100644 --- a/src/api_server.h +++ b/src/api_server.h @@ -43,6 +43,9 @@ class MetadataApiServer { int server_threads, const std::string& host, int port); ~MetadataApiServer(); + // Stops the server and closes the listening socket. + void Stop(); + private: friend class ApiServerTest; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 8594b4aa..a04d4e51 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -1321,6 +1321,15 @@ void KubernetesUpdater::StartUpdater() { } } +void KubernetesUpdater::NotifyStopUpdater() { + if (config().KubernetesUseWatch()) { + // TODO: How do we interrupt a watch thread? + } else { + // Only stop polling if watch is disabled. + PollingMetadataUpdater::NotifyStopUpdater(); + } +} + void KubernetesUpdater::MetadataCallback( std::vector&& result_vector) { for (MetadataUpdater::ResourceMetadata& result : result_vector) { diff --git a/src/kubernetes.h b/src/kubernetes.h index 757d49ff..2e3d50be 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -231,6 +231,7 @@ class KubernetesUpdater : public PollingMetadataUpdater { bool ShouldStartUpdater() const; void StartUpdater(); + void NotifyStopUpdater(); private: // Metadata watcher callback. diff --git a/src/metadatad.cc b/src/metadatad.cc index bd2f0bb8..7363d2f5 100644 --- a/src/metadatad.cc +++ b/src/metadatad.cc @@ -14,11 +14,59 @@ * limitations under the License. **/ +#include +#include +#include +#include + #include "agent.h" #include "configuration.h" #include "docker.h" #include "instance.h" #include "kubernetes.h" +#include "time.h" + +namespace google { +namespace { + +class CleanupState { + public: + CleanupState( + std::initializer_list updaters, MetadataAgent* server) + : updaters_(updaters), server_(server) { server_wait_mutex_.lock(); } + + void StartShutdown() const { + std::cerr << "Stopping server" << std::endl; + server_->Stop(); + std::cerr << "Stopping updaters" << std::endl; + for (MetadataUpdater* updater : updaters_) { + updater->NotifyStop(); + } + server_wait_mutex_.unlock(); + // Give the notifications some time to propagate. + std::this_thread::sleep_for(time::seconds(0.1)); + } + + void Wait() const { + std::lock_guard await_server_shutdown(server_wait_mutex_); + } + + private: + mutable std::mutex server_wait_mutex_; + std::vector updaters_; + MetadataAgent* server_; +}; +const CleanupState* cleanup_state; + +} // namespace +} // google + +extern "C" [[noreturn]] void handle_sigterm(int signum) { + std::cerr << "Caught SIGTERM; shutting down" << std::endl; + google::cleanup_state->StartShutdown(); + std::cerr << "Exiting" << std::endl; + std::exit(0); // SIGTERM means graceful shutdown, so report success. +} int main(int ac, char** av) { google::Configuration config; @@ -33,9 +81,17 @@ int main(int ac, char** av) { google::DockerUpdater docker_updater(config, server.mutable_store()); google::KubernetesUpdater kubernetes_updater(config, server.health_checker(), server.mutable_store()); - instance_updater.start(); - docker_updater.start(); - kubernetes_updater.start(); + google::cleanup_state = new google::CleanupState( + {&instance_updater, &docker_updater, &kubernetes_updater}, + &server); + std::signal(SIGTERM, handle_sigterm); + + instance_updater.Start(); + docker_updater.Start(); + kubernetes_updater.Start(); + + server.Start(); - server.start(); + // Wait for the server to shut down. + google::cleanup_state->Wait(); } diff --git a/src/updater.cc b/src/updater.cc index 4286df9c..6c3d9d41 100644 --- a/src/updater.cc +++ b/src/updater.cc @@ -30,7 +30,7 @@ MetadataUpdater::MetadataUpdater(const Configuration& config, MetadataUpdater::~MetadataUpdater() {} -void MetadataUpdater::start() throw(ConfigurationValidationError) { +void MetadataUpdater::Start() throw(ConfigurationValidationError) { ValidateStaticConfiguration(); if (ShouldStartUpdater()) { @@ -41,8 +41,8 @@ void MetadataUpdater::start() throw(ConfigurationValidationError) { } } -void MetadataUpdater::stop() { - StopUpdater(); +void MetadataUpdater::NotifyStop() { + NotifyStopUpdater(); } PollingMetadataUpdater::PollingMetadataUpdater( @@ -77,15 +77,15 @@ bool PollingMetadataUpdater::ShouldStartUpdater() const { void PollingMetadataUpdater::StartUpdater() { timer_.lock(); if (config().VerboseLogging()) { - LOG(INFO) << "Timer locked"; + LOG(INFO) << "Locked timer for " << name(); } reporter_thread_ = std::thread([=]() { PollForMetadata(); }); } -void PollingMetadataUpdater::StopUpdater() { +void PollingMetadataUpdater::NotifyStopUpdater() { timer_.unlock(); if (config().VerboseLogging()) { - LOG(INFO) << "Timer unlocked"; + LOG(INFO) << "Unlocked timer for " << name(); } } @@ -99,7 +99,7 @@ void PollingMetadataUpdater::PollForMetadata() { } // An unlocked timer means we should stop updating. if (config().VerboseLogging()) { - LOG(INFO) << "Trying to unlock the timer"; + LOG(INFO) << "Trying to unlock the timer for " << name(); } auto start = std::chrono::high_resolution_clock::now(); auto wakeup = start + period_; @@ -113,7 +113,7 @@ void PollingMetadataUpdater::PollForMetadata() { if (config().VerboseLogging()) { LOG(INFO) << " Timer unlock timed out after " << std::chrono::duration_cast(now - start).count() - << "s (good)"; + << "s (good) for " << name(); } start = now; wakeup = start + period_; @@ -121,7 +121,7 @@ void PollingMetadataUpdater::PollForMetadata() { } } while (!done); if (config().VerboseLogging()) { - LOG(INFO) << "Timer unlocked (stop polling)"; + LOG(INFO) << "Timer unlocked (stop polling) for " << name(); } } diff --git a/src/updater.h b/src/updater.h index 4e47e37d..3d21f86a 100644 --- a/src/updater.h +++ b/src/updater.h @@ -70,10 +70,10 @@ class MetadataUpdater { virtual ~MetadataUpdater(); // Starts updating. - void start() throw(ConfigurationValidationError); + void Start() throw(ConfigurationValidationError); - // Stops updating. - void stop(); + // Notifies the updater to stop updating. + void NotifyStop(); using UpdateCallback = std::function&&)>; @@ -103,8 +103,9 @@ class MetadataUpdater { // Internal method for starting the updater's logic. virtual void StartUpdater() = 0; - // Internal method for stopping the updater's logic. - virtual void StopUpdater() = 0; + // Internal method for notifying the updater's to stop its logic. + // This method should not perform any blocking operations (e.g., wait). + virtual void NotifyStopUpdater() = 0; // Updates the resource map in the store. void UpdateResourceCallback(const ResourceMetadata& result) { @@ -116,6 +117,10 @@ class MetadataUpdater { store_->UpdateMetadata(result.resource_, std::move(result.metadata_)); } + const std::string& name() const { + return name_; + } + const Configuration& config() const { return config_; } @@ -146,7 +151,7 @@ class PollingMetadataUpdater : public MetadataUpdater { using MetadataUpdater::ValidateDynamicConfiguration; bool ShouldStartUpdater() const; void StartUpdater(); - void StopUpdater(); + void NotifyStopUpdater(); private: friend class InstanceTest; diff --git a/test/updater_unittest.cc b/test/updater_unittest.cc index be37d904..2e729325 100644 --- a/test/updater_unittest.cc +++ b/test/updater_unittest.cc @@ -109,7 +109,7 @@ class MockMetadataUpdater : public MetadataUpdater { void StartUpdater() { call_sequence_.push_back("StartUpdater"); } - void StopUpdater() {} + void NotifyStopUpdater() {} mutable std::vector call_sequence_; @@ -125,7 +125,7 @@ TEST_F(ValidationOrderingTest, FailedStaticCheckStopsOtherChecks) { /*fail_static=*/true, /*should_start=*/true, /*fail_dynamic=*/true); - EXPECT_THROW(updater.start(), MetadataUpdater::ConfigurationValidationError); + EXPECT_THROW(updater.Start(), MetadataUpdater::ConfigurationValidationError); EXPECT_EQ( std::vector({ "ValidateStaticConfiguration", @@ -139,7 +139,7 @@ TEST_F(ValidationOrderingTest, FalseShouldStartUpdaterStopsDynamicChecks) { /*fail_static=*/false, /*should_start=*/false, /*fail_dynamic=*/false); - EXPECT_NO_THROW(updater.start()); + EXPECT_NO_THROW(updater.Start()); EXPECT_EQ( std::vector({ "ValidateStaticConfiguration", @@ -154,7 +154,7 @@ TEST_F(ValidationOrderingTest, FailedDynamicCheckStopsStartUpdater) { /*fail_static=*/false, /*should_start=*/true, /*fail_dynamic=*/true); - EXPECT_THROW(updater.start(), MetadataUpdater::ConfigurationValidationError); + EXPECT_THROW(updater.Start(), MetadataUpdater::ConfigurationValidationError); EXPECT_EQ( std::vector({ "ValidateStaticConfiguration", @@ -170,7 +170,7 @@ TEST_F(ValidationOrderingTest, AllChecksPassedInvokesStartUpdater) { /*fail_static=*/false, /*should_start=*/true, /*fail_dynamic=*/false); - EXPECT_NO_THROW(updater.start()); + EXPECT_NO_THROW(updater.Start()); EXPECT_EQ( std::vector({ "ValidateStaticConfiguration",