From 909e778e7607f4e8bb60ed801f40fb9a54fb3162 Mon Sep 17 00:00:00 2001 From: Stefan Prodan Date: Tue, 28 Mar 2023 15:10:03 +0300 Subject: [PATCH] runtime: Add reconciler sharding capability based on label selector Add `--watch-label-selector` flag to runtime. When specified the reconcilers will only watch for changes of those resources with matching labels. This enables horizontal scaling of Flux controller, where each controller can be deployed multiple times with a unique label selector which is used as the sharding key. Signed-off-by: Stefan Prodan --- runtime/controller/watch.go | 74 ++++++++++++++++ runtime/controller/watch_test.go | 140 +++++++++++++++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 runtime/controller/watch.go create mode 100644 runtime/controller/watch_test.go diff --git a/runtime/controller/watch.go b/runtime/controller/watch.go new file mode 100644 index 000000000..0ea7322c7 --- /dev/null +++ b/runtime/controller/watch.go @@ -0,0 +1,74 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" +) + +const ( + flagWatchWatchAllNamespaces = "watch-all-namespaces" + flagWatchLabelSelector = "watch-label-selector" +) + +// WatchOptions defines the configurable options for reconciler resources watcher. +type WatchOptions struct { + // AllNamespaces defines the watch filter at namespace level. + // If set to false, the reconciler will only watch the runtime namespace for resource changes. + AllNamespaces bool + + // LabelSelector defines the watch filter based on matching label expressions. + // When set, the reconciler will only watch for changes of those resources with matching labels. + // Docs: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#list-and-watch-filtering. + LabelSelector string +} + +// BindFlags will parse the given pflag.FlagSet for the controller and +// set the WatchOptions accordingly. +func (o *WatchOptions) BindFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.AllNamespaces, flagWatchWatchAllNamespaces, true, + "Watch for resources in all namespaces, if set to false it will only watch the runtime namespace.") + fs.StringVar(&o.LabelSelector, flagWatchLabelSelector, "", + "Watch for resources with matching labels e.g. 'sharding.fluxcd.io/shard=shard1'.") +} + +// GetWatchLabelSelector parses the label selector option from WatchOptions +// and returns an error if the expression is invalid. +func GetWatchLabelSelector(opts WatchOptions) (*metav1.LabelSelector, error) { + if opts.LabelSelector == "" { + return nil, nil + } + + return metav1.ParseToLabelSelector(opts.LabelSelector) +} + +// GetWatchSelector parses the label selector option from WatchOptions and returns the label selector. +// If the WatchOptions contain no selectors, then a match everything is returned. +func GetWatchSelector(opts WatchOptions) (labels.Selector, error) { + ls, err := GetWatchLabelSelector(opts) + if err != nil { + return nil, err + } + + if ls == nil { + return labels.Everything(), nil + } + + return metav1.LabelSelectorAsSelector(ls) +} diff --git a/runtime/controller/watch_test.go b/runtime/controller/watch_test.go new file mode 100644 index 000000000..5869152f9 --- /dev/null +++ b/runtime/controller/watch_test.go @@ -0,0 +1,140 @@ +/* +Copyright 2023 The Flux authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "testing" + + . "github.com/onsi/gomega" + "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func Test_WatchOptions_BindFlags(t *testing.T) { + objects := []client.Object{ + &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "t0", + }, + }, + &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "t1", + Labels: map[string]string{ + "sharding.fluxcd.io/shard": "shard1", + }, + }, + }, + &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + Kind: "ConfigMap", + APIVersion: "v1", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "t2", + Labels: map[string]string{ + "sharding.fluxcd.io/shard": "shard2", + }, + }, + }, + } + + tests := []struct { + name string + commandLine []string + objects []client.Object + expectedMatch []string + }{ + { + name: "empty flag selects objects", + commandLine: []string{""}, + objects: objects, + expectedMatch: []string{"t0", "t1", "t2"}, + }, + { + name: "flag selects objects by label key val", + commandLine: []string{"--watch-label-selector=sharding.fluxcd.io/shard=shard1"}, + objects: objects, + expectedMatch: []string{"t1"}, + }, + { + name: "flag selects objects by label exclusion expression", + commandLine: []string{"--watch-label-selector=sharding.fluxcd.io/shard, sharding.fluxcd.io/shard notin (shard1)"}, + objects: objects, + expectedMatch: []string{"t2"}, + }, + { + name: "flag selects objects by label inclusion expression", + commandLine: []string{"--watch-label-selector=sharding.fluxcd.io/shard in (shard1, shard2)"}, + objects: objects, + expectedMatch: []string{"t1", "t2"}, + }, + { + name: "flag selects objects with no matching labels", + commandLine: []string{"--watch-label-selector=sharding.fluxcd.io/shard notin (shard1, shard2)"}, + objects: objects, + expectedMatch: []string{"t0"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + g := NewWithT(t) + + f := pflag.NewFlagSet("test", pflag.ContinueOnError) + opts := WatchOptions{} + opts.BindFlags(f) + + err := f.Parse(tt.commandLine) + g.Expect(err).NotTo(HaveOccurred()) + + sel, err := GetWatchSelector(opts) + g.Expect(err).NotTo(HaveOccurred()) + + for _, object := range tt.objects { + if sel.Matches(labels.Set(object.GetLabels())) { + var found bool + for _, match := range tt.expectedMatch { + if object.GetName() == match { + found = true + } + } + g.Expect(found).To(BeTrue()) + } else { + var found bool + for _, match := range tt.expectedMatch { + if object.GetName() == match { + found = true + } + } + g.Expect(found).ToNot(BeTrue()) + } + } + }) + } +}