Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/reconciler/autoscaling/hpa/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err = c.UpdateStatus(pa); err != nil {
} else if err = c.UpdateStatus(original, pa); err != nil {
logger.Warnw("Failed to update pa status", zap.Error(err))
c.Recorder.Eventf(pa, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update status for PA %q: %v", pa.Name, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/autoscaling/kpa/kpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err = c.UpdateStatus(pa); err != nil {
} else if err = c.UpdateStatus(original, pa); err != nil {
logger.Warnw("Failed to update pa status", zap.Error(err))
c.Recorder.Eventf(pa, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update status for PA %q: %v", pa.Name, err)
Expand Down
32 changes: 19 additions & 13 deletions pkg/reconciler/autoscaling/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,18 +138,24 @@ func (c *Base) ReconcileMetric(ctx context.Context, pa *pav1alpha1.PodAutoscaler
}

// UpdateStatus updates the status of the given PodAutoscaler.
func (c *Base) UpdateStatus(desired *pav1alpha1.PodAutoscaler) (*pav1alpha1.PodAutoscaler, error) {
pa, err := c.PALister.PodAutoscalers(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}
// If there's nothing to update, just return.
if reflect.DeepEqual(pa.Status, desired.Status) {
return pa, nil
}
// Don't modify the informers copy
existing := pa.DeepCopy()
existing.Status = desired.Status
func (c *Base) UpdateStatus(existing *pav1alpha1.PodAutoscaler, desired *pav1alpha1.PodAutoscaler) error {
existing = existing.DeepCopy()
return reconciler.RetryUpdateConflicts(func(attempts int) (err error) {
// The first iteration tries to use the informer's state, subsequent attempts fetch the latest state via API.
if attempts > 0 {
existing, err = c.ServingClientSet.AutoscalingV1alpha1().PodAutoscalers(desired.Namespace).Get(desired.Name, metav1.GetOptions{})
if err != nil {
return err
}
}

// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, desired.Status) {
Copy link
Copy Markdown
Member

@dprotaso dprotaso Dec 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

equality.Semantic.DeepEqual

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this wasn't in the prior code - feel free to leave this as a separate PR or I can do it

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, let's leave it separate. Was wondering the same while doing it though.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

kmp.Diff ? :)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned, won't fix in this PR. I don't want to inadvertedly and subtly break things in this PR which is already kinda invasive. Let's make that a separate change please.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to doing separately.

return nil
}

return c.ServingClientSet.AutoscalingV1alpha1().PodAutoscalers(pa.Namespace).UpdateStatus(existing)
existing.Status = desired.Status
_, err = c.ServingClientSet.AutoscalingV1alpha1().PodAutoscalers(existing.Namespace).UpdateStatus(existing)
return err
})
}
34 changes: 20 additions & 14 deletions pkg/reconciler/certificate/certificate.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err := c.updateStatus(knCert); err != nil {
} else if err := c.updateStatus(original, knCert); err != nil {
logger.Warnw("Failed to update certificate status", zap.Error(err))
c.Recorder.Eventf(knCert, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update status for Certificate %s: %v", key, err)
Expand Down Expand Up @@ -173,18 +173,24 @@ func (c *Reconciler) reconcileCMCertificate(ctx context.Context, knCert *v1alpha
return cmCert, nil
}

func (c *Reconciler) updateStatus(desired *v1alpha1.Certificate) (*v1alpha1.Certificate, error) {
cert, err := c.knCertificateLister.Certificates(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}
// If there's nothing to update, just return.
if reflect.DeepEqual(cert.Status, desired.Status) {
return cert, nil
}
// Don't modify the informers copy
existing := cert.DeepCopy()
existing.Status = desired.Status
func (c *Reconciler) updateStatus(existing *v1alpha1.Certificate, desired *v1alpha1.Certificate) error {
existing = existing.DeepCopy()
return reconciler.RetryUpdateConflicts(func(attempts int) (err error) {
// The first iteration tries to use the informer's state, subsequent attempts fetch the latest state via API.
if attempts > 0 {
existing, err = c.ServingClientSet.NetworkingV1alpha1().Certificates(desired.Namespace).Get(desired.Name, metav1.GetOptions{})
if err != nil {
return err
}
}

// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, desired.Status) {
return nil
}

return c.ServingClientSet.NetworkingV1alpha1().Certificates(existing.Namespace).UpdateStatus(existing)
existing.Status = desired.Status
_, err = c.ServingClientSet.NetworkingV1alpha1().Certificates(existing.Namespace).UpdateStatus(existing)
return err
})
}
35 changes: 21 additions & 14 deletions pkg/reconciler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (c *Reconciler) Reconcile(ctx context.Context, key string) error {
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if _, err = c.updateStatus(config); err != nil {
} else if err = c.updateStatus(original, config); err != nil {
logger.Warnw("Failed to update configuration status", zap.Error(err))
c.Recorder.Eventf(config, corev1.EventTypeWarning, "UpdateFailed", "Failed to update status: %v", err)
return err
Expand Down Expand Up @@ -341,17 +341,24 @@ func (c *Reconciler) createRevision(ctx context.Context, config *v1alpha1.Config
return created, nil
}

func (c *Reconciler) updateStatus(desired *v1alpha1.Configuration) (*v1alpha1.Configuration, error) {
config, err := c.configurationLister.Configurations(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}
// If there's nothing to update, just return.
if reflect.DeepEqual(config.Status, desired.Status) {
return config, nil
}
// Don't modify the informers copy
existing := config.DeepCopy()
existing.Status = desired.Status
return c.ServingClientSet.ServingV1alpha1().Configurations(desired.Namespace).UpdateStatus(existing)
func (c *Reconciler) updateStatus(existing *v1alpha1.Configuration, desired *v1alpha1.Configuration) error {
existing = existing.DeepCopy()
return reconciler.RetryUpdateConflicts(func(attempts int) (err error) {
// The first iteration tries to use the informer's state, subsequent attempts fetch the latest state via API.
if attempts > 0 {
existing, err = c.ServingClientSet.ServingV1alpha1().Configurations(desired.Namespace).Get(desired.Name, metav1.GetOptions{})
if err != nil {
return err
}
}

// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, desired.Status) {
return nil
}

existing.Status = desired.Status
_, err = c.ServingClientSet.ServingV1alpha1().Configurations(desired.Namespace).UpdateStatus(existing)
return err
})
}
34 changes: 20 additions & 14 deletions pkg/reconciler/ingress/ingress.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else {
if _, err = r.updateStatus(ingress); err != nil {
if err = r.updateStatus(original, ingress); err != nil {
logger.Warnw("Failed to update Ingress status", zap.Error(err))
r.Recorder.Eventf(ingress, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update status for Ingress %q: %v", ingress.GetName(), err)
Expand Down Expand Up @@ -306,20 +306,26 @@ func (r *Reconciler) reconcileDeletion(ctx context.Context, ia *v1alpha1.Ingress

// Update the Status of the Ingress. Caller is responsible for checking
// for semantic differences before calling.
func (r *Reconciler) updateStatus(desired *v1alpha1.Ingress) (*v1alpha1.Ingress, error) {
ingress, err := r.ingressLister.Ingresses(desired.GetNamespace()).Get(desired.GetName())
if err != nil {
return nil, err
}
func (r *Reconciler) updateStatus(existing *v1alpha1.Ingress, desired *v1alpha1.Ingress) error {
existing = existing.DeepCopy()
return reconciler.RetryUpdateConflicts(func(attempts int) (err error) {
// The first iteration tries to use the informer's state, subsequent attempts fetch the latest state via API.
if attempts > 0 {
existing, err = r.ServingClientSet.NetworkingV1alpha1().Ingresses(desired.GetNamespace()).Get(desired.GetName(), metav1.GetOptions{})
if err != nil {
return err
}
}

// If there's nothing to update, just return.
if reflect.DeepEqual(ingress.Status, desired.Status) {
return ingress, nil
}
// Don't modify the informers copy
existing := ingress.DeepCopy()
existing.Status = desired.Status
return r.ServingClientSet.NetworkingV1alpha1().Ingresses(existing.GetNamespace()).UpdateStatus(existing)
// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, desired.Status) {
return nil
}

existing.Status = desired.Status
_, err = r.ServingClientSet.NetworkingV1alpha1().Ingresses(existing.GetNamespace()).UpdateStatus(existing)
return err
})
}

func (r *Reconciler) ensureFinalizer(ia *v1alpha1.Ingress) error {
Expand Down
36 changes: 22 additions & 14 deletions pkg/reconciler/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package metric
import (
"context"
"fmt"
"reflect"

"go.uber.org/zap"
"knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
Expand All @@ -32,6 +33,7 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -81,7 +83,7 @@ func (r *reconciler) Reconcile(ctx context.Context, key string) error {

if !equality.Semantic.DeepEqual(original.Status, metric.Status) {
// Change of status, need to update the object.
if uErr := r.updateStatus(metric); uErr != nil {
if uErr := r.updateStatus(original, metric); uErr != nil {
logger.Warnw("Failed to update metric status", zap.Error(uErr))
r.Recorder.Eventf(metric, corev1.EventTypeWarning, "UpdateFailed",
"Failed to update metric status: %v", uErr)
Expand All @@ -102,18 +104,24 @@ func (r *reconciler) reconcileCollection(ctx context.Context, metric *v1alpha1.M
return nil
}

func (r *reconciler) updateStatus(m *v1alpha1.Metric) error {
ex, err := r.metricLister.Metrics(m.Namespace).Get(m.Name)
if err != nil {
// If something deleted metric while we were reconciling ¯\(°_o)/¯.
func (r *reconciler) updateStatus(existing *v1alpha1.Metric, desired *v1alpha1.Metric) error {
existing = existing.DeepCopy()
return rbase.RetryUpdateConflicts(func(attempts int) (err error) {
// The first iteration tries to use the informer's state, subsequent attempts fetch the latest state via API.
if attempts > 0 {
existing, err = r.ServingClientSet.AutoscalingV1alpha1().Metrics(desired.Namespace).Get(desired.Name, metav1.GetOptions{})
if err != nil {
return err
}
}

// If there's nothing to update, just return.
if reflect.DeepEqual(existing.Status, desired.Status) {
return nil
}

existing.Status = desired.Status
_, err = r.ServingClientSet.AutoscalingV1alpha1().Metrics(existing.Namespace).UpdateStatus(existing)
return err
}
if equality.Semantic.DeepEqual(ex.Status, m.Status) {
// no-op
return nil
}
ex = ex.DeepCopy()
ex.Status = m.Status
_, err = r.ServingClientSet.AutoscalingV1alpha1().Metrics(ex.Namespace).UpdateStatus(ex)
return err
})
}
32 changes: 32 additions & 0 deletions pkg/reconciler/retry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"k8s.io/client-go/util/retry"
)

// RetryUpdateConflicts retries the inner function if it returns conflict errors.
// This can be used to retry status updates without constantly reenqueuing keys.
func RetryUpdateConflicts(updater func(int) error) error {
attempts := 0
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
Comment thread
vagababov marked this conversation as resolved.
err := updater(attempts)
attempts++
return err
})
}
80 changes: 80 additions & 0 deletions pkg/reconciler/retry_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
Copyright 2019 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package reconciler

import (
"errors"
"testing"

apierrs "k8s.io/apimachinery/pkg/api/errors"

v1alpha1 "knative.dev/serving/pkg/apis/autoscaling/v1alpha1"
)

func TestRetryUpdateConflicts(t *testing.T) {
errAny := errors.New("foo")
errConflict := apierrs.NewConflict(v1alpha1.Resource("foo"), "bar", errAny)

tests := []struct {
name string
returns []error
want error
wantAttempts int
}{{
name: "all good",
returns: []error{nil},
want: nil,
wantAttempts: 1,
}, {
name: "not retry on non-conflict error",
returns: []error{errAny},
want: errAny,
wantAttempts: 1,
}, {
name: "retry up to 5 times on conflicts",
returns: []error{errConflict, errConflict, errConflict, errConflict, errConflict, errConflict},
want: errConflict,
wantAttempts: 5,
}, {
name: "eventually succeed",
returns: []error{errConflict, errConflict, nil},
want: nil,
wantAttempts: 3,
}, {
name: "eventually fail",
returns: []error{errConflict, errConflict, errAny},
want: errAny,
wantAttempts: 3,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
attempts := 0
got := RetryUpdateConflicts(func(i int) error {
attempts++
return test.returns[i]
})

if got != test.want {
t.Errorf("RetryUpdateConflicts() = %v, want %v", got, test.want)
}
if attempts != test.wantAttempts {
t.Errorf("attempts = %d, want %d", attempts, test.wantAttempts)
}
})
}
}
Loading