diff --git a/src/docker.cc b/src/docker.cc index cd77ecc4..1850e699 100644 --- a/src/docker.cc +++ b/src/docker.cc @@ -29,6 +29,7 @@ #include "resource.h" #include "store.h" #include "time.h" +#include "util.h" namespace http = boost::network::http; @@ -36,6 +37,9 @@ namespace google { namespace { +constexpr const int kDockerValidationRetryLimit = 10; +constexpr const int kDockerValidationRetryDelaySeconds = 1; + #if 0 constexpr const char kDockerEndpointHost[] = "unix://%2Fvar%2Frun%2Fdocker.sock/"; constexpr const char kDockerApiVersion[] = "1.23"; @@ -45,24 +49,38 @@ constexpr const char kDockerContainerResourcePrefix[] = "container"; } +// A subclass of QueryException to represent non-retriable errors. +class DockerReader::NonRetriableError : public DockerReader::QueryException { + public: + NonRetriableError(const std::string& what) : QueryException(what) {} +}; + DockerReader::DockerReader(const Configuration& config) : config_(config), environment_(config) {} -bool DockerReader::ValidateConfiguration() const { - try { - const std::string container_filter( - config_.DockerContainerFilter().empty() - ? "" : "&" + config_.DockerContainerFilter()); - - // A limit may exist in the container_filter, however, the docker API only - // uses the first limit provided in the query params. - (void) QueryDocker(std::string(kDockerEndpointPath) + - "/json?all=true&limit=1" + container_filter); +void DockerReader::ValidateConfiguration() const + throw(MetadataUpdater::ConfigurationValidationError) { + const std::string container_filter( + config_.DockerContainerFilter().empty() + ? "" : "&" + config_.DockerContainerFilter()); - return true; + try { + util::Retry( + kDockerValidationRetryLimit, + time::seconds(kDockerValidationRetryDelaySeconds), + [this, &container_filter]() { + // A limit may exist in the container_filter, however, the docker API only + // uses the first limit provided in the query params. + (void) QueryDocker(std::string(kDockerEndpointPath) + + "/json?all=true&limit=1" + container_filter); + }); + } catch (const NonRetriableError& e) { + throw MetadataUpdater::ConfigurationValidationError( + "Docker query validation failed: " + e.what()); } catch (const QueryException& e) { // Already logged. - return false; + throw MetadataUpdater::ConfigurationValidationError( + "Docker query validation retry limit reached: " + e.what()); } } @@ -190,7 +208,12 @@ json::value DockerReader::QueryDocker(const std::string& path) const } try { http::local_client::response response = client.get(request); - if (status(response) >= 300) { + if (status(response) >= 400 && status(response) <= 403) { + throw NonRetriableError( + format::Substitute("Server responded with '{{message}}' ({{code}})", + {{"message", status_message(response)}, + {"code", format::str(status(response))}})); + } else if (status(response) >= 300) { throw boost::system::system_error( boost::system::errc::make_error_code(boost::system::errc::not_connected), format::Substitute("Server responded with '{{message}}' ({{code}})", @@ -207,12 +230,10 @@ json::value DockerReader::QueryDocker(const std::string& path) const } } -bool DockerUpdater::ValidateConfiguration() const { - if (!PollingMetadataUpdater::ValidateConfiguration()) { - return false; - } - - return reader_.ValidateConfiguration(); +void DockerUpdater::ValidateConfiguration() const + throw(ConfigurationValidationError) { + PollingMetadataUpdater::ValidateConfiguration(); + reader_.ValidateConfiguration(); } } diff --git a/src/docker.h b/src/docker.h index ea62a765..1725118b 100644 --- a/src/docker.h +++ b/src/docker.h @@ -35,9 +35,9 @@ class DockerReader { // A Docker metadata query function. std::vector MetadataQuery() const; - // Validates the relevant configuration and returns true if it's correct. - // Returns a bool that represents if it's configured properly. - bool ValidateConfiguration() const; + // Validates the relevant configuration and throws if it's incorrect. + void ValidateConfiguration() const + throw(MetadataUpdater::ConfigurationValidationError); private: // A representation of all query-related errors. @@ -48,6 +48,7 @@ class DockerReader { private: std::string explanation_; }; + class NonRetriableError; // Issues a Docker API query at a given path and returns a parsed // JSON response. The path has to start with "/". @@ -72,7 +73,7 @@ class DockerUpdater : public PollingMetadataUpdater { [=]() { return reader_.MetadataQuery(); }) { } protected: - bool ValidateConfiguration() const; + void ValidateConfiguration() const throw(ConfigurationValidationError); private: DockerReader reader_; diff --git a/src/kubernetes.cc b/src/kubernetes.cc index 2168768e..c1f2695a 100644 --- a/src/kubernetes.cc +++ b/src/kubernetes.cc @@ -36,6 +36,7 @@ #include "resource.h" #include "store.h" #include "time.h" +#include "util.h" namespace http = boost::network::http; @@ -43,6 +44,9 @@ namespace google { namespace { +constexpr const int kKubernetesValidationRetryLimit = 10; +constexpr const int kKubernetesValidationRetryDelaySeconds = 1; + #if 0 constexpr const char kKubernetesEndpointHost[] = "https://kubernetes.default.svc"; #endif @@ -89,6 +93,13 @@ bool ReadServiceAccountSecret( } +// A subclass of QueryException to represent non-retriable errors. +class KubernetesReader::NonRetriableError + : public KubernetesReader::QueryException { + public: + NonRetriableError(const std::string& what) : QueryException(what) {} +}; + KubernetesReader::KubernetesReader(const Configuration& config, HealthChecker* health_checker) : config_(config), environment_(config), health_checker_(health_checker) {} @@ -654,7 +665,12 @@ json::value KubernetesReader::QueryMaster(const std::string& path) const } try { http::client::response response = client.get(request); - if (status(response) >= 300) { + if (status(response) >= 400 && status(response) <= 403) { + throw NonRetriableError( + format::Substitute("Server responded with '{{message}}' ({{code}})", + {{"message", status_message(response)}, + {"code", format::str(status(response))}})); + } else if (status(response) >= 300) { throw boost::system::system_error( boost::system::errc::make_error_code(boost::system::errc::not_connected), format::Substitute("Server responded with '{{message}}' ({{code}})", @@ -1056,31 +1072,50 @@ void KubernetesReader::UpdateServiceToPodsCache( } } -bool KubernetesReader::ValidateConfiguration() const { +void KubernetesReader::ValidateConfiguration() const + throw(MetadataUpdater::ConfigurationValidationError) { try { - (void) QueryMaster(std::string(kKubernetesEndpointPath) + "/nodes?limit=1"); + util::Retry( + kKubernetesValidationRetryLimit, + time::seconds(kKubernetesValidationRetryDelaySeconds), + [this]() { + (void) QueryMaster(std::string(kKubernetesEndpointPath) + + "/nodes?limit=1"); + }); + } catch (const NonRetriableError& e) { + throw MetadataUpdater::ConfigurationValidationError( + "Node query validation failed: " + e.what()); } catch (const QueryException& e) { // Already logged. - return false; + throw MetadataUpdater::ConfigurationValidationError( + "Node query validation retry limit reached: " + e.what()); } - try { - const std::string pod_label_selector( - config_.KubernetesPodLabelSelector().empty() - ? "" : "&" + config_.KubernetesPodLabelSelector()); + const std::string pod_label_selector( + config_.KubernetesPodLabelSelector().empty() + ? "" : "&" + config_.KubernetesPodLabelSelector()); - (void) QueryMaster(std::string(kKubernetesEndpointPath) + "/pods?limit=1" + - pod_label_selector); + try { + util::Retry( + kKubernetesValidationRetryLimit, + time::seconds(kKubernetesValidationRetryDelaySeconds), + [this, &pod_label_selector]() { + (void) QueryMaster(std::string(kKubernetesEndpointPath) + + "/pods?limit=1" + pod_label_selector); + }); + } catch (const NonRetriableError& e) { + throw MetadataUpdater::ConfigurationValidationError( + "Pod query validation failed: " + e.what()); } catch (const QueryException& e) { // Already logged. - return false; + throw MetadataUpdater::ConfigurationValidationError( + "Pod query validation retry limit reached: " + e.what()); } if (CurrentNode().empty()) { - return false; + throw new MetadataUpdater::ConfigurationValidationError( + "Current node cannot be empty"); } - - return true; } void KubernetesReader::PodCallback( @@ -1233,12 +1268,16 @@ KubernetesUpdater::KubernetesUpdater(const Configuration& config, config.KubernetesUpdaterIntervalSeconds(), [=]() { return reader_.MetadataQuery(); }) { } -bool KubernetesUpdater::ValidateConfiguration() const { - if (!PollingMetadataUpdater::ValidateConfiguration()) { - return false; - } +void KubernetesUpdater::ValidateConfiguration() const + throw(ConfigurationValidationError) { + PollingMetadataUpdater::ValidateConfiguration(); + reader_.ValidateConfiguration(); +} - return reader_.ValidateConfiguration(); +bool KubernetesUpdater::ShouldStartUpdater() const { + return + PollingMetadataUpdater::ShouldStartUpdater() || + config().KubernetesUseWatch(); } void KubernetesUpdater::StartUpdater() { diff --git a/src/kubernetes.h b/src/kubernetes.h index ea620df3..d5d7d9dc 100644 --- a/src/kubernetes.h +++ b/src/kubernetes.h @@ -43,9 +43,9 @@ class KubernetesReader { // A Kubernetes metadata query function. std::vector MetadataQuery() const; - // Validates the relevant configuration and returns true if it's correct. - // Returns a bool that represents if it's configured properly. - bool ValidateConfiguration() const; + // Validates the relevant configuration and throws if it's incorrect. + void ValidateConfiguration() const + throw(MetadataUpdater::ConfigurationValidationError); // Node watcher. void WatchNodes(const std::string& node_name, @@ -75,6 +75,7 @@ class KubernetesReader { private: std::string explanation_; }; + class NonRetriableError; // Issues a Kubernetes master API query at a given path and // returns a parsed JSON response. The path has to start with "/". @@ -226,7 +227,9 @@ class KubernetesUpdater : public PollingMetadataUpdater { } protected: - bool ValidateConfiguration() const; + void ValidateConfiguration() const throw(ConfigurationValidationError); + bool ShouldStartUpdater() const; + void StartUpdater(); private: diff --git a/src/updater.cc b/src/updater.cc index 980555df..293d4b57 100644 --- a/src/updater.cc +++ b/src/updater.cc @@ -19,6 +19,7 @@ #include #include "configuration.h" +#include "format.h" #include "logging.h" namespace google { @@ -29,13 +30,14 @@ MetadataUpdater::MetadataUpdater(const Configuration& config, MetadataUpdater::~MetadataUpdater() {} -void MetadataUpdater::start() { - if (!ValidateConfiguration()) { - LOG(ERROR) << "Failed to validate configuration for " << name_; - return; - } +void MetadataUpdater::start() throw(ConfigurationValidationError) { + ValidateConfiguration(); - StartUpdater(); + if (ShouldStartUpdater()) { + StartUpdater(); + } else { + LOG(INFO) << "Not starting " << name_; + } } void MetadataUpdater::stop() { @@ -58,8 +60,17 @@ PollingMetadataUpdater::~PollingMetadataUpdater() { } } -bool PollingMetadataUpdater::ValidateConfiguration() const { - return period_ >= time::seconds::zero(); +void PollingMetadataUpdater::ValidateConfiguration() const + throw(ConfigurationValidationError) { + if (period_ < time::seconds::zero()) { + throw ConfigurationValidationError( + format::Substitute("Polling period {{period}}s cannot be negative", + {{"period", format::str(period_.count())}})); + } +} + +bool PollingMetadataUpdater::ShouldStartUpdater() const { + return period_ > time::seconds::zero(); } void PollingMetadataUpdater::StartUpdater() { @@ -67,9 +78,7 @@ void PollingMetadataUpdater::StartUpdater() { if (config().VerboseLogging()) { LOG(INFO) << "Timer locked"; } - if (period_ > time::seconds::zero()) { - reporter_thread_ = std::thread([=]() { PollForMetadata(); }); - } + reporter_thread_ = std::thread([=]() { PollForMetadata(); }); } void PollingMetadataUpdater::StopUpdater() { diff --git a/src/updater.h b/src/updater.h index 8205f912..e26276be 100644 --- a/src/updater.h +++ b/src/updater.h @@ -55,12 +55,22 @@ class MetadataUpdater { MetadataStore::Metadata metadata_; }; + // A representation of all validation errors. + class ConfigurationValidationError { + public: + ConfigurationValidationError(const std::string& what) + : explanation_(what) {} + const std::string& what() const { return explanation_; } + private: + std::string explanation_; + }; + MetadataUpdater(const Configuration& config, MetadataStore* store, const std::string& name); virtual ~MetadataUpdater(); // Starts updating. - void start(); + void start() throw(ConfigurationValidationError); // Stops updating. void stop(); @@ -71,9 +81,15 @@ class MetadataUpdater { protected: friend class UpdaterTest; - // Validates the relevant configuration and returns true if it's correct. - // Returns a bool that represents if it's configured properly. - virtual bool ValidateConfiguration() const { + // Validates the relevant configuration. + // If the configuration is invalid, throws a ConfigurationValidationError, + // which is generally expected to pass through and terminate the program. + virtual void ValidateConfiguration() const + throw(ConfigurationValidationError) {} + + // Decides whether the updater logic should be started based on the current + // configuration. + virtual bool ShouldStartUpdater() const { return true; } @@ -93,7 +109,7 @@ class MetadataUpdater { store_->UpdateMetadata(result.resource_, std::move(result.metadata_)); } - const Configuration& config() { + const Configuration& config() const { return config_; } @@ -119,7 +135,8 @@ class PollingMetadataUpdater : public MetadataUpdater { protected: friend class UpdaterTest; - bool ValidateConfiguration() const; + void ValidateConfiguration() const throw(ConfigurationValidationError); + bool ShouldStartUpdater() const; void StartUpdater(); void StopUpdater(); diff --git a/src/util.h b/src/util.h new file mode 100644 index 00000000..44193bf7 --- /dev/null +++ b/src/util.h @@ -0,0 +1,59 @@ +/* + * Copyright 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +#ifndef UTIL_H_ +#define UTIL_H_ + +#include +#include + +#include "time.h" + +namespace google { + +namespace util { + +// Executes the work function while it throws exception Fail, up to a given +// number of times, with a delay between executions. If it throws exception +// Terminate, which may be a subclass of Fail, pass it through (no retries). +// Any other exception outside of the Fail hierarchy will be passed through. +template +void Retry(int max_retries, time::seconds delay, + std::function work); + +template +void Retry(int max_retries, time::seconds delay, + std::function work) { + for (int i = 0; true; ++i) { + try { + work(); + return; + } catch (const Terminate& e) { // Catch first due to inheritance. + throw e; + } catch (const Fail& e) { + if (i < max_retries - 1) { + std::this_thread::sleep_for(delay); + } else { + throw e; + } + } + } +} + +} // util + +} // google + +#endif // UTIL_H_ diff --git a/test/Makefile b/test/Makefile index b729dfda..645a1754 100644 --- a/test/Makefile +++ b/test/Makefile @@ -59,7 +59,8 @@ TESTS=\ resource_unittest \ store_unittest \ time_unittest \ - updater_unittest + updater_unittest \ + util_unittest GTEST_LIB=gtest_lib.a @@ -138,7 +139,9 @@ store_unittest: store_unittest.o $(SRC_DIR)/store.o $(SRC_DIR)/resource.o $(SRC_ $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@ time_unittest: time_unittest.o $(SRC_DIR)/time.o $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@ -updater_unittest: updater_unittest.o $(SRC_DIR)/updater.o $(SRC_DIR)/resource.o $(SRC_DIR)/store.o $(SRC_DIR)/configuration.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o $(SRC_DIR)/json.o +updater_unittest: updater_unittest.o $(SRC_DIR)/updater.o $(SRC_DIR)/resource.o $(SRC_DIR)/store.o $(SRC_DIR)/configuration.o $(SRC_DIR)/logging.o $(SRC_DIR)/time.o $(SRC_DIR)/json.o $(SRC_DIR)/format.o + $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@ +util_unittest: util_unittest.o $(CXX) $(LDFLAGS) $^ $(LDLIBS) -o $@ .PHONY: all test clean purge diff --git a/test/updater_unittest.cc b/test/updater_unittest.cc index 6dd53404..030e1dca 100644 --- a/test/updater_unittest.cc +++ b/test/updater_unittest.cc @@ -14,8 +14,12 @@ class UpdaterTest : public ::testing::Test { // query_metadata function not needed to test callbacks. UpdaterTest() : config(), store(config) {} - static bool ValidateConfiguration(MetadataUpdater* updater) { - return updater->ValidateConfiguration(); + static void ValidateConfiguration(MetadataUpdater* updater) { + updater->ValidateConfiguration(); + } + + static bool ShouldStartUpdater(MetadataUpdater* updater) { + return updater->ShouldStartUpdater(); } static void UpdateMetadataCallback( @@ -36,19 +40,31 @@ class UpdaterTest : public ::testing::Test { namespace { -TEST_F(UpdaterTest, OneMinutePollingIntervalIsValid) { +TEST_F(UpdaterTest, ValidateConfiguration_OneMinutePollingIntervalIsValid) { PollingMetadataUpdater updater(config, &store, "Test", 60, nullptr); - EXPECT_TRUE(ValidateConfiguration(&updater)); + EXPECT_NO_THROW(ValidateConfiguration(&updater)); } -TEST_F(UpdaterTest, ZeroSecondPollingIntervalIsValid) { +TEST_F(UpdaterTest, ValidateConfiguration_ZeroSecondPollingIntervalIsValid) { PollingMetadataUpdater updater(config, &store, "Test", 0, nullptr); - EXPECT_TRUE(ValidateConfiguration(&updater)); + EXPECT_NO_THROW(ValidateConfiguration(&updater)); } -TEST_F(UpdaterTest, NegativePollingIntervalIsInvalid) { +TEST_F(UpdaterTest, ValidateConfiguration_NegativePollingIntervalIsInvalid) { PollingMetadataUpdater updater(config, &store, "BadUpdater", -1, nullptr); - EXPECT_FALSE(ValidateConfiguration(&updater)); + EXPECT_THROW( + ValidateConfiguration(&updater), + MetadataUpdater::ConfigurationValidationError); +} + +TEST_F(UpdaterTest, ShouldStart_OneMinutePollingIntervalEnablesUpdate) { + PollingMetadataUpdater updater(config, &store, "Test", 60, nullptr); + EXPECT_TRUE(ShouldStartUpdater(&updater)); +} + +TEST_F(UpdaterTest, ShouldStart_ZeroSecondPollingIntervalDisablesUpdate) { + PollingMetadataUpdater updater(config, &store, "Test", 0, nullptr); + EXPECT_FALSE(ShouldStartUpdater(&updater)); } TEST_F(UpdaterTest, UpdateMetadataCallback) { diff --git a/test/util_unittest.cc b/test/util_unittest.cc new file mode 100644 index 00000000..b24b5afd --- /dev/null +++ b/test/util_unittest.cc @@ -0,0 +1,74 @@ +#include "../src/util.h" +#include "gtest/gtest.h" + +#include "../src/time.h" + +namespace google { + +namespace { + +class Fail {}; + +class Terminate {}; + +class TerminateSubclass : public Fail {}; + +TEST(RetryTest, NoRetriesOnSuccess) { + int invocation_count = 0; + EXPECT_NO_THROW((util::Retry( + 10, time::seconds(0.01), + [&invocation_count]() { ++invocation_count; }))); + EXPECT_EQ(1, invocation_count); +} + +TEST(RetryTest, RetryOnFail) { + int invocation_count = 0; + EXPECT_THROW( + (util::Retry( + 10, time::seconds(0.01), + [&invocation_count]() { ++invocation_count; throw Fail(); })), + Fail); + EXPECT_EQ(10, invocation_count); +} + +TEST(RetryTest, NoRetryOnTerminate) { + int invocation_count = 0; + EXPECT_THROW( + (util::Retry( + 10, time::seconds(0.01), + [&invocation_count]() { ++invocation_count; throw Terminate(); })), + Terminate); + EXPECT_EQ(1, invocation_count); +} + +TEST(RetryTest, RetryWhileFail) { + int invocation_count = 0; + EXPECT_THROW( + (util::Retry( + 10, time::seconds(0.01), + [&invocation_count]() { + if (++invocation_count < 3) { + throw Fail(); + } else { + throw Terminate(); + } + })), + Terminate); + EXPECT_EQ(3, invocation_count); +} + +TEST(RetryTest, RetryWithSubclass) { + int invocation_count = 0; + EXPECT_THROW((util::Retry( + 10, time::seconds(0.01), + [&invocation_count]() { + if (++invocation_count < 5) { + throw Fail(); + } else { + throw TerminateSubclass(); + } + })), TerminateSubclass); + EXPECT_EQ(5, invocation_count); +} +} // namespace +} // namespace google