diff --git a/pkg/apis/flows/v1/parallel_types.go b/pkg/apis/flows/v1/parallel_types.go index 2dc32963616..986da0ff381 100644 --- a/pkg/apis/flows/v1/parallel_types.go +++ b/pkg/apis/flows/v1/parallel_types.go @@ -29,6 +29,7 @@ import ( ) // +genclient +// +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // Parallel defines conditional branches that will be wired in // series through Channels and Subscriptions. diff --git a/pkg/apis/flows/v1/sequence_types.go b/pkg/apis/flows/v1/sequence_types.go index df7b71c9e8f..b0ac4448ad0 100644 --- a/pkg/apis/flows/v1/sequence_types.go +++ b/pkg/apis/flows/v1/sequence_types.go @@ -30,6 +30,7 @@ import ( ) // +genclient +// +genreconciler // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object // Sequence defines a sequence of Subscribers that will be wired in // series through Channels and Subscriptions. diff --git a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go index 20b5f3aab72..f533f092749 100644 --- a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go @@ -102,6 +102,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.AgentName != "" { agentName = opts.AgentName } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/parallel/reconciler.go b/pkg/client/injection/reconciler/flows/v1/parallel/reconciler.go index 4a60a9e5178..0db4840eae8 100644 --- a/pkg/client/injection/reconciler/flows/v1/parallel/reconciler.go +++ b/pkg/client/injection/reconciler/flows/v1/parallel/reconciler.go @@ -108,6 +108,10 @@ type reconcilerImpl struct { // finalizerName is the name of the finalizer to reconcile. finalizerName string + + // skipStatusUpdates configures whether or not this reconciler automatically updates + // the status of the reconciled resource. + skipStatusUpdates bool } // Check that our Reconciler implements controller.Reconciler @@ -160,6 +164,9 @@ func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versio if opts.FinalizerName != "" { rec.finalizerName = opts.FinalizerName } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } } return rec @@ -259,19 +266,26 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { } // Synchronize the status. - if equality.Semantic.DeepEqual(original.Status, resource.Status) { + switch { + case r.skipStatusUpdates: + // This reconciler implementation is configured to skip resource updates. + // This may mean this reconciler does not observe spec, but reconciles external changes. + case equality.Semantic.DeepEqual(original.Status, resource.Status): // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the injectionInformer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. - } else if !isLeader { + case !isLeader: + // High-availability reconcilers may have many replicas watching the resource, but only + // the elected leader is expected to write modifications. logger.Warn("Saw status changes when we aren't the leader!") - // TODO: Consider logging the diff at Debug? - } else if err = r.updateStatus(original, resource); err != nil { - logger.Warnw("Failed to update resource status", zap.Error(err)) - r.Recorder.Eventf(resource, corev1.EventTypeWarning, "UpdateFailed", - "Failed to update status for %q: %v", resource.Name, err) - return err + default: + if err = r.updateStatus(original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, corev1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } } // Report the reconciler event, if any. diff --git a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go index 252e9d964b1..f64d0221c6c 100644 --- a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go @@ -102,6 +102,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.AgentName != "" { agentName = opts.AgentName } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/sequence/reconciler.go b/pkg/client/injection/reconciler/flows/v1/sequence/reconciler.go index cb9c07453a1..0284c85a0a6 100644 --- a/pkg/client/injection/reconciler/flows/v1/sequence/reconciler.go +++ b/pkg/client/injection/reconciler/flows/v1/sequence/reconciler.go @@ -108,6 +108,10 @@ type reconcilerImpl struct { // finalizerName is the name of the finalizer to reconcile. finalizerName string + + // skipStatusUpdates configures whether or not this reconciler automatically updates + // the status of the reconciled resource. + skipStatusUpdates bool } // Check that our Reconciler implements controller.Reconciler @@ -160,6 +164,9 @@ func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versio if opts.FinalizerName != "" { rec.finalizerName = opts.FinalizerName } + if opts.SkipStatusUpdates { + rec.skipStatusUpdates = true + } } return rec @@ -259,19 +266,26 @@ func (r *reconcilerImpl) Reconcile(ctx context.Context, key string) error { } // Synchronize the status. - if equality.Semantic.DeepEqual(original.Status, resource.Status) { + switch { + case r.skipStatusUpdates: + // This reconciler implementation is configured to skip resource updates. + // This may mean this reconciler does not observe spec, but reconciles external changes. + case equality.Semantic.DeepEqual(original.Status, resource.Status): // If we didn't change anything then don't call updateStatus. // This is important because the copy we loaded from the injectionInformer's // cache may be stale and we don't want to overwrite a prior update // to status with this stale state. - } else if !isLeader { + case !isLeader: + // High-availability reconcilers may have many replicas watching the resource, but only + // the elected leader is expected to write modifications. logger.Warn("Saw status changes when we aren't the leader!") - // TODO: Consider logging the diff at Debug? - } else if err = r.updateStatus(original, resource); err != nil { - logger.Warnw("Failed to update resource status", zap.Error(err)) - r.Recorder.Eventf(resource, corev1.EventTypeWarning, "UpdateFailed", - "Failed to update status for %q: %v", resource.Name, err) - return err + default: + if err = r.updateStatus(original, resource); err != nil { + logger.Warnw("Failed to update resource status", zap.Error(err)) + r.Recorder.Eventf(resource, corev1.EventTypeWarning, "UpdateFailed", + "Failed to update status for %q: %v", resource.Name, err) + return err + } } // Report the reconciler event, if any.