From 24e165c8e277046c4c4e290b86fb2647541db387 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Tue, 21 Jan 2020 10:05:43 -0500 Subject: [PATCH 1/8] Add leader election config and to sharedmain --- injection/sharedmain/main.go | 139 ++++++++++++++++++++---- leaderelection/leaderelection.go | 147 +++++++++++++++++++++++++ leaderelection/leaderelection_test.go | 148 ++++++++++++++++++++++++++ 3 files changed, 412 insertions(+), 22 deletions(-) create mode 100644 leaderelection/leaderelection.go create mode 100644 leaderelection/leaderelection_test.go diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 83dd0d250b..188091df47 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -32,8 +32,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/leaderelection" + "k8s.io/client-go/tools/leaderelection/resourcelock" + "k8s.io/client-go/tools/record" "go.uber.org/zap" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -41,6 +47,7 @@ import ( "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection" + kle "knative.dev/pkg/leaderelection" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/profiling" @@ -94,6 +101,20 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) { return logging.NewConfigFromConfigMap(loggingConfigMap) } +// GetLeaderElectionConfig gets the leader election config. +func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) { + leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(kle.ConfigMapName(), metav1.GetOptions{}) + if err != nil { + if apierrors.IsNotFound(err) { + return kle.NewConfigFromMap(nil) + } else { + return nil, err + } + } + + return kle.NewConfigFromConfigMap(leaderElectionConfigMap) +} + // Main runs the generic main flow for non-webhook controllers with a new // context. Use WebhookMainWith* if you need to serve webhooks. func Main(component string, ctors ...injection.ControllerConstructor) { @@ -129,35 +150,109 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto profilingHandler := profiling.NewHandler(logger, false) CheckK8sClientMinimumVersionOrDie(ctx, logger) - cmw := SetupConfigMapWatchOrDie(ctx, logger) - controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...) - WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) - WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) - logger.Info("Starting configuration manager...") - if err := cmw.Start(ctx.Done()); err != nil { - logger.Fatalw("Failed to start configuration manager", zap.Error(err)) + run := func(ctx context.Context) { + cmw := SetupConfigMapWatchOrDie(ctx, logger) + controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...) + WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component) + WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component) + + logger.Info("Starting configuration manager...") + if err := cmw.Start(ctx.Done()); err != nil { + logger.Fatalw("Failed to start configuration manager", zap.Error(err)) + } + logger.Info("Starting informers...") + if err := controller.StartInformers(ctx.Done(), informers...); err != nil { + logger.Fatalw("Failed to start informers", zap.Error(err)) + } + logger.Info("Starting controllers...") + go controller.StartAll(ctx.Done(), controllers...) + + profilingServer := profiling.NewServer(profilingHandler) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(profilingServer.ListenAndServe) + + // This will block until either a signal arrives or one of the grouped functions + // returns an error. + <-egCtx.Done() + + profilingServer.Shutdown(context.Background()) + // Don't forward ErrServerClosed as that indicates we're already shutting down. + if err := eg.Wait(); err != nil && err != http.ErrServerClosed { + logger.Errorw("Error while running server", zap.Error(err)) + } } - logger.Info("Starting informers...") - if err := controller.StartInformers(ctx.Done(), informers...); err != nil { - logger.Fatalw("Failed to start informers", zap.Error(err)) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), + } + recorder = eventBroadcaster.NewRecorder( + // todo: what agent name? + scheme.Scheme, corev1.EventSource{Component: "fake-agent-name"}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() } - logger.Info("Starting controllers...") - go controller.StartAll(ctx.Done(), controllers...) - profilingServer := profiling.NewServer(profilingHandler) - eg, egCtx := errgroup.WithContext(ctx) - eg.Go(profilingServer.ListenAndServe) + // Set up leader election config + leaderElectionConfig, err := GetLeaderElectionConfig(ctx) + if err != nil { + log.Fatalf("Error loading leader election configuration: %v", err) + } + leConfig := leaderElectionConfig.GetComponentConfig(component) - // This will block until either a signal arrives or one of the grouped functions - // returns an error. - <-egCtx.Done() + if !leConfig.LeaderElect { + log.Printf("%v will not run in leader-elected mode", component) + run(ctx) + logger.Fatal("unreachable") + } - profilingServer.Shutdown(context.Background()) - // Don't forward ErrServerClosed as that indicates we're already shutting down. - if err := eg.Wait(); err != nil && err != http.ErrServerClosed { - logger.Errorw("Error while running server", zap.Error(err)) + // Create a unique identifier so that two controllers on the same host don't + // race. + id, err := kle.UniqueID() + if err != nil { + logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err)) } + logger.Infof("%v will run in leader-elected mode with id %v", component, id) + + rl, err := resourcelock.New(leConfig.ResourceLock, + system.Namespace(), // use namespace we are running in + component, // component is used as the resource name + kubeclient.Get(ctx).CoreV1(), + kubeclient.Get(ctx).CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + logger.Fatalw("Error creating lock: %v", err) + } + + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: leConfig.LeaseDuration, + RenewDeadline: leConfig.RenewDeadline, + RetryPeriod: leConfig.RetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + logger.Fatal("leaderelection lost") + }, + }, + // TODO: use health check watchdog, knative/pkg#1048 + Name: component, + }) + logger.Fatal("unreachable") } // WebhookMainWithContext runs the generic main flow for controllers and diff --git a/leaderelection/leaderelection.go b/leaderelection/leaderelection.go new file mode 100644 index 0000000000..4cccd868ff --- /dev/null +++ b/leaderelection/leaderelection.go @@ -0,0 +1,147 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "errors" + "os" + "strings" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/uuid" +) + +const ConfigMapNameEnv = "CONFIG_LEADERELECTION_NAME" + +var errEmptyLeaderElectionConfig = errors.New("empty leader election configuration") + +// NewConfigFromMap returns a Config for the given map, or an error. +func NewConfigFromMap(data map[string]string) (*Config, error) { + config := defaultConfig() + + if resourceLock, ok := data["resourceLock"]; ok { + config.ResourceLock = resourceLock + } + + if leaseDurationStr, ok := data["leaseDuration"]; ok { + if leaseDuration, err := time.ParseDuration(leaseDurationStr); err == nil { + config.LeaseDuration = leaseDuration + } + } + + if renewDeadlineStr, ok := data["renewDeadline"]; ok { + if renewDeadline, err := time.ParseDuration(renewDeadlineStr); err == nil { + config.RenewDeadline = renewDeadline + } + } + + if retryPeriodStr, ok := data["retryPeriod"]; ok { + if retryPeriod, err := time.ParseDuration(retryPeriodStr); err == nil { + config.RetryPeriod = retryPeriod + } + } + + if enabledComponents, ok := data["enabledComponents"]; ok { + tokens := strings.Split(enabledComponents, ",") + config.EnabledComponents = sets.NewString(tokens...) + } + + return &config, nil +} + +// NewConfigFromConfigMap returns a new Config from the given ConfigMap. +func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { + if configMap == nil { + config := defaultConfig() + return &config, nil + } + + return NewConfigFromMap(configMap.Data) +} + +// Config represents the leader election config for a set of components +// contained within a single namespace. Typically these will correspond to a +// single source repository, viz: serving or eventing. +type Config struct { + ResourceLock string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration + EnabledComponents sets.String +} + +func (c *Config) GetComponentConfig(name string) ComponentConfig { + if c.EnabledComponents.Has(name) { + return ComponentConfig{ + LeaderElect: true, + ResourceLock: c.ResourceLock, + LeaseDuration: c.LeaseDuration, + RenewDeadline: c.RenewDeadline, + RetryPeriod: c.RetryPeriod, + } + } + + return defaultComponentConfig() +} + +func defaultConfig() Config { + return Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString(), + } +} + +// ComponentConfig represents the leader election config for a single component. +type ComponentConfig struct { + LeaderElect bool + ResourceLock string + LeaseDuration time.Duration + RenewDeadline time.Duration + RetryPeriod time.Duration +} + +func defaultComponentConfig() ComponentConfig { + return ComponentConfig{ + LeaderElect: false, + } +} + +// ConfigMapName returns the name of the configmap to read for leader election +// settings. +func ConfigMapName() string { + cm := os.Getenv(ConfigMapNameEnv) + if cm == "" { + return "config-leader-election" + } + return cm +} + +// UniqueID returns a unique ID for use with a leader elector that prevents from +// pods running on the same host from colliding with one another. +func UniqueID() (string, error) { + id, err := os.Hostname() + if err != nil { + return "", err + } + + return (id + "_" + string(uuid.NewUUID())), nil +} diff --git a/leaderelection/leaderelection_test.go b/leaderelection/leaderelection_test.go new file mode 100644 index 0000000000..301e696036 --- /dev/null +++ b/leaderelection/leaderelection_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "reflect" + "testing" + "time" + + "k8s.io/apimachinery/pkg/util/sets" +) + +func TestNewConfigMapFromData(t *testing.T) { + cases := []struct { + name string + data map[string]string + expected *Config + err error + }{ + { + name: "disabled but OK config", + data: map[string]string{ + "resourceLock": "leases", + // values in this data come from the defaults suggested in the + // code: + // https://github.com/kubernetes/client-go/blob/kubernetes-1.16.0/tools/leaderelection/leaderelection.go + "leaseDuration": "15s", + "renewDeadline": "10s", + "retryPeriod": "2s", + }, + expected: &Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString(), + }, + }, + { + name: "OK config with controller enabled", + data: map[string]string{ + "resourceLock": "leases", + "leaseDuration": "15s", + "renewDeadline": "10s", + "retryPeriod": "2s", + "enabledComponents": "controller", + }, + expected: &Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString("controller"), + }, + }, + { + name: "config missing resourceLock field", + data: map[string]string{ + "leaseDuration": "15s", + "renewDeadline": "10s", + "retryPeriod": "2s", + "enabledComponents": "controller", + }, + expected: &Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString("controller"), + }, + }, + } + + for i := range cases { + tc := cases[i] + actualConfig, actualErr := NewConfigFromMap(tc.data) + if !reflect.DeepEqual(tc.err, actualErr) { + t.Errorf("%v: expected error %v, got %v", tc.name, tc.err, actualErr) + continue + } + + if !reflect.DeepEqual(tc.expected, actualConfig) { + t.Errorf("%v: expected config:\n%+v\ngot:\n%+v", tc.name, tc.expected, actualConfig) + continue + } + } +} + +func TestGetComponentConfig(t *testing.T) { + cases := []struct { + name string + config Config + expected ComponentConfig + }{ + { + name: "component enabled", + config: Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString("component"), + }, + expected: ComponentConfig{ + LeaderElect: true, + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + }, + }, + { + name: "component disabled", + config: Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString("not-the-component"), + }, + expected: ComponentConfig{ + LeaderElect: false, + }, + }, + } + + for i := range cases { + tc := cases[i] + actual := tc.config.GetComponentConfig("component") + if !reflect.DeepEqual(tc.expected, actual) { + t.Errorf("%v: expected:\n%+v\ngot:\n%+v", tc.name, tc.expected, actual) + } + } +} From 3a369ea2d2aaefababefe2a966cc0c92a99ef989 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Fri, 21 Feb 2020 11:44:01 -0500 Subject: [PATCH 2/8] Add new dependencies --- Gopkg.lock | 10 +- .../k8s.io/apimachinery/pkg/util/uuid/uuid.go | 27 ++ .../tools/leaderelection/healthzadaptor.go | 69 +++ .../tools/leaderelection/leaderelection.go | 397 ++++++++++++++++++ .../client-go/tools/leaderelection/metrics.go | 109 +++++ .../resourcelock/configmaplock.go | 112 +++++ .../resourcelock/endpointslock.go | 107 +++++ .../leaderelection/resourcelock/interface.go | 126 ++++++ .../leaderelection/resourcelock/leaselock.go | 124 ++++++ 9 files changed, 1079 insertions(+), 2 deletions(-) create mode 100644 vendor/k8s.io/apimachinery/pkg/util/uuid/uuid.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/metrics.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go create mode 100644 vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go diff --git a/Gopkg.lock b/Gopkg.lock index 8ec3cc89c4..ae0a987a3b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -980,7 +980,7 @@ version = "kubernetes-1.16.4" [[projects]] - digest = "1:1aaf879947e3abf264929bb0220acced357f85da5ac0f58b10f0f8a5719a41ef" + digest = "1:e49d4f18068a8b4e5ac2ae52ec9363e8509f17de8c1569a41663ef1632fc4aeb" name = "k8s.io/apimachinery" packages = [ "pkg/api/apitesting", @@ -1025,6 +1025,7 @@ "pkg/util/sets", "pkg/util/sets/types", "pkg/util/strategicpatch", + "pkg/util/uuid", "pkg/util/validation", "pkg/util/validation/field", "pkg/util/wait", @@ -1039,7 +1040,7 @@ version = "kubernetes-1.16.4" [[projects]] - digest = "1:b03297e45cd203dee7e9449141555d3973d04d5323ece6b4e20562b80185767e" + digest = "1:b485876cac57a0612f78c907bde4e34c3f327e35e705f7f59d3bd7c5cd78a688" name = "k8s.io/client-go" packages = [ "discovery", @@ -1229,6 +1230,8 @@ "tools/clientcmd/api", "tools/clientcmd/api/latest", "tools/clientcmd/api/v1", + "tools/leaderelection", + "tools/leaderelection/resourcelock", "tools/metrics", "tools/pager", "tools/record", @@ -1457,6 +1460,7 @@ "k8s.io/apimachinery/pkg/util/runtime", "k8s.io/apimachinery/pkg/util/sets", "k8s.io/apimachinery/pkg/util/sets/types", + "k8s.io/apimachinery/pkg/util/uuid", "k8s.io/apimachinery/pkg/util/validation", "k8s.io/apimachinery/pkg/util/wait", "k8s.io/apimachinery/pkg/version", @@ -1487,6 +1491,8 @@ "k8s.io/client-go/testing", "k8s.io/client-go/tools/cache", "k8s.io/client-go/tools/clientcmd", + "k8s.io/client-go/tools/leaderelection", + "k8s.io/client-go/tools/leaderelection/resourcelock", "k8s.io/client-go/tools/metrics", "k8s.io/client-go/tools/record", "k8s.io/client-go/util/retry", diff --git a/vendor/k8s.io/apimachinery/pkg/util/uuid/uuid.go b/vendor/k8s.io/apimachinery/pkg/util/uuid/uuid.go new file mode 100644 index 0000000000..1fa351aab6 --- /dev/null +++ b/vendor/k8s.io/apimachinery/pkg/util/uuid/uuid.go @@ -0,0 +1,27 @@ +/* +Copyright 2014 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package uuid + +import ( + "github.com/google/uuid" + + "k8s.io/apimachinery/pkg/types" +) + +func NewUUID() types.UID { + return types.UID(uuid.New().String()) +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go b/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go new file mode 100644 index 0000000000..b935372919 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/healthzadaptor.go @@ -0,0 +1,69 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "net/http" + "sync" + "time" +) + +// HealthzAdaptor associates the /healthz endpoint with the LeaderElection object. +// It helps deal with the /healthz endpoint being set up prior to the LeaderElection. +// This contains the code needed to act as an adaptor between the leader +// election code the health check code. It allows us to provide health +// status about the leader election. Most specifically about if the leader +// has failed to renew without exiting the process. In that case we should +// report not healthy and rely on the kubelet to take down the process. +type HealthzAdaptor struct { + pointerLock sync.Mutex + le *LeaderElector + timeout time.Duration +} + +// Name returns the name of the health check we are implementing. +func (l *HealthzAdaptor) Name() string { + return "leaderElection" +} + +// Check is called by the healthz endpoint handler. +// It fails (returns an error) if we own the lease but had not been able to renew it. +func (l *HealthzAdaptor) Check(req *http.Request) error { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + if l.le == nil { + return nil + } + return l.le.Check(l.timeout) +} + +// SetLeaderElection ties a leader election object to a HealthzAdaptor +func (l *HealthzAdaptor) SetLeaderElection(le *LeaderElector) { + l.pointerLock.Lock() + defer l.pointerLock.Unlock() + l.le = le +} + +// NewLeaderHealthzAdaptor creates a basic healthz adaptor to monitor a leader election. +// timeout determines the time beyond the lease expiry to be allowed for timeout. +// checks within the timeout period after the lease expires will still return healthy. +func NewLeaderHealthzAdaptor(timeout time.Duration) *HealthzAdaptor { + result := &HealthzAdaptor{ + timeout: timeout, + } + return result +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go b/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go new file mode 100644 index 0000000000..4be650c0c8 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/leaderelection.go @@ -0,0 +1,397 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package leaderelection implements leader election of a set of endpoints. +// It uses an annotation in the endpoints object to store the record of the +// election state. This implementation does not guarantee that only one +// client is acting as a leader (a.k.a. fencing). +// +// A client only acts on timestamps captured locally to infer the state of the +// leader election. The client does not consider timestamps in the leader +// election record to be accurate because these timestamps may not have been +// produced by a local clock. The implemention does not depend on their +// accuracy and only uses their change to indicate that another client has +// renewed the leader lease. Thus the implementation is tolerant to arbitrary +// clock skew, but is not tolerant to arbitrary clock skew rate. +// +// However the level of tolerance to skew rate can be configured by setting +// RenewDeadline and LeaseDuration appropriately. The tolerance expressed as a +// maximum tolerated ratio of time passed on the fastest node to time passed on +// the slowest node can be approximately achieved with a configuration that sets +// the same ratio of LeaseDuration to RenewDeadline. For example if a user wanted +// to tolerate some nodes progressing forward in time twice as fast as other nodes, +// the user could set LeaseDuration to 60 seconds and RenewDeadline to 30 seconds. +// +// While not required, some method of clock synchronization between nodes in the +// cluster is highly recommended. It's important to keep in mind when configuring +// this client that the tolerance to skew rate varies inversely to master +// availability. +// +// Larger clusters often have a more lenient SLA for API latency. This should be +// taken into account when configuring the client. The rate of leader transitions +// should be monitored and RetryPeriod and LeaseDuration should be increased +// until the rate is stable and acceptably low. It's important to keep in mind +// when configuring this client that the tolerance to API latency varies inversely +// to master availability. +// +// DISCLAIMER: this is an alpha API. This library will likely change significantly +// or even be removed entirely in subsequent releases. Depend on this API at +// your own risk. +package leaderelection + +import ( + "context" + "fmt" + "reflect" + "time" + + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/clock" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + rl "k8s.io/client-go/tools/leaderelection/resourcelock" + + "k8s.io/klog" +) + +const ( + JitterFactor = 1.2 +) + +// NewLeaderElector creates a LeaderElector from a LeaderElectionConfig +func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) { + if lec.LeaseDuration <= lec.RenewDeadline { + return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline") + } + if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) { + return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor") + } + if lec.LeaseDuration < 1 { + return nil, fmt.Errorf("leaseDuration must be greater than zero") + } + if lec.RenewDeadline < 1 { + return nil, fmt.Errorf("renewDeadline must be greater than zero") + } + if lec.RetryPeriod < 1 { + return nil, fmt.Errorf("retryPeriod must be greater than zero") + } + if lec.Callbacks.OnStartedLeading == nil { + return nil, fmt.Errorf("OnStartedLeading callback must not be nil") + } + if lec.Callbacks.OnStoppedLeading == nil { + return nil, fmt.Errorf("OnStoppedLeading callback must not be nil") + } + + if lec.Lock == nil { + return nil, fmt.Errorf("Lock must not be nil.") + } + le := LeaderElector{ + config: lec, + clock: clock.RealClock{}, + metrics: globalMetricsFactory.newLeaderMetrics(), + } + le.metrics.leaderOff(le.config.Name) + return &le, nil +} + +type LeaderElectionConfig struct { + // Lock is the resource that will be used for locking + Lock rl.Interface + + // LeaseDuration is the duration that non-leader candidates will + // wait to force acquire leadership. This is measured against time of + // last observed ack. + // + // A client needs to wait a full LeaseDuration without observing a change to + // the record before it can attempt to take over. When all clients are + // shutdown and a new set of clients are started with different names against + // the same leader record, they must wait the full LeaseDuration before + // attempting to acquire the lease. Thus LeaseDuration should be as short as + // possible (within your tolerance for clock skew rate) to avoid a possible + // long waits in the scenario. + // + // Core clients default this value to 15 seconds. + LeaseDuration time.Duration + // RenewDeadline is the duration that the acting master will retry + // refreshing leadership before giving up. + // + // Core clients default this value to 10 seconds. + RenewDeadline time.Duration + // RetryPeriod is the duration the LeaderElector clients should wait + // between tries of actions. + // + // Core clients default this value to 2 seconds. + RetryPeriod time.Duration + + // Callbacks are callbacks that are triggered during certain lifecycle + // events of the LeaderElector + Callbacks LeaderCallbacks + + // WatchDog is the associated health checker + // WatchDog may be null if its not needed/configured. + WatchDog *HealthzAdaptor + + // ReleaseOnCancel should be set true if the lock should be released + // when the run context is cancelled. If you set this to true, you must + // ensure all code guarded by this lease has successfully completed + // prior to cancelling the context, or you may have two processes + // simultaneously acting on the critical path. + ReleaseOnCancel bool + + // Name is the name of the resource lock for debugging + Name string +} + +// LeaderCallbacks are callbacks that are triggered during certain +// lifecycle events of the LeaderElector. These are invoked asynchronously. +// +// possible future callbacks: +// * OnChallenge() +type LeaderCallbacks struct { + // OnStartedLeading is called when a LeaderElector client starts leading + OnStartedLeading func(context.Context) + // OnStoppedLeading is called when a LeaderElector client stops leading + OnStoppedLeading func() + // OnNewLeader is called when the client observes a leader that is + // not the previously observed leader. This includes the first observed + // leader when the client starts. + OnNewLeader func(identity string) +} + +// LeaderElector is a leader election client. +type LeaderElector struct { + config LeaderElectionConfig + // internal bookkeeping + observedRecord rl.LeaderElectionRecord + observedTime time.Time + // used to implement OnNewLeader(), may lag slightly from the + // value observedRecord.HolderIdentity if the transition has + // not yet been reported. + reportedLeader string + + // clock is wrapper around time to allow for less flaky testing + clock clock.Clock + + metrics leaderMetricsAdapter + + // name is the name of the resource lock for debugging + name string +} + +// Run starts the leader election loop +func (le *LeaderElector) Run(ctx context.Context) { + defer func() { + runtime.HandleCrash() + le.config.Callbacks.OnStoppedLeading() + }() + if !le.acquire(ctx) { + return // ctx signalled done + } + ctx, cancel := context.WithCancel(ctx) + defer cancel() + go le.config.Callbacks.OnStartedLeading(ctx) + le.renew(ctx) +} + +// RunOrDie starts a client with the provided config or panics if the config +// fails to validate. +func RunOrDie(ctx context.Context, lec LeaderElectionConfig) { + le, err := NewLeaderElector(lec) + if err != nil { + panic(err) + } + if lec.WatchDog != nil { + lec.WatchDog.SetLeaderElection(le) + } + le.Run(ctx) +} + +// GetLeader returns the identity of the last observed leader or returns the empty string if +// no leader has yet been observed. +func (le *LeaderElector) GetLeader() string { + return le.observedRecord.HolderIdentity +} + +// IsLeader returns true if the last observed leader was this client else returns false. +func (le *LeaderElector) IsLeader() bool { + return le.observedRecord.HolderIdentity == le.config.Lock.Identity() +} + +// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds. +// Returns false if ctx signals done. +func (le *LeaderElector) acquire(ctx context.Context) bool { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + succeeded := false + desc := le.config.Lock.Describe() + klog.Infof("attempting to acquire leader lease %v...", desc) + wait.JitterUntil(func() { + succeeded = le.tryAcquireOrRenew() + le.maybeReportTransition() + if !succeeded { + klog.V(4).Infof("failed to acquire lease %v", desc) + return + } + le.config.Lock.RecordEvent("became leader") + le.metrics.leaderOn(le.config.Name) + klog.Infof("successfully acquired lease %v", desc) + cancel() + }, le.config.RetryPeriod, JitterFactor, true, ctx.Done()) + return succeeded +} + +// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done. +func (le *LeaderElector) renew(ctx context.Context) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + wait.Until(func() { + timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline) + defer timeoutCancel() + err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) { + done := make(chan bool, 1) + go func() { + defer close(done) + done <- le.tryAcquireOrRenew() + }() + + select { + case <-timeoutCtx.Done(): + return false, fmt.Errorf("failed to tryAcquireOrRenew %s", timeoutCtx.Err()) + case result := <-done: + return result, nil + } + }, timeoutCtx.Done()) + + le.maybeReportTransition() + desc := le.config.Lock.Describe() + if err == nil { + klog.V(5).Infof("successfully renewed lease %v", desc) + return + } + le.config.Lock.RecordEvent("stopped leading") + le.metrics.leaderOff(le.config.Name) + klog.Infof("failed to renew lease %v: %v", desc, err) + cancel() + }, le.config.RetryPeriod, ctx.Done()) + + // if we hold the lease, give it up + if le.config.ReleaseOnCancel { + le.release() + } +} + +// release attempts to release the leader lease if we have acquired it. +func (le *LeaderElector) release() bool { + if !le.IsLeader() { + return true + } + leaderElectionRecord := rl.LeaderElectionRecord{ + LeaderTransitions: le.observedRecord.LeaderTransitions, + } + if err := le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to release lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true +} + +// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired, +// else it tries to renew the lease if it has already been acquired. Returns true +// on success else returns false. +func (le *LeaderElector) tryAcquireOrRenew() bool { + now := metav1.Now() + leaderElectionRecord := rl.LeaderElectionRecord{ + HolderIdentity: le.config.Lock.Identity(), + LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second), + RenewTime: now, + AcquireTime: now, + } + + // 1. obtain or create the ElectionRecord + oldLeaderElectionRecord, err := le.config.Lock.Get() + if err != nil { + if !errors.IsNotFound(err) { + klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err) + return false + } + if err = le.config.Lock.Create(leaderElectionRecord); err != nil { + klog.Errorf("error initially creating leader election record: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true + } + + // 2. Record obtained, check the Identity & Time + if !reflect.DeepEqual(le.observedRecord, *oldLeaderElectionRecord) { + le.observedRecord = *oldLeaderElectionRecord + le.observedTime = le.clock.Now() + } + if len(oldLeaderElectionRecord.HolderIdentity) > 0 && + le.observedTime.Add(le.config.LeaseDuration).After(now.Time) && + !le.IsLeader() { + klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity) + return false + } + + // 3. We're going to try to update. The leaderElectionRecord is set to it's default + // here. Let's correct it before updating. + if le.IsLeader() { + leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + } else { + leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1 + } + + // update the lock itself + if err = le.config.Lock.Update(leaderElectionRecord); err != nil { + klog.Errorf("Failed to update lock: %v", err) + return false + } + le.observedRecord = leaderElectionRecord + le.observedTime = le.clock.Now() + return true +} + +func (le *LeaderElector) maybeReportTransition() { + if le.observedRecord.HolderIdentity == le.reportedLeader { + return + } + le.reportedLeader = le.observedRecord.HolderIdentity + if le.config.Callbacks.OnNewLeader != nil { + go le.config.Callbacks.OnNewLeader(le.reportedLeader) + } +} + +// Check will determine if the current lease is expired by more than timeout. +func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error { + if !le.IsLeader() { + // Currently not concerned with the case that we are hot standby + return nil + } + // If we are more than timeout seconds after the lease duration that is past the timeout + // on the lease renew. Time to start reporting ourselves as unhealthy. We should have + // died but conditions like deadlock can prevent this. (See #70819) + if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease { + return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name) + } + + return nil +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/metrics.go b/vendor/k8s.io/client-go/tools/leaderelection/metrics.go new file mode 100644 index 0000000000..65917bf88e --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/metrics.go @@ -0,0 +1,109 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leaderelection + +import ( + "sync" +) + +// This file provides abstractions for setting the provider (e.g., prometheus) +// of metrics. + +type leaderMetricsAdapter interface { + leaderOn(name string) + leaderOff(name string) +} + +// GaugeMetric represents a single numerical value that can arbitrarily go up +// and down. +type SwitchMetric interface { + On(name string) + Off(name string) +} + +type noopMetric struct{} + +func (noopMetric) On(name string) {} +func (noopMetric) Off(name string) {} + +// defaultLeaderMetrics expects the caller to lock before setting any metrics. +type defaultLeaderMetrics struct { + // leader's value indicates if the current process is the owner of name lease + leader SwitchMetric +} + +func (m *defaultLeaderMetrics) leaderOn(name string) { + if m == nil { + return + } + m.leader.On(name) +} + +func (m *defaultLeaderMetrics) leaderOff(name string) { + if m == nil { + return + } + m.leader.Off(name) +} + +type noMetrics struct{} + +func (noMetrics) leaderOn(name string) {} +func (noMetrics) leaderOff(name string) {} + +// MetricsProvider generates various metrics used by the leader election. +type MetricsProvider interface { + NewLeaderMetric() SwitchMetric +} + +type noopMetricsProvider struct{} + +func (_ noopMetricsProvider) NewLeaderMetric() SwitchMetric { + return noopMetric{} +} + +var globalMetricsFactory = leaderMetricsFactory{ + metricsProvider: noopMetricsProvider{}, +} + +type leaderMetricsFactory struct { + metricsProvider MetricsProvider + + onlyOnce sync.Once +} + +func (f *leaderMetricsFactory) setProvider(mp MetricsProvider) { + f.onlyOnce.Do(func() { + f.metricsProvider = mp + }) +} + +func (f *leaderMetricsFactory) newLeaderMetrics() leaderMetricsAdapter { + mp := f.metricsProvider + if mp == (noopMetricsProvider{}) { + return noMetrics{} + } + return &defaultLeaderMetrics{ + leader: mp.NewLeaderMetric(), + } +} + +// SetProvider sets the metrics provider for all subsequently created work +// queues. Only the first call has an effect. +func SetProvider(metricsProvider MetricsProvider) { + globalMetricsFactory.setProvider(metricsProvider) +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go new file mode 100644 index 0000000000..785356894f --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/configmaplock.go @@ -0,0 +1,112 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "encoding/json" + "errors" + "fmt" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" +) + +// TODO: This is almost a exact replica of Endpoints lock. +// going forwards as we self host more and more components +// and use ConfigMaps as the means to pass that configuration +// data we will likely move to deprecate the Endpoints lock. + +type ConfigMapLock struct { + // ConfigMapMeta should contain a Name and a Namespace of a + // ConfigMapMeta object that the LeaderElector will attempt to lead. + ConfigMapMeta metav1.ObjectMeta + Client corev1client.ConfigMapsGetter + LockConfig ResourceLockConfig + cm *v1.ConfigMap +} + +// Get returns the election record from a ConfigMap Annotation +func (cml *ConfigMapLock) Get() (*LeaderElectionRecord, error) { + var record LeaderElectionRecord + var err error + cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Get(cml.ConfigMapMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if cml.cm.Annotations == nil { + cml.cm.Annotations = make(map[string]string) + } + if recordBytes, found := cml.cm.Annotations[LeaderElectionRecordAnnotationKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { + return nil, err + } + } + return &record, nil +} + +// Create attempts to create a LeaderElectionRecord annotation +func (cml *ConfigMapLock) Create(ler LeaderElectionRecord) error { + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Create(&v1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: cml.ConfigMapMeta.Name, + Namespace: cml.ConfigMapMeta.Namespace, + Annotations: map[string]string{ + LeaderElectionRecordAnnotationKey: string(recordBytes), + }, + }, + }) + return err +} + +// Update will update an existing annotation on a given resource. +func (cml *ConfigMapLock) Update(ler LeaderElectionRecord) error { + if cml.cm == nil { + return errors.New("configmap not initialized, call get or create first") + } + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + cml.cm.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) + cml.cm, err = cml.Client.ConfigMaps(cml.ConfigMapMeta.Namespace).Update(cml.cm) + return err +} + +// RecordEvent in leader election while adding meta-data +func (cml *ConfigMapLock) RecordEvent(s string) { + if cml.LockConfig.EventRecorder == nil { + return + } + events := fmt.Sprintf("%v %v", cml.LockConfig.Identity, s) + cml.LockConfig.EventRecorder.Eventf(&v1.ConfigMap{ObjectMeta: cml.cm.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (cml *ConfigMapLock) Describe() string { + return fmt.Sprintf("%v/%v", cml.ConfigMapMeta.Namespace, cml.ConfigMapMeta.Name) +} + +// returns the Identity of the lock +func (cml *ConfigMapLock) Identity() string { + return cml.LockConfig.Identity +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go new file mode 100644 index 0000000000..bfe5e8b1bb --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/endpointslock.go @@ -0,0 +1,107 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "encoding/json" + "errors" + "fmt" + + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" +) + +type EndpointsLock struct { + // EndpointsMeta should contain a Name and a Namespace of an + // Endpoints object that the LeaderElector will attempt to lead. + EndpointsMeta metav1.ObjectMeta + Client corev1client.EndpointsGetter + LockConfig ResourceLockConfig + e *v1.Endpoints +} + +// Get returns the election record from a Endpoints Annotation +func (el *EndpointsLock) Get() (*LeaderElectionRecord, error) { + var record LeaderElectionRecord + var err error + el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Get(el.EndpointsMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + if el.e.Annotations == nil { + el.e.Annotations = make(map[string]string) + } + if recordBytes, found := el.e.Annotations[LeaderElectionRecordAnnotationKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &record); err != nil { + return nil, err + } + } + return &record, nil +} + +// Create attempts to create a LeaderElectionRecord annotation +func (el *EndpointsLock) Create(ler LeaderElectionRecord) error { + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Create(&v1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: el.EndpointsMeta.Name, + Namespace: el.EndpointsMeta.Namespace, + Annotations: map[string]string{ + LeaderElectionRecordAnnotationKey: string(recordBytes), + }, + }, + }) + return err +} + +// Update will update and existing annotation on a given resource. +func (el *EndpointsLock) Update(ler LeaderElectionRecord) error { + if el.e == nil { + return errors.New("endpoint not initialized, call get or create first") + } + recordBytes, err := json.Marshal(ler) + if err != nil { + return err + } + el.e.Annotations[LeaderElectionRecordAnnotationKey] = string(recordBytes) + el.e, err = el.Client.Endpoints(el.EndpointsMeta.Namespace).Update(el.e) + return err +} + +// RecordEvent in leader election while adding meta-data +func (el *EndpointsLock) RecordEvent(s string) { + if el.LockConfig.EventRecorder == nil { + return + } + events := fmt.Sprintf("%v %v", el.LockConfig.Identity, s) + el.LockConfig.EventRecorder.Eventf(&v1.Endpoints{ObjectMeta: el.e.ObjectMeta}, v1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (el *EndpointsLock) Describe() string { + return fmt.Sprintf("%v/%v", el.EndpointsMeta.Namespace, el.EndpointsMeta.Name) +} + +// returns the Identity of the lock +func (el *EndpointsLock) Identity() string { + return el.LockConfig.Identity +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go new file mode 100644 index 0000000000..050d41a25f --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/interface.go @@ -0,0 +1,126 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" +) + +const ( + LeaderElectionRecordAnnotationKey = "control-plane.alpha.kubernetes.io/leader" + EndpointsResourceLock = "endpoints" + ConfigMapsResourceLock = "configmaps" + LeasesResourceLock = "leases" +) + +// LeaderElectionRecord is the record that is stored in the leader election annotation. +// This information should be used for observational purposes only and could be replaced +// with a random string (e.g. UUID) with only slight modification of this code. +// TODO(mikedanese): this should potentially be versioned +type LeaderElectionRecord struct { + // HolderIdentity is the ID that owns the lease. If empty, no one owns this lease and + // all callers may acquire. Versions of this library prior to Kubernetes 1.14 will not + // attempt to acquire leases with empty identities and will wait for the full lease + // interval to expire before attempting to reacquire. This value is set to empty when + // a client voluntarily steps down. + HolderIdentity string `json:"holderIdentity"` + LeaseDurationSeconds int `json:"leaseDurationSeconds"` + AcquireTime metav1.Time `json:"acquireTime"` + RenewTime metav1.Time `json:"renewTime"` + LeaderTransitions int `json:"leaderTransitions"` +} + +// EventRecorder records a change in the ResourceLock. +type EventRecorder interface { + Eventf(obj runtime.Object, eventType, reason, message string, args ...interface{}) +} + +// ResourceLockConfig common data that exists across different +// resource locks +type ResourceLockConfig struct { + // Identity is the unique string identifying a lease holder across + // all participants in an election. + Identity string + // EventRecorder is optional. + EventRecorder EventRecorder +} + +// Interface offers a common interface for locking on arbitrary +// resources used in leader election. The Interface is used +// to hide the details on specific implementations in order to allow +// them to change over time. This interface is strictly for use +// by the leaderelection code. +type Interface interface { + // Get returns the LeaderElectionRecord + Get() (*LeaderElectionRecord, error) + + // Create attempts to create a LeaderElectionRecord + Create(ler LeaderElectionRecord) error + + // Update will update and existing LeaderElectionRecord + Update(ler LeaderElectionRecord) error + + // RecordEvent is used to record events + RecordEvent(string) + + // Identity will return the locks Identity + Identity() string + + // Describe is used to convert details on current resource lock + // into a string + Describe() string +} + +// Manufacture will create a lock of a given type according to the input parameters +func New(lockType string, ns string, name string, coreClient corev1.CoreV1Interface, coordinationClient coordinationv1.CoordinationV1Interface, rlc ResourceLockConfig) (Interface, error) { + switch lockType { + case EndpointsResourceLock: + return &EndpointsLock{ + EndpointsMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coreClient, + LockConfig: rlc, + }, nil + case ConfigMapsResourceLock: + return &ConfigMapLock{ + ConfigMapMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coreClient, + LockConfig: rlc, + }, nil + case LeasesResourceLock: + return &LeaseLock{ + LeaseMeta: metav1.ObjectMeta{ + Namespace: ns, + Name: name, + }, + Client: coordinationClient, + LockConfig: rlc, + }, nil + default: + return nil, fmt.Errorf("Invalid lock-type %s", lockType) + } +} diff --git a/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go new file mode 100644 index 0000000000..285f944054 --- /dev/null +++ b/vendor/k8s.io/client-go/tools/leaderelection/resourcelock/leaselock.go @@ -0,0 +1,124 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resourcelock + +import ( + "errors" + "fmt" + + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1" +) + +type LeaseLock struct { + // LeaseMeta should contain a Name and a Namespace of a + // LeaseMeta object that the LeaderElector will attempt to lead. + LeaseMeta metav1.ObjectMeta + Client coordinationv1client.LeasesGetter + LockConfig ResourceLockConfig + lease *coordinationv1.Lease +} + +// Get returns the election record from a Lease spec +func (ll *LeaseLock) Get() (*LeaderElectionRecord, error) { + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Get(ll.LeaseMeta.Name, metav1.GetOptions{}) + if err != nil { + return nil, err + } + return LeaseSpecToLeaderElectionRecord(&ll.lease.Spec), nil +} + +// Create attempts to create a Lease +func (ll *LeaseLock) Create(ler LeaderElectionRecord) error { + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Create(&coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: ll.LeaseMeta.Name, + Namespace: ll.LeaseMeta.Namespace, + }, + Spec: LeaderElectionRecordToLeaseSpec(&ler), + }) + return err +} + +// Update will update an existing Lease spec. +func (ll *LeaseLock) Update(ler LeaderElectionRecord) error { + if ll.lease == nil { + return errors.New("lease not initialized, call get or create first") + } + ll.lease.Spec = LeaderElectionRecordToLeaseSpec(&ler) + var err error + ll.lease, err = ll.Client.Leases(ll.LeaseMeta.Namespace).Update(ll.lease) + return err +} + +// RecordEvent in leader election while adding meta-data +func (ll *LeaseLock) RecordEvent(s string) { + if ll.LockConfig.EventRecorder == nil { + return + } + events := fmt.Sprintf("%v %v", ll.LockConfig.Identity, s) + ll.LockConfig.EventRecorder.Eventf(&coordinationv1.Lease{ObjectMeta: ll.lease.ObjectMeta}, corev1.EventTypeNormal, "LeaderElection", events) +} + +// Describe is used to convert details on current resource lock +// into a string +func (ll *LeaseLock) Describe() string { + return fmt.Sprintf("%v/%v", ll.LeaseMeta.Namespace, ll.LeaseMeta.Name) +} + +// returns the Identity of the lock +func (ll *LeaseLock) Identity() string { + return ll.LockConfig.Identity +} + +func LeaseSpecToLeaderElectionRecord(spec *coordinationv1.LeaseSpec) *LeaderElectionRecord { + holderIdentity := "" + if spec.HolderIdentity != nil { + holderIdentity = *spec.HolderIdentity + } + leaseDurationSeconds := 0 + if spec.LeaseDurationSeconds != nil { + leaseDurationSeconds = int(*spec.LeaseDurationSeconds) + } + leaseTransitions := 0 + if spec.LeaseTransitions != nil { + leaseTransitions = int(*spec.LeaseTransitions) + } + return &LeaderElectionRecord{ + HolderIdentity: holderIdentity, + LeaseDurationSeconds: leaseDurationSeconds, + AcquireTime: metav1.Time{spec.AcquireTime.Time}, + RenewTime: metav1.Time{spec.RenewTime.Time}, + LeaderTransitions: leaseTransitions, + } +} + +func LeaderElectionRecordToLeaseSpec(ler *LeaderElectionRecord) coordinationv1.LeaseSpec { + leaseDurationSeconds := int32(ler.LeaseDurationSeconds) + leaseTransitions := int32(ler.LeaderTransitions) + return coordinationv1.LeaseSpec{ + HolderIdentity: &ler.HolderIdentity, + LeaseDurationSeconds: &leaseDurationSeconds, + AcquireTime: &metav1.MicroTime{ler.AcquireTime.Time}, + RenewTime: &metav1.MicroTime{ler.RenewTime.Time}, + LeaseTransitions: &leaseTransitions, + } +} From 92ea15b12b5d1c51b356e0be7d1d816ece3ea9e7 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Mon, 24 Feb 2020 12:13:35 -0500 Subject: [PATCH 3/8] Extract method for RunLeaderElected --- injection/sharedmain/main.go | 123 ++++++++++++++++++----------------- 1 file changed, 65 insertions(+), 58 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 188091df47..23f17fbe4d 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -183,75 +183,20 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } } - recorder := controller.GetEventRecorder(ctx) - if recorder == nil { - // Create event broadcaster - logger.Debug("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - watches := []watch.Interface{ - eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), - eventBroadcaster.StartRecordingToSink( - &typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events("")}), - } - recorder = eventBroadcaster.NewRecorder( - // todo: what agent name? - scheme.Scheme, corev1.EventSource{Component: "fake-agent-name"}) - go func() { - <-ctx.Done() - for _, w := range watches { - w.Stop() - } - }() - } - // Set up leader election config leaderElectionConfig, err := GetLeaderElectionConfig(ctx) if err != nil { - log.Fatalf("Error loading leader election configuration: %v", err) + logger.Fatalf("Error loading leader election configuration: %v", err) } leConfig := leaderElectionConfig.GetComponentConfig(component) if !leConfig.LeaderElect { - log.Printf("%v will not run in leader-elected mode", component) + logger.Infof("%v will not run in leader-elected mode", component) run(ctx) logger.Fatal("unreachable") } - // Create a unique identifier so that two controllers on the same host don't - // race. - id, err := kle.UniqueID() - if err != nil { - logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err)) - } - logger.Infof("%v will run in leader-elected mode with id %v", component, id) - - rl, err := resourcelock.New(leConfig.ResourceLock, - system.Namespace(), // use namespace we are running in - component, // component is used as the resource name - kubeclient.Get(ctx).CoreV1(), - kubeclient.Get(ctx).CoordinationV1(), - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: recorder, - }) - if err != nil { - logger.Fatalw("Error creating lock: %v", err) - } - - leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: leConfig.LeaseDuration, - RenewDeadline: leConfig.RenewDeadline, - RetryPeriod: leConfig.RetryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - logger.Fatal("leaderelection lost") - }, - }, - // TODO: use health check watchdog, knative/pkg#1048 - Name: component, - }) + RunLeaderElected(ctx, logger, run, component, leConfig) logger.Fatal("unreachable") } @@ -464,3 +409,65 @@ func ControllersAndWebhooksFromCtors(ctx context.Context, return controllers, webhooks } + +// RunLeaderElected runs the given function in leader elected mode. The function +// will be run only once the leader election lock is obtained. +func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), component string, leConfig kle.ComponentConfig) { + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}), + } + recorder = eventBroadcaster.NewRecorder( + scheme.Scheme, corev1.EventSource{Component: component}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + }() + } + + // Create a unique identifier so that two controllers on the same host don't + // race. + id, err := kle.UniqueID() + if err != nil { + logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err)) + } + logger.Infof("%v will run in leader-elected mode with id %v", component, id) + + // rl is the resource used to hold the leader election lock. + rl, err := resourcelock.New(leConfig.ResourceLock, + system.Namespace(), // use namespace we are running in + component, // component is used as the resource name + kubeclient.Get(ctx).CoreV1(), + kubeclient.Get(ctx).CoordinationV1(), + resourcelock.ResourceLockConfig{ + Identity: id, + EventRecorder: recorder, + }) + if err != nil { + logger.Fatalw("Error creating lock: %v", err) + } + + // Execute the `run` function when we have the lock. + leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: leConfig.LeaseDuration, + RenewDeadline: leConfig.RenewDeadline, + RetryPeriod: leConfig.RetryPeriod, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + logger.Fatal("leaderelection lost") + }, + }, + // TODO: use health check watchdog, knative/pkg#1048 + Name: component, + }) +} From 2c7427c21677aacdd882ab2623ef79b2f17f264b Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 26 Feb 2020 11:05:51 -0500 Subject: [PATCH 4/8] Make leader election config constructor validate --- leaderelection/leaderelection.go | 33 +++++- leaderelection/leaderelection_test.go | 152 ++++++++++++++++++-------- 2 files changed, 137 insertions(+), 48 deletions(-) diff --git a/leaderelection/leaderelection.go b/leaderelection/leaderelection.go index 4cccd868ff..320ac467ec 100644 --- a/leaderelection/leaderelection.go +++ b/leaderelection/leaderelection.go @@ -18,6 +18,7 @@ package leaderelection import ( "errors" + "fmt" "os" "strings" "time" @@ -29,40 +30,66 @@ import ( const ConfigMapNameEnv = "CONFIG_LEADERELECTION_NAME" -var errEmptyLeaderElectionConfig = errors.New("empty leader election configuration") +var ( + errEmptyLeaderElectionConfig = errors.New("empty leader election configuration") + validResourceLocks = sets.NewString("leases", "configmaps", "endpoints") +) // NewConfigFromMap returns a Config for the given map, or an error. func NewConfigFromMap(data map[string]string) (*Config, error) { - config := defaultConfig() + config := &Config{ + EnabledComponents: sets.NewString(), + } if resourceLock, ok := data["resourceLock"]; ok { + if !validResourceLocks.Has(resourceLock) { + return nil, fmt.Errorf("resourceLock: invalid value %q: valid values are \"leases\",\"configmaps\",\"endpoints\"", resourceLock) + } + config.ResourceLock = resourceLock + } else { + return nil, errors.New("resourceLock cannot be empty") } if leaseDurationStr, ok := data["leaseDuration"]; ok { if leaseDuration, err := time.ParseDuration(leaseDurationStr); err == nil { config.LeaseDuration = leaseDuration + } else { + return nil, fmt.Errorf("leaseDuration: invalid duration: %q", leaseDurationStr) } + } else { + return nil, errors.New("leaseDuration cannot be empty") } if renewDeadlineStr, ok := data["renewDeadline"]; ok { if renewDeadline, err := time.ParseDuration(renewDeadlineStr); err == nil { config.RenewDeadline = renewDeadline + } else { + return nil, fmt.Errorf("renewDeadline: invalid duration: %q", renewDeadlineStr) } + } else { + return nil, errors.New("renewDeadline cannot be empty") } if retryPeriodStr, ok := data["retryPeriod"]; ok { if retryPeriod, err := time.ParseDuration(retryPeriodStr); err == nil { config.RetryPeriod = retryPeriod + } else { + return nil, fmt.Errorf("retryPeriod: invalid duration: %q", retryPeriodStr) } + } else { + return nil, errors.New("retryPeriod cannot be empty") } + // enabledComponents are not validated here, because they are dependent on + // the component. Components should provide additional validation for this + // field. if enabledComponents, ok := data["enabledComponents"]; ok { tokens := strings.Split(enabledComponents, ",") config.EnabledComponents = sets.NewString(tokens...) } - return &config, nil + return config, nil } // NewConfigFromConfigMap returns a new Config from the given ConfigMap. diff --git a/leaderelection/leaderelection_test.go b/leaderelection/leaderelection_test.go index 301e696036..692d427695 100644 --- a/leaderelection/leaderelection_test.go +++ b/leaderelection/leaderelection_test.go @@ -17,6 +17,7 @@ limitations under the License. package leaderelection import ( + "errors" "reflect" "testing" "time" @@ -24,6 +25,29 @@ import ( "k8s.io/apimachinery/pkg/util/sets" ) +func okConfig() *Config { + return &Config{ + ResourceLock: "leases", + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, + EnabledComponents: sets.NewString(), + } +} + +func okData() map[string]string { + return map[string]string{ + "resourceLock": "leases", + // values in this data come from the defaults suggested in the + // code: + // https://github.com/kubernetes/client-go/blob/kubernetes-1.16.0/tools/leaderelection/leaderelection.go + "leaseDuration": "15s", + "renewDeadline": "10s", + "retryPeriod": "2s", + "enabledComponents": "controller", + } +} + func TestNewConfigMapFromData(t *testing.T) { cases := []struct { name string @@ -33,55 +57,93 @@ func TestNewConfigMapFromData(t *testing.T) { }{ { name: "disabled but OK config", - data: map[string]string{ - "resourceLock": "leases", - // values in this data come from the defaults suggested in the - // code: - // https://github.com/kubernetes/client-go/blob/kubernetes-1.16.0/tools/leaderelection/leaderelection.go - "leaseDuration": "15s", - "renewDeadline": "10s", - "retryPeriod": "2s", - }, - expected: &Config{ - ResourceLock: "leases", - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - EnabledComponents: sets.NewString(), - }, + data: func() map[string]string { + data := okData() + delete(data, "enabledComponents") + return data + }(), + expected: okConfig(), }, { - name: "OK config with controller enabled", - data: map[string]string{ - "resourceLock": "leases", - "leaseDuration": "15s", - "renewDeadline": "10s", - "retryPeriod": "2s", - "enabledComponents": "controller", - }, - expected: &Config{ - ResourceLock: "leases", - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - EnabledComponents: sets.NewString("controller"), - }, + name: "OK config - controller enabled", + data: okData(), + expected: func() *Config { + config := okConfig() + config.EnabledComponents.Insert("controller") + return config + }(), }, { - name: "config missing resourceLock field", - data: map[string]string{ - "leaseDuration": "15s", - "renewDeadline": "10s", - "retryPeriod": "2s", - "enabledComponents": "controller", - }, - expected: &Config{ - ResourceLock: "leases", - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - EnabledComponents: sets.NewString("controller"), - }, + name: "missing resourceLock", + data: func() map[string]string { + data := okData() + delete(data, "resourceLock") + return data + }(), + err: errors.New("resourceLock cannot be empty"), + }, + { + name: "invalid resourceLock", + data: func() map[string]string { + data := okData() + data["resourceLock"] = "flarps" + return data + }(), + err: errors.New(`resourceLock: invalid value "flarps": valid values are "leases","configmaps","endpoints"`), + }, + { + name: "missing leaseDuration", + data: func() map[string]string { + data := okData() + delete(data, "leaseDuration") + return data + }(), + err: errors.New("leaseDuration cannot be empty"), + }, + { + name: "invalid leaseDuration", + data: func() map[string]string { + data := okData() + data["leaseDuration"] = "flops" + return data + }(), + err: errors.New(`leaseDuration: invalid duration: "flops"`), + }, + { + name: "missing renewDeadline", + data: func() map[string]string { + data := okData() + delete(data, "renewDeadline") + return data + }(), + err: errors.New("renewDeadline cannot be empty"), + }, + { + name: "invalid renewDeadline", + data: func() map[string]string { + data := okData() + data["renewDeadline"] = "flops" + return data + }(), + err: errors.New(`renewDeadline: invalid duration: "flops"`), + }, + { + name: "missing retryPeriod", + data: func() map[string]string { + data := okData() + delete(data, "retryPeriod") + return data + }(), + err: errors.New("retryPeriod cannot be empty"), + }, + { + name: "invalid retryPeriod", + data: func() map[string]string { + data := okData() + data["retryPeriod"] = "flops" + return data + }(), + err: errors.New(`retryPeriod: invalid duration: "flops"`), }, } From 7aba263af42035ca1104bacd083774537f56eef0 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 26 Feb 2020 11:08:08 -0500 Subject: [PATCH 5/8] Rename leader election files --- leaderelection/{leaderelection.go => config.go} | 0 leaderelection/{leaderelection_test.go => config_test.go} | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename leaderelection/{leaderelection.go => config.go} (100%) rename leaderelection/{leaderelection_test.go => config_test.go} (100%) diff --git a/leaderelection/leaderelection.go b/leaderelection/config.go similarity index 100% rename from leaderelection/leaderelection.go rename to leaderelection/config.go diff --git a/leaderelection/leaderelection_test.go b/leaderelection/config_test.go similarity index 100% rename from leaderelection/leaderelection_test.go rename to leaderelection/config_test.go From fb698c248a2ca7c4fd63888de0e831d30d97b2b7 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 26 Feb 2020 12:00:54 -0500 Subject: [PATCH 6/8] Always start profiling server whether component has LE lock or not --- injection/sharedmain/main.go | 34 ++++++++++++++++------------------ 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 23f17fbe4d..cdb3c11c39 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -107,9 +107,9 @@ func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) { if err != nil { if apierrors.IsNotFound(err) { return kle.NewConfigFromMap(nil) - } else { - return nil, err } + + return nil, err } return kle.NewConfigFromConfigMap(leaderElectionConfigMap) @@ -148,7 +148,20 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto defer flush(logger) ctx = logging.WithLogger(ctx, logger) profilingHandler := profiling.NewHandler(logger, false) + profilingServer := profiling.NewServer(profilingHandler) + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(profilingServer.ListenAndServe) + go func() { + // This will block until either a signal arrives or one of the grouped functions + // returns an error. + <-egCtx.Done() + profilingServer.Shutdown(context.Background()) + // Don't forward ErrServerClosed as that indicates we're already shutting down. + if err := eg.Wait(); err != nil && err != http.ErrServerClosed { + logger.Errorw("Error while running server", zap.Error(err)) + } + }() CheckK8sClientMinimumVersionOrDie(ctx, logger) run := func(ctx context.Context) { @@ -167,20 +180,6 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } logger.Info("Starting controllers...") go controller.StartAll(ctx.Done(), controllers...) - - profilingServer := profiling.NewServer(profilingHandler) - eg, egCtx := errgroup.WithContext(ctx) - eg.Go(profilingServer.ListenAndServe) - - // This will block until either a signal arrives or one of the grouped functions - // returns an error. - <-egCtx.Done() - - profilingServer.Shutdown(context.Background()) - // Don't forward ErrServerClosed as that indicates we're already shutting down. - if err := eg.Wait(); err != nil && err != http.ErrServerClosed { - logger.Errorw("Error while running server", zap.Error(err)) - } } // Set up leader election config @@ -226,6 +225,7 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf defer flush(logger) ctx = logging.WithLogger(ctx, logger) profilingHandler := profiling.NewHandler(logger, false) + profilingServer := profiling.NewServer(profilingHandler) CheckK8sClientMinimumVersionOrDie(ctx, logger) cmw := SetupConfigMapWatchOrDie(ctx, logger) @@ -244,8 +244,6 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf logger.Info("Starting controllers...") go controller.StartAll(ctx.Done(), controllers...) - profilingServer := profiling.NewServer(profilingHandler) - eg, egCtx := errgroup.WithContext(ctx) eg.Go(profilingServer.ListenAndServe) From 51e0726ba54c1b0202afc69ee81b597972c73f28 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 26 Feb 2020 12:42:53 -0500 Subject: [PATCH 7/8] Fix entering unreachable section when leader election is disabled --- injection/sharedmain/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index cdb3c11c39..5d998d4879 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -180,6 +180,8 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto } logger.Info("Starting controllers...") go controller.StartAll(ctx.Done(), controllers...) + + <-ctx.Done() } // Set up leader election config From 29186e5165e0195d6660956cd5cf5c55c8e19616 Mon Sep 17 00:00:00 2001 From: Paul Morie Date: Wed, 26 Feb 2020 16:33:15 -0500 Subject: [PATCH 8/8] Address PR feedback --- injection/sharedmain/main.go | 7 ++---- leaderelection/config.go | 46 ++++++++++++----------------------- leaderelection/config_test.go | 8 +++--- 3 files changed, 21 insertions(+), 40 deletions(-) diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 5d998d4879..1b159338e5 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -157,7 +157,6 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto <-egCtx.Done() profilingServer.Shutdown(context.Background()) - // Don't forward ErrServerClosed as that indicates we're already shutting down. if err := eg.Wait(); err != nil && err != http.ErrServerClosed { logger.Errorw("Error while running server", zap.Error(err)) } @@ -194,11 +193,9 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto if !leConfig.LeaderElect { logger.Infof("%v will not run in leader-elected mode", component) run(ctx) - logger.Fatal("unreachable") + } else { + RunLeaderElected(ctx, logger, run, component, leConfig) } - - RunLeaderElected(ctx, logger, run, component, leConfig) - logger.Fatal("unreachable") } // WebhookMainWithContext runs the generic main flow for controllers and diff --git a/leaderelection/config.go b/leaderelection/config.go index 320ac467ec..1e54705786 100644 --- a/leaderelection/config.go +++ b/leaderelection/config.go @@ -41,44 +41,28 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { EnabledComponents: sets.NewString(), } - if resourceLock, ok := data["resourceLock"]; ok { - if !validResourceLocks.Has(resourceLock) { - return nil, fmt.Errorf("resourceLock: invalid value %q: valid values are \"leases\",\"configmaps\",\"endpoints\"", resourceLock) - } - - config.ResourceLock = resourceLock + if resourceLock := data["resourceLock"]; !validResourceLocks.Has(resourceLock) { + return nil, fmt.Errorf(`resourceLock: invalid value %q: valid values are "leases","configmaps","endpoints"`, resourceLock) } else { - return nil, errors.New("resourceLock cannot be empty") + config.ResourceLock = resourceLock } - if leaseDurationStr, ok := data["leaseDuration"]; ok { - if leaseDuration, err := time.ParseDuration(leaseDurationStr); err == nil { - config.LeaseDuration = leaseDuration - } else { - return nil, fmt.Errorf("leaseDuration: invalid duration: %q", leaseDurationStr) - } + if leaseDuration, err := time.ParseDuration(data["leaseDuration"]); err != nil { + return nil, fmt.Errorf("leaseDuration: invalid duration: %q", data["leaseDuration"]) } else { - return nil, errors.New("leaseDuration cannot be empty") + config.LeaseDuration = leaseDuration } - if renewDeadlineStr, ok := data["renewDeadline"]; ok { - if renewDeadline, err := time.ParseDuration(renewDeadlineStr); err == nil { - config.RenewDeadline = renewDeadline - } else { - return nil, fmt.Errorf("renewDeadline: invalid duration: %q", renewDeadlineStr) - } + if renewDeadline, err := time.ParseDuration(data["renewDeadline"]); err != nil { + return nil, fmt.Errorf("renewDeadline: invalid duration: %q", data["renewDeadline"]) } else { - return nil, errors.New("renewDeadline cannot be empty") + config.RenewDeadline = renewDeadline } - if retryPeriodStr, ok := data["retryPeriod"]; ok { - if retryPeriod, err := time.ParseDuration(retryPeriodStr); err == nil { - config.RetryPeriod = retryPeriod - } else { - return nil, fmt.Errorf("retryPeriod: invalid duration: %q", retryPeriodStr) - } + if retryPeriod, err := time.ParseDuration(data["retryPeriod"]); err != nil { + return nil, fmt.Errorf("retryPeriod: invalid duration: %q", data["retryPeriod"]) } else { - return nil, errors.New("retryPeriod cannot be empty") + config.RetryPeriod = retryPeriod } // enabledComponents are not validated here, because they are dependent on @@ -96,7 +80,7 @@ func NewConfigFromMap(data map[string]string) (*Config, error) { func NewConfigFromConfigMap(configMap *corev1.ConfigMap) (*Config, error) { if configMap == nil { config := defaultConfig() - return &config, nil + return config, nil } return NewConfigFromMap(configMap.Data) @@ -127,8 +111,8 @@ func (c *Config) GetComponentConfig(name string) ComponentConfig { return defaultComponentConfig() } -func defaultConfig() Config { - return Config{ +func defaultConfig() *Config { + return &Config{ ResourceLock: "leases", LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, diff --git a/leaderelection/config_test.go b/leaderelection/config_test.go index 692d427695..72624463c7 100644 --- a/leaderelection/config_test.go +++ b/leaderelection/config_test.go @@ -80,7 +80,7 @@ func TestNewConfigMapFromData(t *testing.T) { delete(data, "resourceLock") return data }(), - err: errors.New("resourceLock cannot be empty"), + err: errors.New(`resourceLock: invalid value "": valid values are "leases","configmaps","endpoints"`), }, { name: "invalid resourceLock", @@ -98,7 +98,7 @@ func TestNewConfigMapFromData(t *testing.T) { delete(data, "leaseDuration") return data }(), - err: errors.New("leaseDuration cannot be empty"), + err: errors.New(`leaseDuration: invalid duration: ""`), }, { name: "invalid leaseDuration", @@ -116,7 +116,7 @@ func TestNewConfigMapFromData(t *testing.T) { delete(data, "renewDeadline") return data }(), - err: errors.New("renewDeadline cannot be empty"), + err: errors.New(`renewDeadline: invalid duration: ""`), }, { name: "invalid renewDeadline", @@ -134,7 +134,7 @@ func TestNewConfigMapFromData(t *testing.T) { delete(data, "retryPeriod") return data }(), - err: errors.New("retryPeriod cannot be empty"), + err: errors.New(`retryPeriod: invalid duration: ""`), }, { name: "invalid retryPeriod",