Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/agent.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,17 @@ MetadataAgent::MetadataAgent(const Configuration& config)

MetadataAgent::~MetadataAgent() {}

void MetadataAgent::start() {
void MetadataAgent::Start() {
metadata_api_server_.reset(new MetadataApiServer(
config_, store_, config_.MetadataApiNumThreads(), "0.0.0.0",
config_.MetadataApiPort()));
reporter_.reset(new MetadataReporter(
config_, &store_, config_.MetadataReporterIntervalSeconds()));
}

void MetadataAgent::Stop() {
metadata_api_server_->Stop();
// TODO: Notify the metadata reporter as well.
}

}
5 changes: 4 additions & 1 deletion src/agent.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ class MetadataAgent {
~MetadataAgent();

// Starts serving.
void start();
void Start();

// Stops serving.
void Stop();

const Configuration& config() const {
return config_;
Expand Down
8 changes: 7 additions & 1 deletion src/api_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,17 @@ MetadataApiServer::MetadataApiServer(const Configuration& config,
server_pool_()
{
for (int i : boost::irange(0, server_threads)) {
server_pool_.emplace_back(&HttpServer::run, &server_);
server_pool_.emplace_back([=]() { server_.run(); });
}
}

void MetadataApiServer::Stop() {
server_.stop();
LOG(INFO) << "API server stopped";
}

MetadataApiServer::~MetadataApiServer() {
Stop();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have Stop in this destructor, should we also call stop in MetadataAgent's destructor for consistency? I'm primary concerned about the inconsistency, I'm not sure what the negative effects would be.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MetadataAgent's destructor will deallocate both the API server and the reporter, which will invoke their respective destructors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused, why are we calling Stop from MetadataAgent, if we're relying on the destructor? I may not be clear, but it seems confusing that stop gets propagated through multiple channels simultaneously.

https://github.com/Stackdriver/metadata-agent/pull/136/files#diff-61b93c57ea92f91ec66fdd4a280d8e8bR40

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop() is idempotent. It's just a notification under the covers, so it's ok to call it more than once. Calling it from the destructor guarantees that the server will also shut down cleanly when the object is deleted.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

for (auto& thread : server_pool_) {
thread.join();
}
Expand Down
3 changes: 3 additions & 0 deletions src/api_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ class MetadataApiServer {
int server_threads, const std::string& host, int port);
~MetadataApiServer();

// Stops the server and closes the listening socket.
void Stop();

private:
friend class ApiServerTest;

Expand Down
9 changes: 9 additions & 0 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,15 @@ void KubernetesUpdater::StartUpdater() {
}
}

void KubernetesUpdater::NotifyStopUpdater() {
if (config().KubernetesUseWatch()) {
// TODO: How do we interrupt a watch thread?
} else {
// Only stop polling if watch is disabled.
PollingMetadataUpdater::NotifyStopUpdater();
}
}

void KubernetesUpdater::MetadataCallback(
std::vector<MetadataUpdater::ResourceMetadata>&& result_vector) {
for (MetadataUpdater::ResourceMetadata& result : result_vector) {
Expand Down
1 change: 1 addition & 0 deletions src/kubernetes.h
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ class KubernetesUpdater : public PollingMetadataUpdater {
bool ShouldStartUpdater() const;

void StartUpdater();
void NotifyStopUpdater();

private:
// Metadata watcher callback.
Expand Down
64 changes: 60 additions & 4 deletions src/metadatad.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,59 @@
* limitations under the License.
**/

#include <csignal>
#include <cstdlib>
#include <initializer_list>
#include <iostream>

#include "agent.h"
#include "configuration.h"
#include "docker.h"
#include "instance.h"
#include "kubernetes.h"
#include "time.h"

namespace google {
namespace {

class CleanupState {
public:
CleanupState(
std::initializer_list<MetadataUpdater*> updaters, MetadataAgent* server)
: updaters_(updaters), server_(server) { server_wait_mutex_.lock(); }

void StartShutdown() const {
std::cerr << "Stopping server" << std::endl;
server_->Stop();
std::cerr << "Stopping updaters" << std::endl;
for (MetadataUpdater* updater : updaters_) {
updater->NotifyStop();
}
server_wait_mutex_.unlock();
// Give the notifications some time to propagate.
std::this_thread::sleep_for(time::seconds(0.1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this guaranteed to be enough time?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Empirically, smaller delays were also sufficient, as this just needs enough time for the thread to notice the timer unlock notification and exit the loop. For poller threads, even if it doesn't, nothing bad is going to happen, so I hesitate to introduce a larger wait here.

Copy link
Contributor

@bmoyles0117 bmoyles0117 May 17, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to me, that unlocking the server_wait_mutex is essentially a noop, so why wait at all?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlocking server_wait_mutex allows the destructors to start executing, which will start joining threads. More cleanup that can proceed in parallel.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

}

void Wait() const {
std::lock_guard<std::mutex> await_server_shutdown(server_wait_mutex_);
}

private:
mutable std::mutex server_wait_mutex_;
std::vector<MetadataUpdater*> updaters_;
MetadataAgent* server_;
};
const CleanupState* cleanup_state;

} // namespace
} // google

extern "C" [[noreturn]] void handle_sigterm(int signum) {
std::cerr << "Caught SIGTERM; shutting down" << std::endl;
google::cleanup_state->StartShutdown();
std::cerr << "Exiting" << std::endl;
std::exit(0); // SIGTERM means graceful shutdown, so report success.
}

int main(int ac, char** av) {
google::Configuration config;
Expand All @@ -33,9 +81,17 @@ int main(int ac, char** av) {
google::DockerUpdater docker_updater(config, server.mutable_store());
google::KubernetesUpdater kubernetes_updater(config, server.health_checker(), server.mutable_store());

instance_updater.start();
docker_updater.start();
kubernetes_updater.start();
google::cleanup_state = new google::CleanupState(
{&instance_updater, &docker_updater, &kubernetes_updater},
&server);
std::signal(SIGTERM, handle_sigterm);

instance_updater.Start();
docker_updater.Start();
kubernetes_updater.Start();

server.Start();

server.start();
// Wait for the server to shut down.
google::cleanup_state->Wait();
}
18 changes: 9 additions & 9 deletions src/updater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ MetadataUpdater::MetadataUpdater(const Configuration& config,

MetadataUpdater::~MetadataUpdater() {}

void MetadataUpdater::start() throw(ConfigurationValidationError) {
void MetadataUpdater::Start() throw(ConfigurationValidationError) {
ValidateStaticConfiguration();

if (ShouldStartUpdater()) {
Expand All @@ -41,8 +41,8 @@ void MetadataUpdater::start() throw(ConfigurationValidationError) {
}
}

void MetadataUpdater::stop() {
StopUpdater();
void MetadataUpdater::NotifyStop() {
NotifyStopUpdater();
}

PollingMetadataUpdater::PollingMetadataUpdater(
Expand Down Expand Up @@ -77,15 +77,15 @@ bool PollingMetadataUpdater::ShouldStartUpdater() const {
void PollingMetadataUpdater::StartUpdater() {
timer_.lock();
if (config().VerboseLogging()) {
LOG(INFO) << "Timer locked";
LOG(INFO) << "Locked timer for " << name();
}
reporter_thread_ = std::thread([=]() { PollForMetadata(); });
}

void PollingMetadataUpdater::StopUpdater() {
void PollingMetadataUpdater::NotifyStopUpdater() {
timer_.unlock();
if (config().VerboseLogging()) {
LOG(INFO) << "Timer unlocked";
LOG(INFO) << "Unlocked timer for " << name();
}
}

Expand All @@ -99,7 +99,7 @@ void PollingMetadataUpdater::PollForMetadata() {
}
// An unlocked timer means we should stop updating.
if (config().VerboseLogging()) {
LOG(INFO) << "Trying to unlock the timer";
LOG(INFO) << "Trying to unlock the timer for " << name();
}
auto start = std::chrono::high_resolution_clock::now();
auto wakeup = start + period_;
Expand All @@ -113,15 +113,15 @@ void PollingMetadataUpdater::PollForMetadata() {
if (config().VerboseLogging()) {
LOG(INFO) << " Timer unlock timed out after "
<< std::chrono::duration_cast<time::seconds>(now - start).count()
<< "s (good)";
<< "s (good) for " << name();
}
start = now;
wakeup = start + period_;
done = false;
}
} while (!done);
if (config().VerboseLogging()) {
LOG(INFO) << "Timer unlocked (stop polling)";
LOG(INFO) << "Timer unlocked (stop polling) for " << name();
}
}

Expand Down
17 changes: 11 additions & 6 deletions src/updater.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ class MetadataUpdater {
virtual ~MetadataUpdater();

// Starts updating.
void start() throw(ConfigurationValidationError);
void Start() throw(ConfigurationValidationError);

// Stops updating.
void stop();
// Notifies the updater to stop updating.
void NotifyStop();

using UpdateCallback =
std::function<void(std::vector<MetadataUpdater::ResourceMetadata>&&)>;
Expand Down Expand Up @@ -103,8 +103,9 @@ class MetadataUpdater {
// Internal method for starting the updater's logic.
virtual void StartUpdater() = 0;

// Internal method for stopping the updater's logic.
virtual void StopUpdater() = 0;
// Internal method for notifying the updater's to stop its logic.
// This method should not perform any blocking operations (e.g., wait).
virtual void NotifyStopUpdater() = 0;

// Updates the resource map in the store.
void UpdateResourceCallback(const ResourceMetadata& result) {
Expand All @@ -116,6 +117,10 @@ class MetadataUpdater {
store_->UpdateMetadata(result.resource_, std::move(result.metadata_));
}

const std::string& name() const {
return name_;
}

const Configuration& config() const {
return config_;
}
Expand Down Expand Up @@ -146,7 +151,7 @@ class PollingMetadataUpdater : public MetadataUpdater {
using MetadataUpdater::ValidateDynamicConfiguration;
bool ShouldStartUpdater() const;
void StartUpdater();
void StopUpdater();
void NotifyStopUpdater();

private:
friend class InstanceTest;
Expand Down
10 changes: 5 additions & 5 deletions test/updater_unittest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class MockMetadataUpdater : public MetadataUpdater {
void StartUpdater() {
call_sequence_.push_back("StartUpdater");
}
void StopUpdater() {}
void NotifyStopUpdater() {}

mutable std::vector<std::string> call_sequence_;

Expand All @@ -125,7 +125,7 @@ TEST_F(ValidationOrderingTest, FailedStaticCheckStopsOtherChecks) {
/*fail_static=*/true,
/*should_start=*/true,
/*fail_dynamic=*/true);
EXPECT_THROW(updater.start(), MetadataUpdater::ConfigurationValidationError);
EXPECT_THROW(updater.Start(), MetadataUpdater::ConfigurationValidationError);
EXPECT_EQ(
std::vector<std::string>({
"ValidateStaticConfiguration",
Expand All @@ -139,7 +139,7 @@ TEST_F(ValidationOrderingTest, FalseShouldStartUpdaterStopsDynamicChecks) {
/*fail_static=*/false,
/*should_start=*/false,
/*fail_dynamic=*/false);
EXPECT_NO_THROW(updater.start());
EXPECT_NO_THROW(updater.Start());
EXPECT_EQ(
std::vector<std::string>({
"ValidateStaticConfiguration",
Expand All @@ -154,7 +154,7 @@ TEST_F(ValidationOrderingTest, FailedDynamicCheckStopsStartUpdater) {
/*fail_static=*/false,
/*should_start=*/true,
/*fail_dynamic=*/true);
EXPECT_THROW(updater.start(), MetadataUpdater::ConfigurationValidationError);
EXPECT_THROW(updater.Start(), MetadataUpdater::ConfigurationValidationError);
EXPECT_EQ(
std::vector<std::string>({
"ValidateStaticConfiguration",
Expand All @@ -170,7 +170,7 @@ TEST_F(ValidationOrderingTest, AllChecksPassedInvokesStartUpdater) {
/*fail_static=*/false,
/*should_start=*/true,
/*fail_dynamic=*/false);
EXPECT_NO_THROW(updater.start());
EXPECT_NO_THROW(updater.Start());
EXPECT_EQ(
std::vector<std::string>({
"ValidateStaticConfiguration",
Expand Down