Skip to content
This repository was archived by the owner on Aug 19, 2019. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 40 additions & 19 deletions src/docker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@
#include "resource.h"
#include "store.h"
#include "time.h"
#include "util.h"

namespace http = boost::network::http;

namespace google {

namespace {

constexpr const int kDockerValidationRetryLimit = 10;
constexpr const int kDockerValidationRetryDelaySeconds = 1;

#if 0
constexpr const char kDockerEndpointHost[] = "unix://%2Fvar%2Frun%2Fdocker.sock/";
constexpr const char kDockerApiVersion[] = "1.23";
Expand All @@ -45,24 +49,38 @@ constexpr const char kDockerContainerResourcePrefix[] = "container";

}

// A subclass of QueryException to represent non-retriable errors.
class DockerReader::NonRetriableError : public DockerReader::QueryException {
public:
NonRetriableError(const std::string& what) : QueryException(what) {}
};

DockerReader::DockerReader(const Configuration& config)
: config_(config), environment_(config) {}

bool DockerReader::ValidateConfiguration() const {
try {
const std::string container_filter(
config_.DockerContainerFilter().empty()
? "" : "&" + config_.DockerContainerFilter());

// A limit may exist in the container_filter, however, the docker API only
// uses the first limit provided in the query params.
(void) QueryDocker(std::string(kDockerEndpointPath) +
"/json?all=true&limit=1" + container_filter);
void DockerReader::ValidateConfiguration() const
throw(MetadataUpdater::ConfigurationValidationError) {
const std::string container_filter(
config_.DockerContainerFilter().empty()
? "" : "&" + config_.DockerContainerFilter());

return true;
try {
util::Retry<NonRetriableError, QueryException>(
kDockerValidationRetryLimit,
time::seconds(kDockerValidationRetryDelaySeconds),
[this, &container_filter]() {
// A limit may exist in the container_filter, however, the docker API only
// uses the first limit provided in the query params.
(void) QueryDocker(std::string(kDockerEndpointPath) +
"/json?all=true&limit=1" + container_filter);
});
} catch (const NonRetriableError& e) {
throw MetadataUpdater::ConfigurationValidationError(
"Docker query validation failed: " + e.what());
} catch (const QueryException& e) {
// Already logged.
return false;
throw MetadataUpdater::ConfigurationValidationError(
"Docker query validation retry limit reached: " + e.what());
}
}

Expand Down Expand Up @@ -190,7 +208,12 @@ json::value DockerReader::QueryDocker(const std::string& path) const
}
try {
http::local_client::response response = client.get(request);
if (status(response) >= 300) {
if (status(response) >= 400 && status(response) <= 403) {
throw NonRetriableError(
format::Substitute("Server responded with '{{message}}' ({{code}})",
{{"message", status_message(response)},
{"code", format::str(status(response))}}));
} else if (status(response) >= 300) {
throw boost::system::system_error(
boost::system::errc::make_error_code(boost::system::errc::not_connected),
format::Substitute("Server responded with '{{message}}' ({{code}})",
Expand All @@ -207,12 +230,10 @@ json::value DockerReader::QueryDocker(const std::string& path) const
}
}

bool DockerUpdater::ValidateConfiguration() const {
if (!PollingMetadataUpdater::ValidateConfiguration()) {
return false;
}

return reader_.ValidateConfiguration();
void DockerUpdater::ValidateConfiguration() const
throw(ConfigurationValidationError) {
PollingMetadataUpdater::ValidateConfiguration();
reader_.ValidateConfiguration();
}

}
9 changes: 5 additions & 4 deletions src/docker.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ class DockerReader {
// A Docker metadata query function.
std::vector<MetadataUpdater::ResourceMetadata> MetadataQuery() const;

// Validates the relevant configuration and returns true if it's correct.
// Returns a bool that represents if it's configured properly.
bool ValidateConfiguration() const;
// Validates the relevant configuration and throws if it's incorrect.
void ValidateConfiguration() const
throw(MetadataUpdater::ConfigurationValidationError);

private:
// A representation of all query-related errors.
Expand All @@ -48,6 +48,7 @@ class DockerReader {
private:
std::string explanation_;
};
class NonRetriableError;

// Issues a Docker API query at a given path and returns a parsed
// JSON response. The path has to start with "/".
Expand All @@ -72,7 +73,7 @@ class DockerUpdater : public PollingMetadataUpdater {
[=]() { return reader_.MetadataQuery(); }) { }

protected:
bool ValidateConfiguration() const;
void ValidateConfiguration() const throw(ConfigurationValidationError);

private:
DockerReader reader_;
Expand Down
77 changes: 58 additions & 19 deletions src/kubernetes.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,17 @@
#include "resource.h"
#include "store.h"
#include "time.h"
#include "util.h"

namespace http = boost::network::http;

namespace google {

namespace {

constexpr const int kKubernetesValidationRetryLimit = 10;
constexpr const int kKubernetesValidationRetryDelaySeconds = 1;

#if 0
constexpr const char kKubernetesEndpointHost[] = "https://kubernetes.default.svc";
#endif
Expand Down Expand Up @@ -89,6 +93,13 @@ bool ReadServiceAccountSecret(

}

// A subclass of QueryException to represent non-retriable errors.
class KubernetesReader::NonRetriableError
: public KubernetesReader::QueryException {
public:
NonRetriableError(const std::string& what) : QueryException(what) {}
};

KubernetesReader::KubernetesReader(const Configuration& config,
HealthChecker* health_checker)
: config_(config), environment_(config), health_checker_(health_checker) {}
Expand Down Expand Up @@ -654,7 +665,12 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const
}
try {
http::client::response response = client.get(request);
if (status(response) >= 300) {
if (status(response) >= 400 && status(response) <= 403) {
throw NonRetriableError(
format::Substitute("Server responded with '{{message}}' ({{code}})",
{{"message", status_message(response)},
{"code", format::str(status(response))}}));
} else if (status(response) >= 300) {
throw boost::system::system_error(
boost::system::errc::make_error_code(boost::system::errc::not_connected),
format::Substitute("Server responded with '{{message}}' ({{code}})",
Expand Down Expand Up @@ -1056,31 +1072,50 @@ void KubernetesReader::UpdateServiceToPodsCache(
}
}

bool KubernetesReader::ValidateConfiguration() const {
void KubernetesReader::ValidateConfiguration() const
throw(MetadataUpdater::ConfigurationValidationError) {
try {
(void) QueryMaster(std::string(kKubernetesEndpointPath) + "/nodes?limit=1");
util::Retry<NonRetriableError, QueryException>(
kKubernetesValidationRetryLimit,
time::seconds(kKubernetesValidationRetryDelaySeconds),
[this]() {
(void) QueryMaster(std::string(kKubernetesEndpointPath)
+ "/nodes?limit=1");
});
} catch (const NonRetriableError& e) {
throw MetadataUpdater::ConfigurationValidationError(
"Node query validation failed: " + e.what());
} catch (const QueryException& e) {
// Already logged.
return false;
throw MetadataUpdater::ConfigurationValidationError(
"Node query validation retry limit reached: " + e.what());
}

try {
const std::string pod_label_selector(
config_.KubernetesPodLabelSelector().empty()
? "" : "&" + config_.KubernetesPodLabelSelector());
const std::string pod_label_selector(
config_.KubernetesPodLabelSelector().empty()
? "" : "&" + config_.KubernetesPodLabelSelector());

(void) QueryMaster(std::string(kKubernetesEndpointPath) + "/pods?limit=1" +
pod_label_selector);
try {
util::Retry<NonRetriableError, QueryException>(
kKubernetesValidationRetryLimit,
time::seconds(kKubernetesValidationRetryDelaySeconds),
[this, &pod_label_selector]() {
(void) QueryMaster(std::string(kKubernetesEndpointPath)
+ "/pods?limit=1" + pod_label_selector);
});
} catch (const NonRetriableError& e) {
throw MetadataUpdater::ConfigurationValidationError(
"Pod query validation failed: " + e.what());
} catch (const QueryException& e) {
// Already logged.
return false;
throw MetadataUpdater::ConfigurationValidationError(
"Pod query validation retry limit reached: " + e.what());
}

if (CurrentNode().empty()) {
return false;
throw new MetadataUpdater::ConfigurationValidationError(
"Current node cannot be empty");
}

return true;
}

void KubernetesReader::PodCallback(
Expand Down Expand Up @@ -1233,12 +1268,16 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config,
config.KubernetesUpdaterIntervalSeconds(),
[=]() { return reader_.MetadataQuery(); }) { }

bool KubernetesUpdater::ValidateConfiguration() const {
if (!PollingMetadataUpdater::ValidateConfiguration()) {
return false;
}
void KubernetesUpdater::ValidateConfiguration() const
throw(ConfigurationValidationError) {
PollingMetadataUpdater::ValidateConfiguration();
reader_.ValidateConfiguration();
}

return reader_.ValidateConfiguration();
bool KubernetesUpdater::ShouldStartUpdater() const {
return
PollingMetadataUpdater::ShouldStartUpdater() ||
config().KubernetesUseWatch();
}

void KubernetesUpdater::StartUpdater() {
Expand Down
11 changes: 7 additions & 4 deletions src/kubernetes.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class KubernetesReader {
// A Kubernetes metadata query function.
std::vector<MetadataUpdater::ResourceMetadata> MetadataQuery() const;

// Validates the relevant configuration and returns true if it's correct.
// Returns a bool that represents if it's configured properly.
bool ValidateConfiguration() const;
// Validates the relevant configuration and throws if it's incorrect.
void ValidateConfiguration() const
throw(MetadataUpdater::ConfigurationValidationError);

// Node watcher.
void WatchNodes(const std::string& node_name,
Expand Down Expand Up @@ -75,6 +75,7 @@ class KubernetesReader {
private:
std::string explanation_;
};
class NonRetriableError;

// Issues a Kubernetes master API query at a given path and
// returns a parsed JSON response. The path has to start with "/".
Expand Down Expand Up @@ -226,7 +227,9 @@ class KubernetesUpdater : public PollingMetadataUpdater {
}

protected:
bool ValidateConfiguration() const;
void ValidateConfiguration() const throw(ConfigurationValidationError);
bool ShouldStartUpdater() const;

void StartUpdater();

private:
Expand Down
31 changes: 20 additions & 11 deletions src/updater.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <chrono>

#include "configuration.h"
#include "format.h"
#include "logging.h"

namespace google {
Expand All @@ -29,13 +30,14 @@ MetadataUpdater::MetadataUpdater(const Configuration& config,

MetadataUpdater::~MetadataUpdater() {}

void MetadataUpdater::start() {
if (!ValidateConfiguration()) {
LOG(ERROR) << "Failed to validate configuration for " << name_;
return;
}
void MetadataUpdater::start() throw(ConfigurationValidationError) {
ValidateConfiguration();

StartUpdater();
if (ShouldStartUpdater()) {
StartUpdater();
} else {
LOG(INFO) << "Not starting " << name_;
}
}

void MetadataUpdater::stop() {
Expand All @@ -58,18 +60,25 @@ PollingMetadataUpdater::~PollingMetadataUpdater() {
}
}

bool PollingMetadataUpdater::ValidateConfiguration() const {
return period_ >= time::seconds::zero();
void PollingMetadataUpdater::ValidateConfiguration() const
throw(ConfigurationValidationError) {
if (period_ < time::seconds::zero()) {
throw ConfigurationValidationError(
format::Substitute("Polling period {{period}}s cannot be negative",
{{"period", format::str(period_.count())}}));
}
}

bool PollingMetadataUpdater::ShouldStartUpdater() const {
return period_ > time::seconds::zero();
}

void PollingMetadataUpdater::StartUpdater() {
timer_.lock();
if (config().VerboseLogging()) {
LOG(INFO) << "Timer locked";
}
if (period_ > time::seconds::zero()) {
reporter_thread_ = std::thread([=]() { PollForMetadata(); });
}
reporter_thread_ = std::thread([=]() { PollForMetadata(); });
}

void PollingMetadataUpdater::StopUpdater() {
Expand Down
Loading