diff --git a/leader/leader.go b/leader/leader.go index 2d1c895..55f3778 100644 --- a/leader/leader.go +++ b/leader/leader.go @@ -31,34 +31,53 @@ import ( logf "sigs.k8s.io/controller-runtime/pkg/log" ) -type runModeType string - -const ( - localRunMode runModeType = "local" -) - -// forceRunModeEnv indicates if the operator should be forced to run in either local -// or cluster mode (currently only used for local mode) -var forceRunModeEnv = "OSDK_FORCE_RUN_MODE" - -// errNoNamespace indicates that a namespace could not be found for the current +// ErrNoNamespace indicates that a namespace could not be found for the current // environment -var errNoNamespace = fmt.Errorf("namespace not found for current environment") - -// errRunLocal indicates that the operator is set to run in local mode (this error -// is returned by functions that only work on operators running in cluster mode) -var errRunLocal = fmt.Errorf("operator run mode forced to local") +var ErrNoNamespace = fmt.Errorf("namespace not found for current environment") // podNameEnvVar is the constant for env variable POD_NAME // which is the name of the current pod. const podNameEnvVar = "POD_NAME" +var readNamespace = func() ([]byte, error) { + return ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") +} + var log = logf.Log.WithName("leader") // maxBackoffInterval defines the maximum amount of time to wait between // attempts to become the leader. const maxBackoffInterval = time.Second * 16 +type Option func(*Config) error + +type Config struct { + Client crclient.Client +} + +func (c *Config) setDefaults() error { + if c.Client == nil { + config, err := config.GetConfig() + if err != nil { + return err + } + + client, err := crclient.New(config, crclient.Options{}) + if err != nil { + return err + } + c.Client = client + } + return nil +} + +func WithClient(cl crclient.Client) Option { + return func(c *Config) error { + c.Client = cl + return nil + } +} + // Become ensures that the current pod is the leader within its namespace. If // run outside a cluster, it will skip leader election and return nil. It // continuously tries to create a ConfigMap with the provided name and the @@ -66,29 +85,27 @@ const maxBackoffInterval = time.Second * 16 // the same name, so the pod that successfully creates the ConfigMap is the // leader. Upon termination of that pod, the garbage collector will delete the // ConfigMap, enabling a different pod to become the leader. -func Become(ctx context.Context, lockName string) error { +func Become(ctx context.Context, lockName string, opts ...Option) error { log.Info("Trying to become the leader.") - ns, err := getOperatorNamespace() - if err != nil { - if err == errNoNamespace || err == errRunLocal { - log.Info("Skipping leader election; not running in a cluster.") - return nil + config := Config{} + + for _, opt := range opts { + if err := opt(&config); err != nil { + return err } - return err } - config, err := config.GetConfig() - if err != nil { + if err := config.setDefaults(); err != nil { return err } - client, err := crclient.New(config, crclient.Options{}) + ns, err := getOperatorNamespace() if err != nil { return err } - owner, err := myOwnerRef(ctx, client, ns) + owner, err := myOwnerRef(ctx, config.Client, ns) if err != nil { return err } @@ -96,7 +113,7 @@ func Become(ctx context.Context, lockName string) error { // check for existing lock from this pod, in case we got restarted existing := &corev1.ConfigMap{} key := crclient.ObjectKey{Namespace: ns, Name: lockName} - err = client.Get(ctx, key, existing) + err = config.Client.Get(ctx, key, existing) switch { case err == nil: @@ -126,7 +143,7 @@ func Become(ctx context.Context, lockName string) error { // try to create a lock backoff := time.Second for { - err := client.Create(ctx, cm) + err := config.Client.Create(ctx, cm) switch { case err == nil: log.Info("Became the leader.") @@ -134,7 +151,7 @@ func Become(ctx context.Context, lockName string) error { case apierrors.IsAlreadyExists(err): // refresh the lock so we use current leader key := crclient.ObjectKey{Namespace: ns, Name: lockName} - if err := client.Get(ctx, key, existing); err != nil { + if err := config.Client.Get(ctx, key, existing); err != nil { log.Info("Leader lock configmap not found.") continue // configmap got lost ... just wait a bit } @@ -148,7 +165,7 @@ func Become(ctx context.Context, lockName string) error { default: leaderPod := &corev1.Pod{} key = crclient.ObjectKey{Namespace: ns, Name: existingOwners[0].Name} - err = client.Get(ctx, key, leaderPod) + err = config.Client.Get(ctx, key, leaderPod) switch { case apierrors.IsNotFound(err): log.Info("Leader pod has been deleted, waiting for garbage collection to remove the lock.") @@ -158,7 +175,7 @@ func Become(ctx context.Context, lockName string) error { log.Info("Operator pod with leader lock has been evicted.", "leader", leaderPod.Name) log.Info("Deleting evicted leader.") // Pod may not delete immediately, continue with backoff - err := client.Delete(ctx, leaderPod) + err := config.Client.Delete(ctx, leaderPod) if err != nil { log.Error(err, "Leader pod could not be deleted.") } @@ -210,13 +227,10 @@ func isPodEvicted(pod corev1.Pod) bool { // getOperatorNamespace returns the namespace the operator should be running in. func getOperatorNamespace() (string, error) { - if isRunModeLocal() { - return "", errRunLocal - } - nsBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + nsBytes, err := readNamespace() if err != nil { if os.IsNotExist(err) { - return "", errNoNamespace + return "", ErrNoNamespace } return "", err } @@ -225,17 +239,10 @@ func getOperatorNamespace() (string, error) { return ns, nil } -func isRunModeLocal() bool { - return os.Getenv(forceRunModeEnv) == string(localRunMode) -} - // getPod returns a Pod object that corresponds to the pod in which the code // is currently running. // It expects the environment variable POD_NAME to be set by the downwards API. func getPod(ctx context.Context, client crclient.Client, ns string) (*corev1.Pod, error) { - if isRunModeLocal() { - return nil, errRunLocal - } podName := os.Getenv(podNameEnvVar) if podName == "" { return nil, fmt.Errorf("required env %s not set, please configure downward API", podNameEnvVar) diff --git a/leader/leader_suite_test.go b/leader/leader_suite_test.go new file mode 100644 index 0000000..9c23d3c --- /dev/null +++ b/leader/leader_suite_test.go @@ -0,0 +1,13 @@ +package leader + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestLeader(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Leader Suite") +} diff --git a/leader/leader_test.go b/leader/leader_test.go new file mode 100644 index 0000000..acf85b4 --- /dev/null +++ b/leader/leader_test.go @@ -0,0 +1,198 @@ +package leader + +import ( + "context" + "errors" + "os" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + crclient "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +var _ = Describe("Leader election", func() { + + Describe("Become", func() { + var ( + client crclient.Client + ) + BeforeEach(func() { + client = fake.NewFakeClient( + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "leader-test", + Namespace: "testns", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "leader-test", + }, + }, + }, + }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "leader-test", + Namespace: "testns", + OwnerReferences: []metav1.OwnerReference{ + { + APIVersion: "v1", + Kind: "Pod", + Name: "leader-test", + }, + }, + }, + }, + ) + }) + It("should return an error when POD_NAME is not set", func() { + os.Unsetenv("POD_NAME") + err := Become(context.TODO(), "leader-test") + Expect(err).ShouldNot(BeNil()) + }) + It("should return an ErrNoNamespace", func() { + os.Setenv("POD_NAME", "leader-test") + readNamespace = func() ([]byte, error) { + return nil, os.ErrNotExist + } + err := Become(context.TODO(), "leader-test", WithClient(client)) + Expect(err).ShouldNot(BeNil()) + Expect(err).To(Equal(ErrNoNamespace)) + Expect(errors.Is(err, ErrNoNamespace)).To(Equal(true)) + }) + It("should not return an error", func() { + os.Setenv("POD_NAME", "leader-test") + readNamespace = func() ([]byte, error) { + return []byte("testns"), nil + } + + err := Become(context.TODO(), "leader-test", WithClient(client)) + Expect(err).Should(BeNil()) + }) + }) + Describe("isPodEvicted", func() { + var ( + leaderPod *corev1.Pod + ) + BeforeEach(func() { + leaderPod = &corev1.Pod{} + }) + It("should return false with an empty status", func() { + Expect(isPodEvicted(*leaderPod)).To(Equal(false)) + }) + It("should return false if reason is incorrect", func() { + leaderPod.Status.Phase = corev1.PodFailed + leaderPod.Status.Reason = "invalid" + Expect(isPodEvicted(*leaderPod)).To(Equal(false)) + }) + It("should return false if pod is in the wrong phase", func() { + leaderPod.Status.Phase = corev1.PodRunning + Expect(isPodEvicted(*leaderPod)).To(Equal(false)) + }) + It("should return true when Phase and Reason are set", func() { + leaderPod.Status.Phase = corev1.PodFailed + leaderPod.Status.Reason = "Evicted" + Expect(isPodEvicted(*leaderPod)).To(Equal(true)) + }) + }) + Describe("getOperatorNamespace", func() { + It("should return error when namespace not found", func() { + readNamespace = func() ([]byte, error) { + return nil, os.ErrNotExist + } + namespace, err := getOperatorNamespace() + Expect(err).To(Equal(ErrNoNamespace)) + Expect(namespace).To(Equal("")) + }) + It("should return namespace", func() { + readNamespace = func() ([]byte, error) { + return []byte("testnamespace"), nil + } + + // test + namespace, err := getOperatorNamespace() + Expect(err).Should(BeNil()) + Expect(namespace).To(Equal("testnamespace")) + }) + It("should trim whitespace from namespace", func() { + readNamespace = func() ([]byte, error) { + return []byte(" testnamespace "), nil + } + + // test + namespace, err := getOperatorNamespace() + Expect(err).Should(BeNil()) + Expect(namespace).To(Equal("testnamespace")) + }) + }) + Describe("myOwnerRef", func() { + var ( + client crclient.Client + ) + BeforeEach(func() { + client = fake.NewFakeClient( + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + Namespace: "testns", + }, + }, + ) + }) + It("should return an error when POD_NAME is not set", func() { + os.Unsetenv("POD_NAME") + _, err := myOwnerRef(context.TODO(), client, "") + Expect(err).ShouldNot(BeNil()) + }) + It("should return an error if no pod is found", func() { + os.Setenv("POD_NAME", "thisisnotthepodyourelookingfor") + _, err := myOwnerRef(context.TODO(), client, "") + Expect(err).ShouldNot(BeNil()) + }) + It("should return the owner reference without error", func() { + os.Setenv("POD_NAME", "mypod") + owner, err := myOwnerRef(context.TODO(), client, "testns") + Expect(err).Should(BeNil()) + Expect(owner.APIVersion).To(Equal("v1")) + Expect(owner.Kind).To(Equal("Pod")) + Expect(owner.Name).To(Equal("mypod")) + }) + }) + Describe("getPod", func() { + var ( + client crclient.Client + ) + BeforeEach(func() { + client = fake.NewFakeClient( + &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "mypod", + Namespace: "testns", + }, + }, + ) + }) + It("should return an error when POD_NAME is not set", func() { + os.Unsetenv("POD_NAME") + _, err := getPod(context.TODO(), nil, "") + Expect(err).ShouldNot(BeNil()) + }) + It("should return an error if no pod is found", func() { + os.Setenv("POD_NAME", "thisisnotthepodyourelookingfor") + _, err := getPod(context.TODO(), client, "") + Expect(err).ShouldNot(BeNil()) + }) + It("should return the pod with the given name", func() { + os.Setenv("POD_NAME", "mypod") + pod, err := getPod(context.TODO(), client, "testns") + Expect(err).Should(BeNil()) + Expect(pod).ShouldNot(BeNil()) + Expect(pod.TypeMeta.APIVersion).To(Equal("v1")) + Expect(pod.TypeMeta.Kind).To(Equal("Pod")) + }) + }) +})