diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index da2994ac3c..93b298cc2c 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -42,7 +42,6 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/operator-framework/api/pkg/operators/reference" - operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" @@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return } -func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) { - ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything()) - if err != nil { - o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err) - // Couldn't list operatorGroups, assuming default upgradeStrategy - // so existing behavior is observed for failed CSVs. - return false, nil - } - if len(ogs) != 1 { - return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace) - } - return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil -} - func (o *Operator) syncResolvingNamespace(obj interface{}) error { ns, ok := obj.(*corev1.Namespace) if !ok { @@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } - failForwardEnabled, err := o.isFailForwardEnabled(namespace) + failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace)) if err != nil { return err } @@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("resolving subscriptions in namespace") // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors - steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace) if err != nil { go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error()) // If the error is constraints not satisfiable, then simply project the diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go index 80d3bdfe40..5518bf1d3d 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/operator_test.go @@ -1254,7 +1254,7 @@ func TestSyncResolvingNamespace(t *testing.T) { o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time) o.resolver = &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, tt.fields.resolveErr }, } diff --git a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go index 028d944c35..6a813d88df 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/operators/catalog/subscriptions_test.go @@ -1031,7 +1031,7 @@ func TestSyncSubscriptions(t *testing.T) { o.sourcesLastUpdate.Set(tt.fields.sourcesLastUpdate.Time) o.resolver = &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return tt.fields.resolveSteps, tt.fields.bundleLookups, tt.fields.resolveSubs, tt.fields.resolveErr }, } @@ -1168,7 +1168,7 @@ func BenchmarkSyncResolvingNamespace(b *testing.B) { }, }, resolver: &fakes.FakeStepResolver{ - ResolveStepsStub: func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { + ResolveStepsStub: func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { steps := []*v1alpha1.Step{ { Resolving: "csv.v.2", diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go new file mode 100644 index 0000000000..ba522ba9c3 --- /dev/null +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go @@ -0,0 +1,103 @@ +package resolver + +import ( + "fmt" + + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" + operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns +// True if an operatorGroup exists in the namespace and its upgradeStrategy +// is set to UnsafeFailForward and false otherwise. An error is returned if +// an more than one operatorGroup exists in the namespace. +// No error is returned if no OperatorGroups are found to keep the resolver +// backwards compatible. +func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) { + ogs, err := ogLister.List(labels.Everything()) + if err != nil || len(ogs) == 0 { + return false, nil + } + if len(ogs) != 1 { + return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil +} + +type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error + +// WithCSVPhase returns an error if the CSV is not in the given phase. +func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption { + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + if csv == nil || csv.Status.Phase != phase { + return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) + } + return nil + } +} + +// WithUniqueCSVs returns an error if the CSV has been seen before. +func WithUniqueCSVs() walkOption { + visited := map[string]struct{}{} + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + // Check if we have visited the CSV before + if _, ok := visited[csv.GetName()]; ok { + return fmt.Errorf("csv %s/%s has already been seen", csv.GetNamespace(), csv.GetName()) + } + + visited[csv.GetName()] = struct{}{} + return nil + } +} + +// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns +// the last clusterServiceVersions in the replacement chain. An error is returned if any of the +// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement +// chain is detected. +func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) { + if csv == nil { + return nil, fmt.Errorf("csv cannot be nil") + } + + for { + // Check if there is a CSV that replaces this CSVs + next, ok := csvToReplacement[csv.GetName()] + if !ok { + break + } + + // Check walk options + for _, o := range options { + if err := o(csv); err != nil { + return nil, err + } + } + + // Move along replacement chain. + csv = next + } + return csv, nil +} + +// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other +// CSVs are in the replacing phase. +func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) { + lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) + if err != nil { + return false, err + } + return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil +} + +// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. +func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion { + replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{} + for _, csv := range csvs { + if csv.Spec.Replaces != "" { + replacementMapping[csv.Spec.Replaces] = csv + } + } + return replacementMapping +} diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go index 83e4cf9206..d55d67c4fc 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go @@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu } } -func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { start := time.Now() - steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace) if err != nil { ir.failureMetricsEmitter(time.Since(start)) } else { diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go index 8179a78168..47bf8d25f1 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver_test.go @@ -17,11 +17,11 @@ const ( type fakeResolverWithError struct{} type fakeResolverWithoutError struct{} -func (r *fakeResolverWithError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *fakeResolverWithError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, errors.New("Fake error") } -func (r *fakeResolverWithoutError) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *fakeResolverWithoutError) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { return nil, nil, nil, nil } @@ -45,7 +45,7 @@ func TestInstrumentedResolverFailure(t *testing.T) { } instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithError(), changeToSuccess, changeToFailure) - instrumentedResolver.ResolveSteps("", false) + instrumentedResolver.ResolveSteps("") require.Equal(t, len(result), 1) // check that only one call was made to a change function require.Equal(t, result[0], failure) // check that the call was made to changeToFailure function } @@ -62,7 +62,7 @@ func TestInstrumentedResolverSuccess(t *testing.T) { } instrumentedResolver := NewInstrumentedResolver(newFakeResolverWithoutError(), changeToSuccess, changeToFailure) - instrumentedResolver.ResolveSteps("", false) + instrumentedResolver.ResolveSteps("") require.Equal(t, len(result), 1) // check that only one call was made to a change function require.Equal(t, result[0], success) // check that the call was made to changeToSuccess function } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 29b28ca34f..c19aba9f26 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) { return n, nil } -func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) { +func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) { var errs []error variables := make(map[solver.Identifier]solver.Variable) @@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e } preferredNamespace := namespaces[0] - existingEntryPredicates = append(existingEntryPredicates, cache.True()) - _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited) + _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited) if err != nil { return nil, err } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go index 9a80fe770c..8c95fb5078 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver_test.go @@ -191,85 +191,6 @@ func TestSolveOperators_WithSystemConstraints(t *testing.T) { } } -func WithInstalledCSV(sub *v1alpha1.Subscription, csvName string) *v1alpha1.Subscription { - sub.Status.InstalledCSV = csvName - return sub -} - -func TestSolveOperators_WithFailForward(t *testing.T) { - const namespace = "test-namespace" - catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace} - - packageASubV2 := newSub(namespace, "packageA", "alpha", catalog) - APISet := cache.APISet{opregistry.APIKey{Group: "g", Version: "v", Kind: "k", Plural: "ks"}: struct{}{}} - - // packageA provides an API - packageAV1 := genEntry("packageA.v1", "0.0.1", "", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - packageAV2 := genEntry("packageA.v2", "0.0.2", "packageA.v1", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - packageAV3 := genEntry("packageA.v3", "0.0.3", "packageA.v2", "packageA", "alpha", catalog.Name, catalog.Namespace, nil, APISet, nil, "", false) - - existingPackageAV1 := existingOperator(namespace, "packageA.v1", "packageA", "alpha", "", APISet, nil, nil, nil) - existingPackageAV2 := existingOperator(namespace, "packageA.v2", "packageA", "alpha", "packageA.v1", APISet, nil, nil, nil) - - testCases := []struct { - title string - expectedOperators []*cache.Entry - csvs []*v1alpha1.ClusterServiceVersion - subs []*v1alpha1.Subscription - snapshotEntries []*cache.Entry - failForwardPredicates []cache.Predicate - err string - }{ - { - title: "Resolver fails if v1 and v2 provide the same APIs and v1 is not omitted from the resolver", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2}, - expectedOperators: nil, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - err: "provide k (g/v)", - }, - { - title: "Resolver succeeds if v1 and v2 provide the same APIs and v1 is omitted from the resolver", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2}, - expectedOperators: nil, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))}, - err: "", - }, - { - title: "Resolver succeeds if v1 and v2 provide the same APIs, v1 is omitted from the resolver, and an upgrade for v2 exists", - snapshotEntries: []*cache.Entry{packageAV1, packageAV2, packageAV3}, - expectedOperators: []*cache.Entry{packageAV3}, - csvs: []*v1alpha1.ClusterServiceVersion{existingPackageAV1, existingPackageAV2}, - subs: []*v1alpha1.Subscription{WithInstalledCSV(packageASubV2, existingPackageAV2.Name)}, - failForwardPredicates: []cache.Predicate{cache.Not(cache.CSVNamePredicate("packageA.v1"))}, - err: "", - }, - } - - for _, testCase := range testCases { - resolver := Resolver{ - cache: cache.New(cache.StaticSourceProvider{ - catalog: &cache.Snapshot{ - Entries: testCase.snapshotEntries, - }, - cache.NewVirtualSourceKey(namespace): csvSnapshotOrPanic(namespace, testCase.subs, testCase.csvs...), - }), - log: logrus.New(), - } - operators, err := resolver.Resolve([]string{namespace}, testCase.subs, testCase.failForwardPredicates...) - - if testCase.err != "" { - require.Error(t, err) - require.Containsf(t, err.Error(), testCase.err, "Test %s failed", testCase.title) - } else { - require.NoErrorf(t, err, "Test %s failed", testCase.title) - } - require.ElementsMatch(t, testCase.expectedOperators, operators, "Test %s failed", testCase.title) - } -} - func TestDisjointChannelGraph(t *testing.T) { const namespace = "test-namespace" catalog := cache.SourceKey{Name: "test-catalog", Namespace: namespace} @@ -1521,6 +1442,7 @@ func TestSolveOperators_TransferApiOwnership(t *testing.T) { key: cache.NewVirtualSourceKey(namespace), csvLister: &csvs, subLister: fakeSubscriptionLister(p.subs), + ogLister: fakeOperatorGroupLister{}, logger: logger, }, }), diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go index d62228f2a7..0ecd201d8d 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go @@ -8,6 +8,7 @@ import ( "github.com/blang/semver/v4" "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection" @@ -20,6 +21,7 @@ import ( type csvSourceProvider struct { csvLister v1alpha1listers.ClusterServiceVersionLister subLister v1alpha1listers.SubscriptionLister + ogLister v1listers.OperatorGroupLister logger logrus.StdLogger } @@ -30,6 +32,7 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey] key: cache.NewVirtualSourceKey(namespace), csvLister: csp.csvLister.ClusterServiceVersions(namespace), subLister: csp.subLister.Subscriptions(namespace), + ogLister: csp.ogLister.OperatorGroups(namespace), logger: csp.logger, } break // first ns is assumed to be the target ns, todo: make explicit @@ -41,6 +44,7 @@ type csvSource struct { key cache.SourceKey csvLister v1alpha1listers.ClusterServiceVersionNamespaceLister subLister v1alpha1listers.SubscriptionNamespaceLister + ogLister v1listers.OperatorGroupNamespaceLister logger logrus.StdLogger } @@ -55,6 +59,11 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { return nil, err } + failForwardEnabled, err := IsFailForwardEnabled(s.ogLister) + if err != nil { + return nil, err + } + // build a catalog snapshot of CSVs without subscriptions csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription) for _, sub := range subs { @@ -75,6 +84,17 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { if csv.IsCopied() { continue } + + if failForwardEnabled { + replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs)) + if err != nil { + return nil, err + } + if csv.Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { + continue + } + } + entry, err := newEntryFromV1Alpha1CSV(csv) if err != nil { return nil, err diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go index d39b26575b..22d226669e 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs_test.go @@ -12,6 +12,7 @@ import ( "k8s.io/apimachinery/pkg/labels" opver "github.com/operator-framework/api/pkg/lib/version" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-registry/pkg/api" @@ -450,6 +451,21 @@ func (f fakeSubscriptionLister) Get(name string) (*v1alpha1.Subscription, error) return nil, errors.NewNotFound(v1alpha1.SchemeGroupVersion.WithResource("subscriptions").GroupResource(), name) } +type fakeOperatorGroupLister []*operatorsv1.OperatorGroup + +func (f fakeOperatorGroupLister) List(selector labels.Selector) ([]*operatorsv1.OperatorGroup, error) { + return f, nil +} + +func (f fakeOperatorGroupLister) Get(name string) (*operatorsv1.OperatorGroup, error) { + for _, og := range f { + if og.Name == name { + return og, nil + } + } + return nil, errors.NewNotFound(operatorsv1.SchemeGroupVersion.WithResource("operatorgroups").GroupResource(), name) +} + func TestPropertiesAnnotationHonored(t *testing.T) { src := &csvSource{ csvLister: fakeCSVLister{ @@ -462,6 +478,7 @@ func TestPropertiesAnnotationHonored(t *testing.T) { }, }, subLister: fakeSubscriptionLister{}, + ogLister: fakeOperatorGroupLister{}, } ss, err := src.Snapshot(context.Background()) require.NoError(t, err) diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index e67d27e000..d68907588a 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -9,8 +9,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" @@ -29,7 +27,7 @@ const ( var initHooks []stepResolverInitHook type StepResolver interface { - ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) } type OperatorStepResolver struct { @@ -63,6 +61,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio &csvSourceProvider{ csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), subLister: lister.OperatorsV1alpha1().SubscriptionLister(), + ogLister: lister.OperatorsV1().OperatorGroupLister(), logger: log, }, }, @@ -86,125 +85,14 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio return stepResolver } -type walkOption func(csv *v1alpha1.ClusterServiceVersion) error - -func WithCSVPhase(phase v1alpha1.ClusterServiceVersionPhase) walkOption { - return func(csv *v1alpha1.ClusterServiceVersion) error { - if csv == nil || csv.Status.Phase != phase { - return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) - } - return nil - } -} - -func WithUniqueCSVs() walkOption { - visited := map[string]struct{}{} - return func(csv *v1alpha1.ClusterServiceVersion) error { - // Check if we have visited the CSV before - if _, ok := visited[csv.GetName()]; ok { - return fmt.Errorf("infinite replacement chain detected") - } - - visited[csv.GetName()] = struct{}{} - return nil - } -} - -// walkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns -// the last clusterServiceVersions in the replacement chain. An error is returned if any of the -// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement -// chain is detected. -func WalkReplacementChain(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion, options ...walkOption) (*v1alpha1.ClusterServiceVersion, error) { - if csv == nil { - return nil, fmt.Errorf("csv cannot be nil") - } - - for { - // Check if there is a CSV that replaces this CSVs - next, ok := csvToReplacement[csv.GetName()] - if !ok { - break - } - - // Check walk options - for _, o := range options { - if err := o(csv); err != nil { - return nil, err - } - } - - // Move along replacement chain. - csv = next - } - return csv, nil -} - -// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other -// CSVs are in the replacing phase. -func isReplacementChainThatEndsInFailure(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion) (bool, error) { - lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(v1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) - if err != nil { - return false, err - } - return (lastCSV != nil && lastCSV.Status.Phase == v1alpha1.CSVPhaseFailed), nil -} - -// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. -func ReplacementMapping(csvs []*v1alpha1.ClusterServiceVersion) map[string]*v1alpha1.ClusterServiceVersion { - replacementMapping := map[string]*v1alpha1.ClusterServiceVersion{} - for _, csv := range csvs { - if csv.Spec.Replaces != "" { - replacementMapping[csv.Spec.Replaces] = csv - } - } - return replacementMapping -} - -func (r *OperatorStepResolver) cachePredicates(namespace string) ([]cache.Predicate, error) { - nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{}) - if err != nil { - return nil, err - } - - csvs, err := r.csvLister.ClusterServiceVersions(namespace).List(labels.NewSelector().Add(*nonCopiedCSVRequirement)) - if err != nil { - return nil, err - } - - predicates := []cache.Predicate{} - for i := range csvs { - replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csvs[i], ReplacementMapping(csvs)) - if err != nil { - return nil, err - } - if csvs[i].Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { - predicates = append(predicates, cache.Not(cache.CSVNamePredicate(csvs[i].GetName()))) - } - } - - return predicates, nil -} - -func (r *OperatorStepResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { subs, err := r.listSubscriptions(namespace) if err != nil { return nil, nil, nil, err } - // The resolver considers the initial set of CSVs in the namespace by their appearance - // in the catalog cache. In order to support "fail forward" upgrades, we need to omit - // CSVs that are actively being replaced from this initial set of operators. The - // predicates defined here will omit these replacing CSVs from the set. - cachePredicates := []cache.Predicate{} - if failForwardEnabled { - cachePredicates, err = r.cachePredicates(namespace) - if err != nil { - r.log.Debugf("Unable to determine CSVs to exclude: %v", err) - } - } - namespaces := []string{namespace, r.globalCatalogNamespace} - operators, err := r.resolver.Resolve(namespaces, subs, cachePredicates...) + operators, err := r.resolver.Resolve(namespaces, subs) if err != nil { return nil, nil, nil, err } diff --git a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go index be9e795082..f4a8c3a9a2 100644 --- a/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go +++ b/staging/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver_test.go @@ -14,6 +14,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-registry/pkg/api" opregistry "github.com/operator-framework/operator-registry/pkg/registry" @@ -266,7 +267,7 @@ func TestIsReplacementChainThatEndsInFailure(t *testing.T) { }, expected: out{ b: false, - err: fmt.Errorf("infinite replacement chain detected"), + err: fmt.Errorf("csv bar/foo-v1 has already been seen"), }, }, } @@ -318,12 +319,12 @@ func TestResolver(t *testing.T) { solverError solver.NotSatisfiable } type resolverTest struct { - name string - clusterState []runtime.Object - bundlesByCatalog map[resolvercache.SourceKey][]*api.Bundle - out resolverTestOut - failForwardEnabled bool + name string + clusterState []runtime.Object + bundlesByCatalog map[resolvercache.SourceKey][]*api.Bundle + out resolverTestOut } + nothing := resolverTestOut{ steps: [][]*v1alpha1.Step{}, lookups: []v1alpha1.BundleLookup{}, @@ -1097,8 +1098,8 @@ func TestResolver(t *testing.T) { existingSub(namespace, "a.v2", "a", "alpha", catalog), existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1145,8 +1146,8 @@ func TestResolver(t *testing.T) { existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v3", "a", "alpha", "a.v2", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1169,8 +1170,8 @@ func TestResolver(t *testing.T) { existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), existingOperator(namespace, "a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), existingOperator(namespace, "a.v3", "a", "alpha", "a.v2", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1181,10 +1182,8 @@ func TestResolver(t *testing.T) { steps: [][]*v1alpha1.Step{}, subs: []*v1alpha1.Subscription{}, errAssert: func(t *testing.T, err error) { - assert.IsType(t, solver.NotSatisfiable{}, err) - assert.Contains(t, err.Error(), "constraints not satisfiable") - assert.Contains(t, err.Error(), "provide k (g/v)") - assert.Contains(t, err.Error(), "exists and is not referenced by a subscription") + assert.Contains(t, err.Error(), "error using catalog @existing (in namespace catsrc-namespace): csv") + assert.Contains(t, err.Error(), "in phase Failed instead of Replacing") }, }, }, @@ -1194,8 +1193,8 @@ func TestResolver(t *testing.T) { existingSub(namespace, "a.v1", "a", "alpha", catalog), existingOperator(namespace, "b.v1", "b", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseReplacing)), existingOperator(namespace, "a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withPhase(v1alpha1.CSVPhaseFailed)), + newOperatorGroup("foo", namespace, withUpgradeStrategy(operatorsv1.UpgradeStrategyUnsafeFailForward)), }, - failForwardEnabled: true, bundlesByCatalog: map[resolvercache.SourceKey][]*api.Bundle{catalog: { bundle("a.v1", "a", "alpha", "", Provides1, nil, nil, nil, withVersion("1.0.0")), bundle("a.v2", "a", "alpha", "a.v1", Provides1, nil, nil, nil, withVersion("2.0.0")), @@ -1227,6 +1226,7 @@ func TestResolver(t *testing.T) { lister := operatorlister.NewLister() lister.OperatorsV1alpha1().RegisterSubscriptionLister(namespace, informerFactory.Operators().V1alpha1().Subscriptions().Lister()) lister.OperatorsV1alpha1().RegisterClusterServiceVersionLister(namespace, informerFactory.Operators().V1alpha1().ClusterServiceVersions().Lister()) + lister.OperatorsV1().RegisterOperatorGroupLister(namespace, informerFactory.Operators().V1().OperatorGroups().Lister()) ssp := make(resolvercache.StaticSourceProvider) for catalog, bundles := range tt.bundlesByCatalog { @@ -1245,6 +1245,7 @@ func TestResolver(t *testing.T) { key: resolvercache.NewVirtualSourceKey(namespace), csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister().ClusterServiceVersions(namespace), subLister: lister.OperatorsV1alpha1().SubscriptionLister().Subscriptions(namespace), + ogLister: lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace), logger: log, } satresolver := &Resolver{ @@ -1254,7 +1255,7 @@ func TestResolver(t *testing.T) { resolver := NewOperatorStepResolver(lister, clientFake, "", nil, log) resolver.resolver = satresolver - steps, lookups, subs, err := resolver.ResolveSteps(namespace, tt.failForwardEnabled) + steps, lookups, subs, err := resolver.ResolveSteps(namespace) if tt.out.solverError == nil { if tt.out.errAssert == nil { assert.NoError(t, err) @@ -1384,7 +1385,7 @@ func TestNamespaceResolverRBAC(t *testing.T) { } resolver := NewOperatorStepResolver(lister, clientFake, "", nil, logrus.New()) resolver.resolver = satresolver - steps, _, subs, err := resolver.ResolveSteps(namespace, tt.failForwardEnabled) + steps, _, subs, err := resolver.ResolveSteps(namespace) require.Equal(t, tt.out.err, err) requireStepsEqual(t, expectedSteps, steps) require.ElementsMatch(t, tt.out.subs, subs) @@ -1403,6 +1404,7 @@ func StartResolverInformers(namespace string, stopCh <-chan struct{}, objs ...ru informers := []cache.SharedIndexInformer{ nsInformerFactory.Operators().V1alpha1().Subscriptions().Informer(), nsInformerFactory.Operators().V1alpha1().ClusterServiceVersions().Informer(), + nsInformerFactory.Operators().V1().OperatorGroups().Informer(), } for _, informer := range informers { @@ -1443,6 +1445,27 @@ func newSub(namespace, pkg, channel string, catalog resolvercache.SourceKey, opt return s } +type ogOption func(*operatorsv1.OperatorGroup) + +func withUpgradeStrategy(upgradeStrategy operatorsv1.UpgradeStrategy) ogOption { + return func(og *operatorsv1.OperatorGroup) { + og.Spec.UpgradeStrategy = upgradeStrategy + } +} + +func newOperatorGroup(name, namespace string, option ...ogOption) *operatorsv1.OperatorGroup { + og := &operatorsv1.OperatorGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } + for _, o := range option { + o(og) + } + return og +} + func updatedSub(namespace, currentOperatorName, installedOperatorName, pkg, channel string, catalog resolvercache.SourceKey, option ...subOption) *v1alpha1.Subscription { s := &v1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ diff --git a/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go b/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go index d24d5bc16f..3b85a209a1 100644 --- a/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go +++ b/staging/operator-lifecycle-manager/pkg/fakes/fake_resolver.go @@ -9,11 +9,10 @@ import ( ) type FakeStepResolver struct { - ResolveStepsStub func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveStepsStub func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) resolveStepsMutex sync.RWMutex resolveStepsArgsForCall []struct { arg1 string - arg2 bool } resolveStepsReturns struct { result1 []*v1alpha1.Step @@ -31,17 +30,16 @@ type FakeStepResolver struct { invocationsMutex sync.RWMutex } -func (fake *FakeStepResolver) ResolveSteps(arg1 string, arg2 bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (fake *FakeStepResolver) ResolveSteps(arg1 string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { fake.resolveStepsMutex.Lock() ret, specificReturn := fake.resolveStepsReturnsOnCall[len(fake.resolveStepsArgsForCall)] fake.resolveStepsArgsForCall = append(fake.resolveStepsArgsForCall, struct { arg1 string - arg2 bool - }{arg1, arg2}) - fake.recordInvocation("ResolveSteps", []interface{}{arg1, arg2}) + }{arg1}) + fake.recordInvocation("ResolveSteps", []interface{}{arg1}) fake.resolveStepsMutex.Unlock() if fake.ResolveStepsStub != nil { - return fake.ResolveStepsStub(arg1, arg2) + return fake.ResolveStepsStub(arg1) } if specificReturn { return ret.result1, ret.result2, ret.result3, ret.result4 @@ -56,17 +54,17 @@ func (fake *FakeStepResolver) ResolveStepsCallCount() int { return len(fake.resolveStepsArgsForCall) } -func (fake *FakeStepResolver) ResolveStepsCalls(stub func(string, bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error)) { +func (fake *FakeStepResolver) ResolveStepsCalls(stub func(string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error)) { fake.resolveStepsMutex.Lock() defer fake.resolveStepsMutex.Unlock() fake.ResolveStepsStub = stub } -func (fake *FakeStepResolver) ResolveStepsArgsForCall(i int) (string, bool) { +func (fake *FakeStepResolver) ResolveStepsArgsForCall(i int) string { fake.resolveStepsMutex.RLock() defer fake.resolveStepsMutex.RUnlock() argsForCall := fake.resolveStepsArgsForCall[i] - return argsForCall.arg1, argsForCall.arg2 + return argsForCall.arg1 } func (fake *FakeStepResolver) ResolveStepsReturns(result1 []*v1alpha1.Step, result2 []v1alpha1.BundleLookup, result3 []*v1alpha1.Subscription, result4 error) { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go index da2994ac3c..93b298cc2c 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/catalog/operator.go @@ -42,7 +42,6 @@ import ( "k8s.io/client-go/util/workqueue" "github.com/operator-framework/api/pkg/operators/reference" - operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/informers/externalversions" @@ -903,20 +902,6 @@ func (o *Operator) syncCatalogSources(obj interface{}) (syncError error) { return } -func (o *Operator) isFailForwardEnabled(namespace string) (bool, error) { - ogs, err := o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace).List(labels.Everything()) - if err != nil { - o.logger.Debugf("failed to list operatorgroups in the %s namespace: %v", namespace, err) - // Couldn't list operatorGroups, assuming default upgradeStrategy - // so existing behavior is observed for failed CSVs. - return false, nil - } - if len(ogs) != 1 { - return false, fmt.Errorf("found %d operatorGroups in namespace %s, expected 1", len(ogs), namespace) - } - return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil -} - func (o *Operator) syncResolvingNamespace(obj interface{}) error { ns, ok := obj.(*corev1.Namespace) if !ok { @@ -943,7 +928,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { return err } - failForwardEnabled, err := o.isFailForwardEnabled(namespace) + failForwardEnabled, err := resolver.IsFailForwardEnabled(o.lister.OperatorsV1().OperatorGroupLister().OperatorGroups(namespace)) if err != nil { return err } @@ -998,7 +983,7 @@ func (o *Operator) syncResolvingNamespace(obj interface{}) error { logger.Debug("resolving subscriptions in namespace") // resolve a set of steps to apply to a cluster, a set of subscriptions to create/update, and any errors - steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, bundleLookups, updatedSubs, err := o.resolver.ResolveSteps(namespace) if err != nil { go o.recorder.Event(ns, corev1.EventTypeWarning, "ResolutionFailed", err.Error()) // If the error is constraints not satisfiable, then simply project the diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go new file mode 100644 index 0000000000..ba522ba9c3 --- /dev/null +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/fail_forward.go @@ -0,0 +1,103 @@ +package resolver + +import ( + "fmt" + + operatorsv1 "github.com/operator-framework/api/pkg/operators/v1" + operatorsv1alpha1 "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" + "k8s.io/apimachinery/pkg/labels" +) + +// IsFailForwardEnabled takes a namespaced operatorGroup lister and returns +// True if an operatorGroup exists in the namespace and its upgradeStrategy +// is set to UnsafeFailForward and false otherwise. An error is returned if +// an more than one operatorGroup exists in the namespace. +// No error is returned if no OperatorGroups are found to keep the resolver +// backwards compatible. +func IsFailForwardEnabled(ogLister v1listers.OperatorGroupNamespaceLister) (bool, error) { + ogs, err := ogLister.List(labels.Everything()) + if err != nil || len(ogs) == 0 { + return false, nil + } + if len(ogs) != 1 { + return false, fmt.Errorf("found %d operatorGroups, expected 1", len(ogs)) + } + return ogs[0].UpgradeStrategy() == operatorsv1.UpgradeStrategyUnsafeFailForward, nil +} + +type walkOption func(csv *operatorsv1alpha1.ClusterServiceVersion) error + +// WithCSVPhase returns an error if the CSV is not in the given phase. +func WithCSVPhase(phase operatorsv1alpha1.ClusterServiceVersionPhase) walkOption { + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + if csv == nil || csv.Status.Phase != phase { + return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) + } + return nil + } +} + +// WithUniqueCSVs returns an error if the CSV has been seen before. +func WithUniqueCSVs() walkOption { + visited := map[string]struct{}{} + return func(csv *operatorsv1alpha1.ClusterServiceVersion) error { + // Check if we have visited the CSV before + if _, ok := visited[csv.GetName()]; ok { + return fmt.Errorf("csv %s/%s has already been seen", csv.GetNamespace(), csv.GetName()) + } + + visited[csv.GetName()] = struct{}{} + return nil + } +} + +// WalkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns +// the last clusterServiceVersions in the replacement chain. An error is returned if any of the +// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement +// chain is detected. +func WalkReplacementChain(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion, options ...walkOption) (*operatorsv1alpha1.ClusterServiceVersion, error) { + if csv == nil { + return nil, fmt.Errorf("csv cannot be nil") + } + + for { + // Check if there is a CSV that replaces this CSVs + next, ok := csvToReplacement[csv.GetName()] + if !ok { + break + } + + // Check walk options + for _, o := range options { + if err := o(csv); err != nil { + return nil, err + } + } + + // Move along replacement chain. + csv = next + } + return csv, nil +} + +// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other +// CSVs are in the replacing phase. +func isReplacementChainThatEndsInFailure(csv *operatorsv1alpha1.ClusterServiceVersion, csvToReplacement map[string]*operatorsv1alpha1.ClusterServiceVersion) (bool, error) { + lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(operatorsv1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) + if err != nil { + return false, err + } + return (lastCSV != nil && lastCSV.Status.Phase == operatorsv1alpha1.CSVPhaseFailed), nil +} + +// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. +func ReplacementMapping(csvs []*operatorsv1alpha1.ClusterServiceVersion) map[string]*operatorsv1alpha1.ClusterServiceVersion { + replacementMapping := map[string]*operatorsv1alpha1.ClusterServiceVersion{} + for _, csv := range csvs { + if csv.Spec.Replaces != "" { + replacementMapping[csv.Spec.Replaces] = csv + } + } + return replacementMapping +} diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go index 83e4cf9206..d55d67c4fc 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/instrumented_resolver.go @@ -22,9 +22,9 @@ func NewInstrumentedResolver(resolver StepResolver, successMetricsEmitter, failu } } -func (ir *InstrumentedResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (ir *InstrumentedResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { start := time.Now() - steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace, failForwardEnabled) + steps, lookups, subs, err := ir.resolver.ResolveSteps(namespace) if err != nil { ir.failureMetricsEmitter(time.Since(start)) } else { diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go index 29b28ca34f..c19aba9f26 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/resolver.go @@ -55,7 +55,7 @@ func (w *debugWriter) Write(b []byte) (int, error) { return n, nil } -func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, existingEntryPredicates ...cache.Predicate) ([]*cache.Entry, error) { +func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription) ([]*cache.Entry, error) { var errs []error variables := make(map[solver.Identifier]solver.Variable) @@ -72,8 +72,7 @@ func (r *Resolver) Resolve(namespaces []string, subs []*v1alpha1.Subscription, e } preferredNamespace := namespaces[0] - existingEntryPredicates = append(existingEntryPredicates, cache.True()) - _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(existingEntryPredicates...), namespacedCache, visited) + _, existingVariables, err := r.getBundleVariables(preferredNamespace, namespacedCache.Catalog(cache.NewVirtualSourceKey(preferredNamespace)).Find(cache.True()), namespacedCache, visited) if err != nil { return nil, err } diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go index d62228f2a7..0ecd201d8d 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/source_csvs.go @@ -8,6 +8,7 @@ import ( "github.com/blang/semver/v4" "github.com/operator-framework/api/pkg/operators/v1alpha1" + v1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1" v1alpha1listers "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/listers/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/cache" "github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/projection" @@ -20,6 +21,7 @@ import ( type csvSourceProvider struct { csvLister v1alpha1listers.ClusterServiceVersionLister subLister v1alpha1listers.SubscriptionLister + ogLister v1listers.OperatorGroupLister logger logrus.StdLogger } @@ -30,6 +32,7 @@ func (csp *csvSourceProvider) Sources(namespaces ...string) map[cache.SourceKey] key: cache.NewVirtualSourceKey(namespace), csvLister: csp.csvLister.ClusterServiceVersions(namespace), subLister: csp.subLister.Subscriptions(namespace), + ogLister: csp.ogLister.OperatorGroups(namespace), logger: csp.logger, } break // first ns is assumed to be the target ns, todo: make explicit @@ -41,6 +44,7 @@ type csvSource struct { key cache.SourceKey csvLister v1alpha1listers.ClusterServiceVersionNamespaceLister subLister v1alpha1listers.SubscriptionNamespaceLister + ogLister v1listers.OperatorGroupNamespaceLister logger logrus.StdLogger } @@ -55,6 +59,11 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { return nil, err } + failForwardEnabled, err := IsFailForwardEnabled(s.ogLister) + if err != nil { + return nil, err + } + // build a catalog snapshot of CSVs without subscriptions csvSubscriptions := make(map[*v1alpha1.ClusterServiceVersion]*v1alpha1.Subscription) for _, sub := range subs { @@ -75,6 +84,17 @@ func (s *csvSource) Snapshot(ctx context.Context) (*cache.Snapshot, error) { if csv.IsCopied() { continue } + + if failForwardEnabled { + replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csv, ReplacementMapping(csvs)) + if err != nil { + return nil, err + } + if csv.Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { + continue + } + } + entry, err := newEntryFromV1Alpha1CSV(csv) if err != nil { return nil, err diff --git a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go index e67d27e000..d68907588a 100644 --- a/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go +++ b/vendor/github.com/operator-framework/operator-lifecycle-manager/pkg/controller/registry/resolver/step_resolver.go @@ -9,8 +9,6 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/operator-lifecycle-manager/pkg/api/client/clientset/versioned" @@ -29,7 +27,7 @@ const ( var initHooks []stepResolverInitHook type StepResolver interface { - ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) + ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) } type OperatorStepResolver struct { @@ -63,6 +61,7 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio &csvSourceProvider{ csvLister: lister.OperatorsV1alpha1().ClusterServiceVersionLister(), subLister: lister.OperatorsV1alpha1().SubscriptionLister(), + ogLister: lister.OperatorsV1().OperatorGroupLister(), logger: log, }, }, @@ -86,125 +85,14 @@ func NewOperatorStepResolver(lister operatorlister.OperatorLister, client versio return stepResolver } -type walkOption func(csv *v1alpha1.ClusterServiceVersion) error - -func WithCSVPhase(phase v1alpha1.ClusterServiceVersionPhase) walkOption { - return func(csv *v1alpha1.ClusterServiceVersion) error { - if csv == nil || csv.Status.Phase != phase { - return fmt.Errorf("csv %s/%s in phase %s instead of %s", csv.GetNamespace(), csv.GetName(), csv.Status.Phase, phase) - } - return nil - } -} - -func WithUniqueCSVs() walkOption { - visited := map[string]struct{}{} - return func(csv *v1alpha1.ClusterServiceVersion) error { - // Check if we have visited the CSV before - if _, ok := visited[csv.GetName()]; ok { - return fmt.Errorf("infinite replacement chain detected") - } - - visited[csv.GetName()] = struct{}{} - return nil - } -} - -// walkReplacementChain walks along the chain of clusterServiceVersions being replaced and returns -// the last clusterServiceVersions in the replacement chain. An error is returned if any of the -// clusterServiceVersions before the last is not in the replaces phase or if an infinite replacement -// chain is detected. -func WalkReplacementChain(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion, options ...walkOption) (*v1alpha1.ClusterServiceVersion, error) { - if csv == nil { - return nil, fmt.Errorf("csv cannot be nil") - } - - for { - // Check if there is a CSV that replaces this CSVs - next, ok := csvToReplacement[csv.GetName()] - if !ok { - break - } - - // Check walk options - for _, o := range options { - if err := o(csv); err != nil { - return nil, err - } - } - - // Move along replacement chain. - csv = next - } - return csv, nil -} - -// isReplacementChainThatEndsInFailure returns true if the last CSV in the chain is in the failed phase and all other -// CSVs are in the replacing phase. -func isReplacementChainThatEndsInFailure(csv *v1alpha1.ClusterServiceVersion, csvToReplacement map[string]*v1alpha1.ClusterServiceVersion) (bool, error) { - lastCSV, err := WalkReplacementChain(csv, csvToReplacement, WithCSVPhase(v1alpha1.CSVPhaseReplacing), WithUniqueCSVs()) - if err != nil { - return false, err - } - return (lastCSV != nil && lastCSV.Status.Phase == v1alpha1.CSVPhaseFailed), nil -} - -// ReplacementMapping takes a list of CSVs and returns a map that maps a CSV's name to the CSV that replaces it. -func ReplacementMapping(csvs []*v1alpha1.ClusterServiceVersion) map[string]*v1alpha1.ClusterServiceVersion { - replacementMapping := map[string]*v1alpha1.ClusterServiceVersion{} - for _, csv := range csvs { - if csv.Spec.Replaces != "" { - replacementMapping[csv.Spec.Replaces] = csv - } - } - return replacementMapping -} - -func (r *OperatorStepResolver) cachePredicates(namespace string) ([]cache.Predicate, error) { - nonCopiedCSVRequirement, err := labels.NewRequirement(v1alpha1.CopiedLabelKey, selection.DoesNotExist, []string{}) - if err != nil { - return nil, err - } - - csvs, err := r.csvLister.ClusterServiceVersions(namespace).List(labels.NewSelector().Add(*nonCopiedCSVRequirement)) - if err != nil { - return nil, err - } - - predicates := []cache.Predicate{} - for i := range csvs { - replacementChainEndsInFailure, err := isReplacementChainThatEndsInFailure(csvs[i], ReplacementMapping(csvs)) - if err != nil { - return nil, err - } - if csvs[i].Status.Phase == v1alpha1.CSVPhaseReplacing && replacementChainEndsInFailure { - predicates = append(predicates, cache.Not(cache.CSVNamePredicate(csvs[i].GetName()))) - } - } - - return predicates, nil -} - -func (r *OperatorStepResolver) ResolveSteps(namespace string, failForwardEnabled bool) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { +func (r *OperatorStepResolver) ResolveSteps(namespace string) ([]*v1alpha1.Step, []v1alpha1.BundleLookup, []*v1alpha1.Subscription, error) { subs, err := r.listSubscriptions(namespace) if err != nil { return nil, nil, nil, err } - // The resolver considers the initial set of CSVs in the namespace by their appearance - // in the catalog cache. In order to support "fail forward" upgrades, we need to omit - // CSVs that are actively being replaced from this initial set of operators. The - // predicates defined here will omit these replacing CSVs from the set. - cachePredicates := []cache.Predicate{} - if failForwardEnabled { - cachePredicates, err = r.cachePredicates(namespace) - if err != nil { - r.log.Debugf("Unable to determine CSVs to exclude: %v", err) - } - } - namespaces := []string{namespace, r.globalCatalogNamespace} - operators, err := r.resolver.Resolve(namespaces, subs, cachePredicates...) + operators, err := r.resolver.Resolve(namespaces, subs) if err != nil { return nil, nil, nil, err }