From da2b9b24c5c0d65bb1e43fe30ab2be7345e13c10 Mon Sep 17 00:00:00 2001 From: Alexander Greene Date: Mon, 18 Apr 2022 10:15:32 -0700 Subject: [PATCH 1/2] Identify fail forward in csvSources (#2743) This commit undos changes to the Resolve and ResolveSteps methods and updats the csvSourceProvider to infer whether or not fail forward is enabled in a namespace. Signed-off-by: Alexander Greene Upstream-repository: operator-lifecycle-manager Upstream-commit: 462ce61fb6b0ac4e5c623d173bd38fed18b6468f --- .../controller/operators/catalog/operator.go | 19 +-- .../operators/catalog/operator_test.go | 2 +- .../operators/catalog/subscriptions_test.go | 4 +- .../registry/resolver/fail_forward.go | 103 +++++++++++++++ .../resolver/instrumented_resolver.go | 4 +- .../resolver/instrumented_resolver_test.go | 8 +- .../controller/registry/resolver/resolver.go | 5 +- .../registry/resolver/resolver_test.go | 80 +----------- .../registry/resolver/source_csvs.go | 20 +++ .../registry/resolver/source_csvs_test.go | 17 +++ .../registry/resolver/step_resolver.go | 120 +----------------- .../registry/resolver/step_resolver_test.go | 54 +++++--- .../pkg/fakes/fake_resolver.go | 18 ++- .../controller/operators/catalog/operator.go | 19 +-- .../registry/resolver/fail_forward.go | 103 +++++++++++++++ .../resolver/instrumented_resolver.go | 4 +- .../controller/registry/resolver/resolver.go | 5 +- .../registry/resolver/source_csvs.go | 20 +++ .../registry/resolver/step_resolver.go | 120 +----------------- 19 files changed, 337 insertions(+), 388 deletions(-) create mode 100644 staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go create mode 100644 vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index da2994ac3c..93b298cc2c 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -42,7 +42,6 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/operator-framework/api/pkg/operators/reference" - operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" @@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return } -func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) { - ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything()) - if err != nil { - o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err) - // Couldn't list operatorGroups, assuming default upgradeStrategy - // so existing behavior is observed for failed CSVs. - return false, nil - } - if len(ogs) != 1 { - return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace) - } - return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil -} - func (o *Operator) syncResolvingNamespace(obj interface{}) error { ns, ok := obj.(*corev1.Namespace) if !ok { @@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } - failForwardEnabled, err := o.isFailForwardEnabled(namespace) + failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace)) if err != nil { return err } @@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("resolving subscriptions in namespace") // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors - steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace) if err != nil { go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error()) // If the error is constraints not satisfiable, then simply project the diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go index 80d3bdfe40..5518bf1d3d 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go @@ -1254,7 +1254,7 @@ func TestSyncResolvingNamespace(t *testing.T) { o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time) o.resolver = &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, tt.fields.resolveErr }, } diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go index 028d944c35..6a813d88df 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go @@ -1031,7 +1031,7 @@ func TestSyncSubscriptions(t *testing.T) { o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time) o.resolver = &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return tt.fields.resolveSteps, tt.fields.bundleLookups, tt.fields.resolveSubs, tt.fields.resolveErr }, } @@ -1168,7 +1168,7 @@ func BenchmarkSyncResolvingNamespace(b *testing.B) { }, }, resolver: &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { steps := []*v1alpha1.Step{ { Resolving: "csv.v.2", diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go new file mode 100644 index 0000000000..ba522ba9c3 --- /dev/null +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go @@ -0,0 +1,103 @@ +package resolver + +import ( + "fmt" + + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" + operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns +// True if an operatorGroup exists in the namespace and its upgradeStrategy +// is set to UnsafeFailForward and false otherwise. An error is returned if +// an more than one operatorGroup exists in the namespace. +// No error is returned if no OperatorGroups are found to keep the resolver +// backwards compatible. +func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) { + ogs, err := ogLister.List(labels.Everything()) + if err != nil || len(ogs) == 0 { + return false, nil + } + if len(ogs) != 1 { + return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil +} + +type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error + +// WithCSVPhase returns an error if the CSV is not in the given phase. +func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption { + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + if csv == nil || csv.Status.Phase != phase { + return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) + } + return nil + } +} + +// WithUniqueCSVs returns an error if the CSV has been seen before. +func WithUniqueCSVs() walkOption { + visited := map[string]struct{}{} + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + // Check if we have visited the CSV before + if _, ok := visited[csv.GetName()]; ok { + return fmt.Errorf("csv %s/%s has already been seen", csv.GetNamespace(), csv.GetName()) + } + + visited[csv.GetName()] = struct{}{} + return nil + } +} + +// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns +// the last clusterServiceVersions in the replacement chain. An error is returned if any of the +// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement +// chain is detected. +func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) { + if csv == nil { + return nil, fmt.Errorf("csv cannot be nil") + } + + for { + // Check if there is a CSV that replaces this CSVs + next, ok := csvToReplacement[csv.GetName()] + if !ok { + break + } + + // Check walk options + for _, o := range options { + if err := o(csv); err != nil { + return nil, err + } + } + + // Move along replacement chain. + csv = next + } + return csv, nil +} + +// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other +// CSVs are in the replacing phase. +func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) { + lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) + if err != nil { + return false, err + } + return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil +} + +// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. +func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion { + replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{} + for _, csv := range csvs { + if csv.Spec.Replaces != "" { + replacementMapping[csv.Spec.Replaces] = csv + } + } + return replacementMapping +} diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go index 83e4cf9206..d55d67c4fc 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go @@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu } } -func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { start := time.Now() - steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace) if err != nil { ir.failureMetricsEmitter(time.Since(start)) } else { diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go index 8179a78168..47bf8d25f1 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go @@ -17,11 +17,11 @@ const ( type fakeResolverWithError struct{} type fakeResolverWithoutError struct{} -func (r *fakeResolverWithError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, errors.New("Fake error") } -func (r *fakeResolverWithoutError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, nil } @@ -45,7 +45,7 @@ func TestInstrumentedResolverFailure(t *testing.T) { } instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithError(), changeToSuccess, changeToFailure) - instrumentedResolver.ResolveSteps("", false) + instrumentedResolver.ResolveSteps("") require.Equal(t, len(result), 1) // check that only one call was made to a change function require.Equal(t, result[0], failure) // check that the call was made to changeToFailure function } @@ -62,7 +62,7 @@ func TestInstrumentedResolverSuccess(t *testing.T) { } instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithoutError(), changeToSuccess, changeToFailure) - instrumentedResolver.ResolveSteps("", false) + instrumentedResolver.ResolveSteps("") require.Equal(t, len(result), 1) // check that only one call was made to a change function require.Equal(t, result[0], success) // check that the call was made to changeToSuccess function } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 29b28ca34f..c19aba9f26 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) { return n, nil } -func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) { +func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) { var errs []error variables := make(map[solver.Identifier]solver.Variable) @@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e } preferredNamespace := namespaces[0] - existingEntryPredicates = append(existingEntryPredicates, cache.True()) - _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited) + _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited) if err != nil { return nil, err } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go index 9a80fe770c..8c95fb5078 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go @@ -191,85 +191,6 @@ func TestSolveOperators_WithSystemConstraints(t *testing.T) { } } -func WithInstalledCSV(sub *v1alpha1.Subscription, csvName string) *v1alpha1.Subscription { - sub.Status.InstalledCSV = csvName - return sub -} - -func TestSolveOperators_WithFailForward(t *testing.T) { - const namespace = "test-namespace" - catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace} - - packageASubV2 := newSub(namespace, "packageA", "alpha", catalog) - APISet := cache.APISet{opregistry.APIKey{Group: "g", Version: "v", Kind: "k", Plural: "ks"}: struct{}{}} - - // packageA provides an API - packageAV1 := genEntry("packageA.v1", "0.0.1", "", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - packageAV2 := genEntry("packageA.v2", "0.0.2", "packageA.v1", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - packageAV3 := genEntry("packageA.v3", "0.0.3", "packageA.v2", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - - existingPackageAV1 := existingOperator(namespace, "packageA.v1", "packageA", "alpha", "", APISet, nil, nil, nil) - existingPackageAV2 := existingOperator(namespace, "packageA.v2", "packageA", "alpha", "packageA.v1", APISet, nil, nil, nil) - - testCases := []struct { - title string - expectedOperators []*cache.Entry - csvs []*v1alpha1.ClusterServiceVersion - subs []*v1alpha1.Subscription - snapshotEntries []*cache.Entry - failForwardPredicates []cache.Predicate - err string - }{ - { - title: "Resolver fails if v1 and v2 provide the same APIs and v1 is not omitted from the resolver", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2}, - expectedOperators: nil, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - err: "provide k (g/v)", - }, - { - title: "Resolver succeeds if v1 and v2 provide the same APIs and v1 is omitted from the resolver", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2}, - expectedOperators: nil, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))}, - err: "", - }, - { - title: "Resolver succeeds if v1 and v2 provide the same APIs, v1 is omitted from the resolver, and an upgrade for v2 exists", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2, packageAV3}, - expectedOperators: []*cache.Entry{packageAV3}, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))}, - err: "", - }, - } - - for _, testCase := range testCases { - resolver := Resolver{ - cache: cache.New(cache.StaticSourceProvider{ - catalog: &cache.Snapshot{ - Entries: testCase.snapshotEntries, - }, - cache.NewVirtualSourceKey(namespace): csvSnapshotOrPanic(namespace, testCase.subs, testCase.csvs...), - }), - log: logrus.New(), - } - operators, err := resolver.Resolve([]string{namespace}, testCase.subs, testCase.failForwardPredicates...) - - if testCase.err != "" { - require.Error(t, err) - require.Containsf(t, err.Error(), testCase.err, "Test %s failed", testCase.title) - } else { - require.NoErrorf(t, err, "Test %s failed", testCase.title) - } - require.ElementsMatch(t, testCase.expectedOperators, operators, "Test %s failed", testCase.title) - } -} - func TestDisjointChannelGraph(t *testing.T) { const namespace = "test-namespace" catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace} @@ -1521,6 +1442,7 @@ func TestSolveOperators_TransferApiOwnership(t *testing.T) { key: cache.NewVirtualSourceKey(namespace), csvLister: &csvs, subLister: fakeSubscriptionLister(p.subs), + ogLister: fakeOperatorGroupLister{}, logger: logger, }, }), diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go index d62228f2a7..0ecd201d8d 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go @@ -8,6 +8,7 @@ import ( "github.com/blang/semver/v4" "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection" @@ -20,6 +21,7 @@ import ( type csvSourceProvider struct { csvLister v1alpha1listers.ClusterServiceVersionLister subLister v1alpha1listers.SubscriptionLister + ogLister v1listers.OperatorGroupLister logger logrus.StdLogger } @@ -30,6 +32,7 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey] key: cache.NewVirtualSourceKey(namespace), csvLister: csp.csvLister.ClusterServiceVersions(namespace), subLister: csp.subLister.Subscriptions(namespace), + ogLister: csp.ogLister.OperatorGroups(namespace), logger: csp.logger, } break // first ns is assumed to be the target ns, todo: make explicit @@ -41,6 +44,7 @@ type csvSource struct { key cache.SourceKey csvLister v1alpha1listers.ClusterServiceVersionNamespaceLister subLister v1alpha1listers.SubscriptionNamespaceLister + ogLister v1listers.OperatorGroupNamespaceLister logger logrus.StdLogger } @@ -55,6 +59,11 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { return nil, err } + failForwardEnabled, err := IsFailForwardEnabled(s.ogLister) + if err != nil { + return nil, err + } + // build a catalog snapshot of CSVs without subscriptions csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription) for _, sub := range subs { @@ -75,6 +84,17 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { if csv.IsCopied() { continue } + + if failForwardEnabled { + replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs)) + if err != nil { + return nil, err + } + if csv.Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { + continue + } + } + entry, err := newEntryFromV1Alpha1CSV(csv) if err != nil { return nil, err diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go index d39b26575b..22d226669e 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/labels" opver "github.com/operator-framework/api/pkg/lib/version" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-registry/pkg/api" @@ -450,6 +451,21 @@ func (f fakeSubscriptionLister) Get(name string) (*v1alpha1.Subscription, error) return nil, errors.NewNotFound(v1alpha1.SchemeGroupVersion.WithResource("subscriptions").GroupResource(), name) } +type fakeOperatorGroupLister []*operatorsv1.OperatorGroup + +func (f fakeOperatorGroupLister) List(selector labels.Selector) ([]*operatorsv1.OperatorGroup, error) { + return f, nil +} + +func (f fakeOperatorGroupLister) Get(name string) (*operatorsv1.OperatorGroup, error) { + for _, og := range f { + if og.Name == name { + return og, nil + } + } + return nil, errors.NewNotFound(operatorsv1.SchemeGroupVersion.WithResource("operatorgroups").GroupResource(), name) +} + func TestPropertiesAnnotationHonored(t *testing.T) { src := &csvSource{ csvLister: fakeCSVLister{ @@ -462,6 +478,7 @@ func TestPropertiesAnnotationHonored(t *testing.T) { }, }, subLister: fakeSubscriptionLister{}, + ogLister: fakeOperatorGroupLister{}, } ss, err := src.Snapshot(context.Background()) require.NoError(t, err) diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index e67d27e000..d68907588a 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -9,8 +9,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" @@ -29,7 +27,7 @@ const ( var initHooks []stepResolverInitHook type StepResolver interface { - ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) } type OperatorStepResolver struct { @@ -63,6 +61,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio &csvSourceProvider{ csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), subLister: lister.OperatorsV1alpha1().SubscriptionLister(), + ogLister: lister.OperatorsV1().OperatorGroupLister(), logger: log, }, }, @@ -86,125 +85,14 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio return stepResolver } -type walkOption func(csv *v1alpha1.ClusterServiceVersion) error - -func WithCSVPhase(phase v1alpha1.ClusterServiceVersionPhase) walkOption { - return func(csv *v1alpha1.ClusterServiceVersion) error { - if csv == nil || csv.Status.Phase != phase { - return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) - } - return nil - } -} - -func WithUniqueCSVs() walkOption { - visited := map[string]struct{}{} - return func(csv *v1alpha1.ClusterServiceVersion) error { - // Check if we have visited the CSV before - if _, ok := visited[csv.GetName()]; ok { - return fmt.Errorf("infinite replacement chain detected") - } - - visited[csv.GetName()] = struct{}{} - return nil - } -} - -// walkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns -// the last clusterServiceVersions in the replacement chain. An error is returned if any of the -// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement -// chain is detected. -func WalkReplacementChain(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion, options ...walkOption) (*v1alpha1.ClusterServiceVersion, error) { - if csv == nil { - return nil, fmt.Errorf("csv cannot be nil") - } - - for { - // Check if there is a CSV that replaces this CSVs - next, ok := csvToReplacement[csv.GetName()] - if !ok { - break - } - - // Check walk options - for _, o := range options { - if err := o(csv); err != nil { - return nil, err - } - } - - // Move along replacement chain. - csv = next - } - return csv, nil -} - -// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other -// CSVs are in the replacing phase. -func isReplacementChainThatEndsInFailure(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion) (bool, error) { - lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(v1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) - if err != nil { - return false, err - } - return (lastCSV != nil && lastCSV.Status.Phase == v1alpha1.CSVPhaseFailed), nil -} - -// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. -func ReplacementMapping(csvs []*v1alpha1.ClusterServiceVersion) map[string]*v1alpha1.ClusterServiceVersion { - replacementMapping := map[string]*v1alpha1.ClusterServiceVersion{} - for _, csv := range csvs { - if csv.Spec.Replaces != "" { - replacementMapping[csv.Spec.Replaces] = csv - } - } - return replacementMapping -} - -func (r *OperatorStepResolver) cachePredicates(namespace string) ([]cache.Predicate, error) { - nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{}) - if err != nil { - return nil, err - } - - csvs, err := r.csvLister.ClusterServiceVersions(namespace).List(labels.NewSelector().Add(*nonCopiedCSVRequirement)) - if err != nil { - return nil, err - } - - predicates := []cache.Predicate{} - for i := range csvs { - replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csvs[i], ReplacementMapping(csvs)) - if err != nil { - return nil, err - } - if csvs[i].Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { - predicates = append(predicates, cache.Not(cache.CSVNamePredicate(csvs[i].GetName()))) - } - } - - return predicates, nil -} - -func (r *OperatorStepResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { subs, err := r.listSubscriptions(namespace) if err != nil { return nil, nil, nil, err } - // The resolver considers the initial set of CSVs in the namespace by their appearance - // in the catalog cache. In order to support "fail forward" upgrades, we need to omit - // CSVs that are actively being replaced from this initial set of operators. The - // predicates defined here will omit these replacing CSVs from the set. - cachePredicates := []cache.Predicate{} - if failForwardEnabled { - cachePredicates, err = r.cachePredicates(namespace) - if err != nil { - r.log.Debugf("Unable to determine CSVs to exclude: %v", err) - } - } - namespaces := []string{namespace, r.globalCatalogNamespace} - operators, err := r.resolver.Resolve(namespaces, subs, cachePredicates...) + operators, err := r.resolver.Resolve(namespaces, subs) if err != nil { return nil, nil, nil, err } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go index be9e795082..50e39ee23b 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-registry/pkg/api" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -266,7 +267,7 @@ func TestIsReplacementChainThatEndsInFailure(t *testing.T) { }, expected: out{ b: false, - err: fmt.Errorf("infinite replacement chain detected"), + err: fmt.Errorf("csv bar/foo-v1 has already been seen"), }, }, } @@ -318,12 +319,12 @@ func TestResolver(t *testing.T) { solverError solver.NotSatisfiable } type resolverTest struct { - name string - clusterState []runtime.Object - bundlesByCatalog map[resolvercache.SourceKey][]*api.Bundle - out resolverTestOut - failForwardEnabled bool + name string + clusterState []runtime.Object + bundlesByCatalog map[resolvercache.SourceKey][]*api.Bundle + out resolverTestOut } + nothing := resolverTestOut{ steps: [][]*v1alpha1.Step{}, lookups: []v1alpha1.BundleLookup{}, @@ -1097,8 +1098,8 @@ func TestResolver(t *testing.T) { existingSub(namespace, "a.v2", "a", "alpha", catalog), existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1145,8 +1146,8 @@ func TestResolver(t *testing.T) { existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v3", "a", "alpha", "a.v2", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1169,8 +1170,8 @@ func TestResolver(t *testing.T) { existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), existingOperator(namespace, "a.v3", "a", "alpha", "a.v2", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1181,10 +1182,7 @@ func TestResolver(t *testing.T) { steps: [][]*v1alpha1.Step{}, subs: []*v1alpha1.Subscription{}, errAssert: func(t *testing.T, err error) { - assert.IsType(t, solver.NotSatisfiable{}, err) - assert.Contains(t, err.Error(), "constraints not satisfiable") - assert.Contains(t, err.Error(), "provide k (g/v)") - assert.Contains(t, err.Error(), "exists and is not referenced by a subscription") + assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv catsrc-namespace/a.v1 in phase Failed instead of Replacing") }, }, }, @@ -1194,8 +1192,8 @@ func TestResolver(t *testing.T) { existingSub(namespace, "a.v1", "a", "alpha", catalog), existingOperator(namespace, "b.v1", "b", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1227,6 +1225,7 @@ func TestResolver(t *testing.T) { lister := operatorlister.NewLister() lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister()) lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister()) + lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister()) ssp := make(resolvercache.StaticSourceProvider) for catalog, bundles := range tt.bundlesByCatalog { @@ -1245,6 +1244,7 @@ func TestResolver(t *testing.T) { key: resolvercache.NewVirtualSourceKey(namespace), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(namespace), subLister: lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace), + ogLister: lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace), logger: log, } satresolver := &Resolver{ @@ -1254,7 +1254,7 @@ func TestResolver(t *testing.T) { resolver := NewOperatorStepResolver(lister, clientFake, "", nil, log) resolver.resolver = satresolver - steps, lookups, subs, err := resolver.ResolveSteps(namespace, tt.failForwardEnabled) + steps, lookups, subs, err := resolver.ResolveSteps(namespace) if tt.out.solverError == nil { if tt.out.errAssert == nil { assert.NoError(t, err) @@ -1384,7 +1384,7 @@ func TestNamespaceResolverRBAC(t *testing.T) { } resolver := NewOperatorStepResolver(lister, clientFake, "", nil, logrus.New()) resolver.resolver = satresolver - steps, _, subs, err := resolver.ResolveSteps(namespace, tt.failForwardEnabled) + steps, _, subs, err := resolver.ResolveSteps(namespace) require.Equal(t, tt.out.err, err) requireStepsEqual(t, expectedSteps, steps) require.ElementsMatch(t, tt.out.subs, subs) @@ -1403,6 +1403,7 @@ func StartResolverInformers(namespace string, stopCh <-chan struct{}, objs ...ru informers := []cache.SharedIndexInformer{ nsInformerFactory.Operators().V1alpha1().Subscriptions().Informer(), nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Informer(), + nsInformerFactory.Operators().V1().OperatorGroups().Informer(), } for _, informer := range informers { @@ -1443,6 +1444,27 @@ func newSub(namespace, pkg, channel string, catalog resolvercache.SourceKey, opt return s } +type ogOption func(*operatorsv1.OperatorGroup) + +func withUpgradeStrategy(upgradeStrategy operatorsv1.UpgradeStrategy) ogOption { + return func(og *operatorsv1.OperatorGroup) { + og.Spec.UpgradeStrategy = upgradeStrategy + } +} + +func newOperatorGroup(name, namespace string, option ...ogOption) *operatorsv1.OperatorGroup { + og := &operatorsv1.OperatorGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, o := range option { + o(og) + } + return og +} + func updatedSub(namespace, currentOperatorName, installedOperatorName, pkg, channel string, catalog resolvercache.SourceKey, option ...subOption) *v1alpha1.Subscription { s := &v1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ diff --git a/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go b/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go index d24d5bc16f..3b85a209a1 100644 --- a/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go @@ -9,11 +9,10 @@ import ( ) type FakeStepResolver struct { - ResolveStepsStub func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveStepsStub func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) resolveStepsMutex sync.RWMutex resolveStepsArgsForCall []struct { arg1 string - arg2 bool } resolveStepsReturns struct { result1 []*v1alpha1.Step @@ -31,17 +30,16 @@ type FakeStepResolver struct { invocationsMutex sync.RWMutex } -func (fake *FakeStepResolver) ResolveSteps(arg1 string, arg2 bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (fake *FakeStepResolver) ResolveSteps(arg1 string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { fake.resolveStepsMutex.Lock() ret, specificReturn := fake.resolveStepsReturnsOnCall[len(fake.resolveStepsArgsForCall)] fake.resolveStepsArgsForCall = append(fake.resolveStepsArgsForCall, struct { arg1 string - arg2 bool - }{arg1, arg2}) - fake.recordInvocation("ResolveSteps", []interface{}{arg1, arg2}) + }{arg1}) + fake.recordInvocation("ResolveSteps", []interface{}{arg1}) fake.resolveStepsMutex.Unlock() if fake.ResolveStepsStub != nil { - return fake.ResolveStepsStub(arg1, arg2) + return fake.ResolveStepsStub(arg1) } if specificReturn { return ret.result1, ret.result2, ret.result3, ret.result4 @@ -56,17 +54,17 @@ func (fake *FakeStepResolver) ResolveStepsCallCount() int { return len(fake.resolveStepsArgsForCall) } -func (fake *FakeStepResolver) ResolveStepsCalls(stub func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error)) { +func (fake *FakeStepResolver) ResolveStepsCalls(stub func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error)) { fake.resolveStepsMutex.Lock() defer fake.resolveStepsMutex.Unlock() fake.ResolveStepsStub = stub } -func (fake *FakeStepResolver) ResolveStepsArgsForCall(i int) (string, bool) { +func (fake *FakeStepResolver) ResolveStepsArgsForCall(i int) string { fake.resolveStepsMutex.RLock() defer fake.resolveStepsMutex.RUnlock() argsForCall := fake.resolveStepsArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1 } func (fake *FakeStepResolver) ResolveStepsReturns(result1 []*v1alpha1.Step, result2 []v1alpha1.BundleLookup, result3 []*v1alpha1.Subscription, result4 error) { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index da2994ac3c..93b298cc2c 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -42,7 +42,6 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/operator-framework/api/pkg/operators/reference" - operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" @@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return } -func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) { - ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything()) - if err != nil { - o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err) - // Couldn't list operatorGroups, assuming default upgradeStrategy - // so existing behavior is observed for failed CSVs. - return false, nil - } - if len(ogs) != 1 { - return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace) - } - return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil -} - func (o *Operator) syncResolvingNamespace(obj interface{}) error { ns, ok := obj.(*corev1.Namespace) if !ok { @@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } - failForwardEnabled, err := o.isFailForwardEnabled(namespace) + failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace)) if err != nil { return err } @@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("resolving subscriptions in namespace") // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors - steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace) if err != nil { go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error()) // If the error is constraints not satisfiable, then simply project the diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go new file mode 100644 index 0000000000..ba522ba9c3 --- /dev/null +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go @@ -0,0 +1,103 @@ +package resolver + +import ( + "fmt" + + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" + operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns +// True if an operatorGroup exists in the namespace and its upgradeStrategy +// is set to UnsafeFailForward and false otherwise. An error is returned if +// an more than one operatorGroup exists in the namespace. +// No error is returned if no OperatorGroups are found to keep the resolver +// backwards compatible. +func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) { + ogs, err := ogLister.List(labels.Everything()) + if err != nil || len(ogs) == 0 { + return false, nil + } + if len(ogs) != 1 { + return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil +} + +type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error + +// WithCSVPhase returns an error if the CSV is not in the given phase. +func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption { + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + if csv == nil || csv.Status.Phase != phase { + return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) + } + return nil + } +} + +// WithUniqueCSVs returns an error if the CSV has been seen before. +func WithUniqueCSVs() walkOption { + visited := map[string]struct{}{} + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + // Check if we have visited the CSV before + if _, ok := visited[csv.GetName()]; ok { + return fmt.Errorf("csv %s/%s has already been seen", csv.GetNamespace(), csv.GetName()) + } + + visited[csv.GetName()] = struct{}{} + return nil + } +} + +// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns +// the last clusterServiceVersions in the replacement chain. An error is returned if any of the +// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement +// chain is detected. +func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) { + if csv == nil { + return nil, fmt.Errorf("csv cannot be nil") + } + + for { + // Check if there is a CSV that replaces this CSVs + next, ok := csvToReplacement[csv.GetName()] + if !ok { + break + } + + // Check walk options + for _, o := range options { + if err := o(csv); err != nil { + return nil, err + } + } + + // Move along replacement chain. + csv = next + } + return csv, nil +} + +// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other +// CSVs are in the replacing phase. +func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) { + lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) + if err != nil { + return false, err + } + return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil +} + +// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. +func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion { + replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{} + for _, csv := range csvs { + if csv.Spec.Replaces != "" { + replacementMapping[csv.Spec.Replaces] = csv + } + } + return replacementMapping +} diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go index 83e4cf9206..d55d67c4fc 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go @@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu } } -func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { start := time.Now() - steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace) if err != nil { ir.failureMetricsEmitter(time.Since(start)) } else { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 29b28ca34f..c19aba9f26 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) { return n, nil } -func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) { +func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) { var errs []error variables := make(map[solver.Identifier]solver.Variable) @@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e } preferredNamespace := namespaces[0] - existingEntryPredicates = append(existingEntryPredicates, cache.True()) - _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited) + _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited) if err != nil { return nil, err } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go index d62228f2a7..0ecd201d8d 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go @@ -8,6 +8,7 @@ import ( "github.com/blang/semver/v4" "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection" @@ -20,6 +21,7 @@ import ( type csvSourceProvider struct { csvLister v1alpha1listers.ClusterServiceVersionLister subLister v1alpha1listers.SubscriptionLister + ogLister v1listers.OperatorGroupLister logger logrus.StdLogger } @@ -30,6 +32,7 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey] key: cache.NewVirtualSourceKey(namespace), csvLister: csp.csvLister.ClusterServiceVersions(namespace), subLister: csp.subLister.Subscriptions(namespace), + ogLister: csp.ogLister.OperatorGroups(namespace), logger: csp.logger, } break // first ns is assumed to be the target ns, todo: make explicit @@ -41,6 +44,7 @@ type csvSource struct { key cache.SourceKey csvLister v1alpha1listers.ClusterServiceVersionNamespaceLister subLister v1alpha1listers.SubscriptionNamespaceLister + ogLister v1listers.OperatorGroupNamespaceLister logger logrus.StdLogger } @@ -55,6 +59,11 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { return nil, err } + failForwardEnabled, err := IsFailForwardEnabled(s.ogLister) + if err != nil { + return nil, err + } + // build a catalog snapshot of CSVs without subscriptions csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription) for _, sub := range subs { @@ -75,6 +84,17 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { if csv.IsCopied() { continue } + + if failForwardEnabled { + replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs)) + if err != nil { + return nil, err + } + if csv.Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { + continue + } + } + entry, err := newEntryFromV1Alpha1CSV(csv) if err != nil { return nil, err diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index e67d27e000..d68907588a 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -9,8 +9,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" @@ -29,7 +27,7 @@ const ( var initHooks []stepResolverInitHook type StepResolver interface { - ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) } type OperatorStepResolver struct { @@ -63,6 +61,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio &csvSourceProvider{ csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), subLister: lister.OperatorsV1alpha1().SubscriptionLister(), + ogLister: lister.OperatorsV1().OperatorGroupLister(), logger: log, }, }, @@ -86,125 +85,14 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio return stepResolver } -type walkOption func(csv *v1alpha1.ClusterServiceVersion) error - -func WithCSVPhase(phase v1alpha1.ClusterServiceVersionPhase) walkOption { - return func(csv *v1alpha1.ClusterServiceVersion) error { - if csv == nil || csv.Status.Phase != phase { - return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) - } - return nil - } -} - -func WithUniqueCSVs() walkOption { - visited := map[string]struct{}{} - return func(csv *v1alpha1.ClusterServiceVersion) error { - // Check if we have visited the CSV before - if _, ok := visited[csv.GetName()]; ok { - return fmt.Errorf("infinite replacement chain detected") - } - - visited[csv.GetName()] = struct{}{} - return nil - } -} - -// walkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns -// the last clusterServiceVersions in the replacement chain. An error is returned if any of the -// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement -// chain is detected. -func WalkReplacementChain(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion, options ...walkOption) (*v1alpha1.ClusterServiceVersion, error) { - if csv == nil { - return nil, fmt.Errorf("csv cannot be nil") - } - - for { - // Check if there is a CSV that replaces this CSVs - next, ok := csvToReplacement[csv.GetName()] - if !ok { - break - } - - // Check walk options - for _, o := range options { - if err := o(csv); err != nil { - return nil, err - } - } - - // Move along replacement chain. - csv = next - } - return csv, nil -} - -// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other -// CSVs are in the replacing phase. -func isReplacementChainThatEndsInFailure(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion) (bool, error) { - lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(v1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) - if err != nil { - return false, err - } - return (lastCSV != nil && lastCSV.Status.Phase == v1alpha1.CSVPhaseFailed), nil -} - -// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. -func ReplacementMapping(csvs []*v1alpha1.ClusterServiceVersion) map[string]*v1alpha1.ClusterServiceVersion { - replacementMapping := map[string]*v1alpha1.ClusterServiceVersion{} - for _, csv := range csvs { - if csv.Spec.Replaces != "" { - replacementMapping[csv.Spec.Replaces] = csv - } - } - return replacementMapping -} - -func (r *OperatorStepResolver) cachePredicates(namespace string) ([]cache.Predicate, error) { - nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{}) - if err != nil { - return nil, err - } - - csvs, err := r.csvLister.ClusterServiceVersions(namespace).List(labels.NewSelector().Add(*nonCopiedCSVRequirement)) - if err != nil { - return nil, err - } - - predicates := []cache.Predicate{} - for i := range csvs { - replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csvs[i], ReplacementMapping(csvs)) - if err != nil { - return nil, err - } - if csvs[i].Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { - predicates = append(predicates, cache.Not(cache.CSVNamePredicate(csvs[i].GetName()))) - } - } - - return predicates, nil -} - -func (r *OperatorStepResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { subs, err := r.listSubscriptions(namespace) if err != nil { return nil, nil, nil, err } - // The resolver considers the initial set of CSVs in the namespace by their appearance - // in the catalog cache. In order to support "fail forward" upgrades, we need to omit - // CSVs that are actively being replaced from this initial set of operators. The - // predicates defined here will omit these replacing CSVs from the set. - cachePredicates := []cache.Predicate{} - if failForwardEnabled { - cachePredicates, err = r.cachePredicates(namespace) - if err != nil { - r.log.Debugf("Unable to determine CSVs to exclude: %v", err) - } - } - namespaces := []string{namespace, r.globalCatalogNamespace} - operators, err := r.resolver.Resolve(namespaces, subs, cachePredicates...) + operators, err := r.resolver.Resolve(namespaces, subs) if err != nil { return nil, nil, nil, err } From 72d6b3efdd5d33a47a0b4d3c5fff471376459213 Mon Sep 17 00:00:00 2001 From: Alexander Greene Date: Mon, 18 Apr 2022 11:41:53 -0700 Subject: [PATCH 2/2] Address fail forward unit test failures (#2747) The broken replacement chain unit test occasionally fails because it is possible for either the a.v1 or a.v2 operators to cause the resolver to fail. This commit updates the test to check that the resolver fails because a csv is in the failed state versus checking that a specific csv is in the failed state. Signed-off-by: Alexander Greene Upstream-repository: operator-lifecycle-manager Upstream-commit: c36183ef17f757dbf1f96a2619a2a91184afc579 --- .../pkg/controller/registry/resolver/step_resolver_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go index 50e39ee23b..f4a8c3a9a2 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go @@ -1182,7 +1182,8 @@ func TestResolver(t *testing.T) { steps: [][]*v1alpha1.Step{}, subs: []*v1alpha1.Subscription{}, errAssert: func(t *testing.T, err error) { - assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv catsrc-namespace/a.v1 in phase Failed instead of Replacing") + assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv") + assert.Contains(t, err.Error(), "in phase Failed instead of Replacing") }, }, },