From 789ed6f9e6a4b03dd5e6544e394e9f5e0c32fb9e Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Fri, 22 Jun 2018 04:36:06 +0000 Subject: [PATCH] Make Deployment reconciliation level-based. This starts to push the current `switch` over `ServingState` in the main `Reconcile` method into the methods that reconcile individual resources. As `reconcileFoo` handles the complete reconciliation of the `Revision` `Foo` resource, we will hoist it above the switch until the `{create,delete}K8sResources` methods are empty, at which point we will delete the switch. --- pkg/controller/configuration/configuration.go | 12 +- pkg/controller/revision/revision.go | 302 ++++++++++-------- pkg/controller/revision/revision_test.go | 102 +++++- pkg/controller/route/route.go | 12 +- pkg/controller/service/service.go | 11 +- 5 files changed, 279 insertions(+), 160 deletions(-) diff --git a/pkg/controller/configuration/configuration.go b/pkg/controller/configuration/configuration.go index d9b3edc7b31b..35a4e515bda4 100644 --- a/pkg/controller/configuration/configuration.go +++ b/pkg/controller/configuration/configuration.go @@ -31,6 +31,7 @@ import ( "github.com/knative/serving/pkg/logging/logkey" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -107,7 +108,7 @@ func (c *Controller) Reconcile(key string) error { ctx := logging.WithLogger(context.TODO(), logger) // Get the Configuration resource with this namespace/name - config, err := c.configurationLister.Configurations(namespace).Get(name) + original, err := c.configurationLister.Configurations(namespace).Get(name) if errors.IsNotFound(err) { // The resource no longer exists, in which case we stop processing. runtime.HandleError(fmt.Errorf("configuration %q in work queue no longer exists", key)) @@ -117,12 +118,17 @@ func (c *Controller) Reconcile(key string) error { } // Don't modify the informer's copy. - config = config.DeepCopy() + config := original.DeepCopy() // Reconcile this copy of the configuration and then write back any status // updates regardless of whether the reconciliation errored out. err = c.reconcile(ctx, config) - if _, err := c.updateStatus(config); err != nil { + if equality.Semantic.DeepEqual(original.Status, config.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := c.updateStatus(config); err != nil { logger.Warn("Failed to update configuration status", zap.Error(err)) return err } diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 365c5ce7e692..d01fe02d1fae 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -25,6 +25,11 @@ import ( "sync" "time" + // TODO(mattmoor): Used by the commented checkAndUpdateDeployment logic below. 
+ // "github.com/google/go-cmp/cmp" + // "github.com/google/go-cmp/cmp/cmpopts" + // "k8s.io/apimachinery/pkg/api/resource" + "github.com/knative/serving/pkg" "github.com/josephburnett/k8sflag/pkg/k8sflag" @@ -32,6 +37,7 @@ import ( "github.com/knative/serving/pkg/logging" "github.com/knative/serving/pkg/logging/logkey" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/api/equality" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -48,6 +54,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" + appsv1listers "k8s.io/client-go/listers/apps/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -100,8 +107,9 @@ type Controller struct { vpaClient vpa.Interface // lister indexes properties about Revision - revisionLister listers.RevisionLister - buildLister buildlisters.BuildLister + revisionLister listers.RevisionLister + buildLister buildlisters.BuildLister + deploymentLister appsv1listers.DeploymentLister buildtracker *buildTracker @@ -189,6 +197,7 @@ func NewController( vpaClient: vpaClient, revisionLister: revisionInformer.Lister(), buildLister: buildInformer.Lister(), + deploymentLister: deploymentInformer.Lister(), buildtracker: &buildTracker{builds: map[key]set{}}, resolver: &digestResolver{client: opt.KubeClientSet, transport: http.DefaultTransport}, controllerConfig: controllerConfig, @@ -217,12 +226,11 @@ func NewController( }, }) - deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - c.SyncDeployment(obj.(*appsv1.Deployment)) - }, - UpdateFunc: func(old, new interface{}) { - c.SyncDeployment(new.(*appsv1.Deployment)) + deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: controller.Filter("Revision"), + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: c.EnqueueControllerOf, + UpdateFunc: controller.PassNew(c.EnqueueControllerOf), }, }) @@ -263,7 +271,7 @@ func (c *Controller) Reconcile(key string) error { logger.Info("Running reconcile Revision") // Get the Revision resource with this namespace/name - rev, err := c.revisionLister.Revisions(namespace).Get(name) + original, err := c.revisionLister.Revisions(namespace).Get(name) // The resource may no longer exist, in which case we stop processing. if errors.IsNotFound(err) { runtime.HandleError(fmt.Errorf("revision %q in work queue no longer exists", key)) @@ -272,12 +280,17 @@ func (c *Controller) Reconcile(key string) error { return err } // Don't modify the informer's copy. - rev = rev.DeepCopy() + rev := original.DeepCopy() // Reconcile this copy of the revision and then write back any status // updates regardless of whether the reconciliation errored out. err = c.reconcile(ctx, rev) - if _, err := c.updateStatus(rev); err != nil { + if equality.Semantic.DeepEqual(original.Status, rev.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := c.updateStatus(rev); err != nil { logger.Warn("Failed to update revision status", zap.Error(err)) return err } @@ -322,9 +335,14 @@ func (c *Controller) reconcile(ctx context.Context, rev *v1alpha1.Revision) erro if bc == nil || bc.Status == corev1.ConditionTrue { // There is no build, or the build completed successfully. 
+ // Set up the user resources + if err := c.reconcileDeployment(ctx, rev); err != nil { + logger.Error("Failed to create a deployment", zap.Error(err)) + return err + } + switch rev.Spec.ServingState { case v1alpha1.RevisionServingStateActive: - logger.Info("Creating or reconciling resources for revision") return c.createK8SResources(ctx, rev) case v1alpha1.RevisionServingStateReserve: @@ -338,6 +356,7 @@ func (c *Controller) reconcile(ctx context.Context, rev *v1alpha1.Revision) erro logger.Errorf("Unknown serving state: %v", rev.Spec.ServingState) } } + return nil } @@ -361,40 +380,6 @@ func (c *Controller) EnqueueBuildTrackers(obj interface{}) { } } -func (c *Controller) SyncDeployment(deployment *appsv1.Deployment) { - cond := getDeploymentProgressCondition(deployment) - if cond == nil { - return - } - - or := metav1.GetControllerOf(deployment) - if or == nil || or.Kind != "Revision" { - return - } - - // Get the handle of Revision in context - revName := or.Name - namespace := deployment.Namespace - logger := loggerWithRevisionInfo(c.Logger, namespace, revName) - - rev, err := c.revisionLister.Revisions(namespace).Get(revName) - if err != nil { - logger.Error("Error fetching revision", zap.Error(err)) - return - } - //Set the revision condition reason to ProgressDeadlineExceeded - rev.Status.MarkProgressDeadlineExceeded( - fmt.Sprintf("Unable to create pods for more than %d seconds.", progressDeadlineSeconds)) - - logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions) - if _, err := c.updateStatus(rev); err != nil { - logger.Error("Error recording revision completion", zap.Error(err)) - return - } - c.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", "Revision %s not ready due to Deployment timeout", revName) - return -} - func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) { eName := endpoint.Name namespace := endpoint.Namespace @@ -454,39 +439,26 @@ func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) { func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) - logger.Info("Deleting the resources for revision") - if err := c.deleteDeployment(ctx, rev); err != nil { - logger.Error("Failed to delete a deployment", zap.Error(err)) + // Delete the user resources + if err := c.deleteService(ctx, rev); err != nil { + logger.Error("Failed to delete k8s service", zap.Error(err)) return err } - logger.Info("Deleted deployment") + // Delete the resources we set up to autoscale the user resources. 
if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil { logger.Error("Failed to delete autoscaler Deployment", zap.Error(err)) return err } - logger.Info("Deleted autoscaler Deployment") - if err := c.deleteAutoscalerService(ctx, rev); err != nil { logger.Error("Failed to delete autoscaler Service", zap.Error(err)) return err } - logger.Info("Deleted autoscaler Service") - - if c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() { - if err := c.deleteVpa(ctx, rev); err != nil { - logger.Error("Failed to delete VPA", zap.Error(err)) - return err - } - logger.Info("Deleted VPA") - } - - if err := c.deleteService(ctx, rev); err != nil { - logger.Error("Failed to delete k8s service", zap.Error(err)) + if err := c.deleteVpa(ctx, rev); err != nil { + logger.Error("Failed to delete VPA", zap.Error(err)) return err } - logger.Info("Deleted service") // And the deployment is no longer ready, so update that rev.Status.MarkInactive() @@ -496,13 +468,14 @@ func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revis func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) - // Fire off a Deployment.. - if err := c.reconcileDeployment(ctx, rev); err != nil { - logger.Error("Failed to create a deployment", zap.Error(err)) + + // Set up the user resources + if err := c.reconcileService(ctx, rev); err != nil { + logger.Error("Failed to create k8s service", zap.Error(err)) return err } - // Autoscale the service + // Set up resources to autoscale the user resources. if err := c.reconcileAutoscalerDeployment(ctx, rev); err != nil { logger.Error("Failed to create autoscaler Deployment", zap.Error(err)) return err @@ -511,86 +484,93 @@ func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revis logger.Error("Failed to create autoscaler Service", zap.Error(err)) return err } - if c.controllerConfig.EnableVarLogCollection { - if err := c.reconcileFluentdConfigMap(ctx, rev); err != nil { - logger.Error("Failed to create fluent config map", zap.Error(err)) - return err - } - } - - // Vertically autoscale the revision pods - if c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() { - if err := c.reconcileVpa(ctx, rev); err != nil { - logger.Error("Failed to create the vertical pod autoscaler for Deployment", zap.Error(err)) - return err - } - } - - // Create k8s service - if err := c.reconcileService(ctx, rev); err != nil { - logger.Error("Failed to create k8s service", zap.Error(err)) + if err := c.reconcileVpa(ctx, rev); err != nil { + logger.Error("Failed to create the vertical pod autoscaler for Deployment", zap.Error(err)) return err } - // Check to see if the revision has already been marked as ready and - // don't mark it if it's already ready. - // TODO: could always fetch the endpoint again and double-check it is still - // ready. - if rev.Status.IsReady() { - return nil - } - - // Checking existing revision condition to see if it is the initial deployment or - // during the reactivating process. If a revision is in condition "Inactive" or "Activating", - // we need to route traffic to the activator; if a revision is in condition "Deploying", - // we need to route traffic to the revision directly. 
- reason := "Deploying" - if cond := rev.Status.GetCondition(v1alpha1.RevisionConditionReady); cond != nil { - if (cond.Reason == "Inactive" && cond.Status == corev1.ConditionFalse) || - (cond.Reason == "Activating" && cond.Status == corev1.ConditionUnknown) { - reason = "Activating" - } - } - rev.Status.MarkDeploying(reason) - - return nil -} - -func (c *Controller) deleteDeployment(ctx context.Context, rev *v1alpha1.Revision) error { - logger := logging.FromContext(ctx) - deploymentName := controller.GetRevisionDeploymentName(rev) - ns := controller.GetServingNamespaceName(rev.Namespace) - - err := c.KubeClientSet.AppsV1().Deployments(ns).Delete(deploymentName, fgDeleteOptions) - if apierrs.IsNotFound(err) { - return nil - } else if err != nil { - logger.Errorf("deployments.Delete for %q failed: %s", deploymentName, err) + // Ensure our namespace has the configuration for the fluentd sidecar. + if err := c.reconcileFluentdConfigMap(ctx, rev); err != nil { + logger.Error("Failed to create fluent config map", zap.Error(err)) return err } - logger.Infof("Deleted Deployment %q", deploymentName) + return nil } func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) ns := controller.GetServingNamespaceName(rev.Namespace) - dc := c.KubeClientSet.AppsV1().Deployments(ns) - // First, check if deployment exists already. deploymentName := controller.GetRevisionDeploymentName(rev) - if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil { - if !apierrs.IsNotFound(err) { - logger.Errorf("deployments.Get for %q failed: %s", deploymentName, err) + deployment, err := c.deploymentLister.Deployments(ns).Get(deploymentName) + switch rev.Spec.ServingState { + case v1alpha1.RevisionServingStateActive: + // When Active, the Deployment should exist and have a particular specification. + if apierrs.IsNotFound(err) { + // If it does not exist, then create it. + rev.Status.MarkDeploying("Deploying") + deployment, err = c.createDeployment(ctx, rev) + if err != nil { + logger.Errorf("Error creating Deployment %q: %v", deploymentName, err) + return err + } + logger.Infof("Created Deployment %q", deploymentName) + } else if err != nil { + logger.Errorf("Error reconciling Active Deployment %q: %v", deploymentName, err) return err + } else { + // TODO(mattmoor): Don't reconcile Deployments until we can avoid fighting with + // its defaulter. + // // If it exists, then make sure if looks as we expect. + // // It may change if a user edits things around our controller, which we + // // should not allow, or if our expectations of how the deployment should look + // // changes (e.g. we update our controller with new sidecars). + // var updated bool + // deployment, updated, err = c.checkAndUpdateDeployment(ctx, rev, deployment) + // if err != nil { + // logger.Errorf("Error updating Deployment %q: %v", deploymentName, err) + // return err + // } + // if updated { + // logger.Infof("Updated Deployment %q", deploymentName) + // rev.Status.MarkDeploying("Updating") + // } } - logger.Infof("Deployment %q doesn't exist, creating", deploymentName) - } else { - // TODO(mattmoor): Compare the deployments and update if it has changed - // out from under us. - logger.Infof("Found existing deployment %q", deploymentName) + + // Now that we have a Deployment, determine whether there is any relevant + // status to surface in the Revision. 
+ if cond := getDeploymentProgressCondition(deployment); cond != nil { + rev.Status.MarkProgressDeadlineExceeded(fmt.Sprintf( + "Unable to create pods for more than %d seconds.", progressDeadlineSeconds)) + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "ProgressDeadlineExceeded", + "Revision %s not ready due to Deployment timeout", rev.Name) + } + return nil + + case v1alpha1.RevisionServingStateReserve, v1alpha1.RevisionServingStateRetired: + // When Reserve or Retired, we remove the underlying Deployment. + if apierrs.IsNotFound(err) { + // If it does not exist, then we have nothing to do. + return nil + } + if err := c.deleteDeployment(ctx, deployment); err != nil { + logger.Errorf("Error deleting Deployment %q: %v", deploymentName, err) + return err + } + logger.Infof("Deleted Deployment %q", deploymentName) + rev.Status.MarkInactive() + return nil + + default: + logger.Errorf("Unknown serving state: %v", rev.Spec.ServingState) return nil } +} + +func (c *Controller) createDeployment(ctx context.Context, rev *v1alpha1.Revision) (*appsv1.Deployment, error) { + logger := logging.FromContext(ctx) + ns := controller.GetServingNamespaceName(rev.Namespace) // Create the deployment. deployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig) @@ -599,13 +579,51 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revi if err := c.resolver.Resolve(deployment); err != nil { logger.Error("Error resolving deployment", zap.Error(err)) rev.Status.MarkContainerMissing(err.Error()) - return err + return nil, fmt.Errorf("Error resolving container to digest: %v", err) } - logger.Infof("Creating Deployment: %q", deployment.Name) - _, createErr := dc.Create(deployment) + return c.KubeClientSet.AppsV1().Deployments(ns).Create(deployment) +} + +// TODO(mattmoor): See the comment at the commented call site above. +// func (c *Controller) checkAndUpdateDeployment(ctx context.Context, rev *v1alpha1.Revision, deployment *appsv1.Deployment) (*appsv1.Deployment, bool, error) { +// logger := logging.FromContext(ctx) + +// desiredDeployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig) - return createErr +// // Copy the userContainerImage digest and the replica count. +// // We don't want autoscaling differences or user updates to the image tag +// // to trigger redeployments. 
+// desiredDeployment.Spec.Replicas = deployment.Spec.Replicas +// pod := desiredDeployment.Spec.Template.Spec +// for i := range pod.Containers { +// if pod.Containers[i].Name == userContainerName { +// pod.Containers[i].Image = desiredDeployment.Spec.Template.Spec.Containers[i].Image +// } +// } + +// if equality.Semantic.DeepEqual(desiredDeployment.Spec, deployment.Spec) { +// return deployment, false, nil +// } +// logger.Infof("Reconciling deployment diff (-desired, +observed): %v", +// cmp.Diff(desiredDeployment.Spec, deployment.Spec, cmpopts.IgnoreUnexported(resource.Quantity{}))) +// deployment.Spec = desiredDeployment.Spec + +// d, err := c.KubeClientSet.AppsV1().Deployments(deployment.Namespace).Update(deployment) +// return d, true, err +// } + +func (c *Controller) deleteDeployment(ctx context.Context, deployment *appsv1.Deployment) error { + logger := logging.FromContext(ctx) + + err := c.KubeClientSet.AppsV1().Deployments(deployment.Namespace).Delete(deployment.Name, fgDeleteOptions) + if apierrs.IsNotFound(err) { + return nil + } else if err != nil { + logger.Errorf("deployments.Delete for %q failed: %s", deployment.Name, err) + return err + } + return nil } func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error { @@ -629,6 +647,7 @@ func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revisio ns := controller.GetServingNamespaceName(rev.Namespace) sc := c.KubeClientSet.CoreV1().Services(ns) serviceName := controller.GetServingK8SServiceNameForRevision(rev) + rev.Status.ServiceName = serviceName if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil { @@ -652,6 +671,9 @@ func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revisio func (c *Controller) reconcileFluentdConfigMap(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) + if !c.controllerConfig.EnableVarLogCollection { + return nil + } ns := rev.Namespace // One ConfigMap for Fluentd sidecar per namespace. 
It has multiple owner @@ -789,6 +811,10 @@ func (c *Controller) reconcileAutoscalerDeployment(ctx context.Context, rev *v1a func (c *Controller) deleteVpa(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) + if !c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() { + return nil + } + vpaName := controller.GetRevisionVpaName(rev) ns := rev.Namespace @@ -805,6 +831,10 @@ func (c *Controller) deleteVpa(ctx context.Context, rev *v1alpha1.Revision) erro func (c *Controller) reconcileVpa(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) + if !c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() { + return nil + } + vpaName := controller.GetRevisionVpaName(rev) vs := c.vpaClient.PocV1alpha1().VerticalPodAutoscalers(rev.Namespace) _, err := vs.Get(vpaName, metav1.GetOptions{}) diff --git a/pkg/controller/revision/revision_test.go b/pkg/controller/revision/revision_test.go index 92308cfa8b3f..a09dcb00f9b5 100644 --- a/pkg/controller/revision/revision_test.go +++ b/pkg/controller/revision/revision_test.go @@ -1230,11 +1230,9 @@ func TestCreateRevWithInvalidBuildNameFails(t *testing.T) { } func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) - var testProgressDeadlineSeconds int32 = 10 - rev := getTestRevision() revClient.Create(rev) @@ -1243,6 +1241,11 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) controller.Reconcile(KeyOrDie(rev)) + rev, err := revClient.Get(rev.Name, metav1.GetOptions{}) + if err != nil { + t.Fatalf("Couldn't get revision: %v", err) + } + // Look for revision's deployment. deploymentNameToLook := ctrl.GetRevisionDeploymentName(rev) @@ -1250,14 +1253,13 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { if err != nil { t.Fatalf("Couldn't get ela deployment: %v", err) } + elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) + kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) if len(deployment.OwnerReferences) != 1 && rev.Name != deployment.OwnerReferences[0].Name { t.Errorf("expected owner references to have 1 ref with name %s", rev.Name) } - - //set ProgressDeadlineSeconds on Dep spec - deployment.Spec.ProgressDeadlineSeconds = &testProgressDeadlineSeconds - controller.SyncDeployment(deployment) + controller.Reconcile(KeyOrDie(rev)) rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -1278,8 +1280,75 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { } } +// TODO(mattmoor): This is meant to test the checkAndUpdateDeployment logic in the, +// Revision controller. However, this logic is commented out because in practice it +// fights with the defaulting logic for a Deployment. 
+// func TestDeploymentCorrection(t *testing.T) { +// kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) +// revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) + +// rev := getTestRevision() + +// revClient.Create(rev) + +// // Since Reconcile looks in the lister, we need to add it to the informer +// elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) +// controller.Reconcile(KeyOrDie(rev)) + +// rev, err := revClient.Get(rev.Name, metav1.GetOptions{}) +// if err != nil { +// t.Fatalf("Couldn't get revision: %v", err) +// } + +// // Look for revision's deployment. +// deploymentNameToLook := ctrl.GetRevisionDeploymentName(rev) + +// deployment, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) +// if err != nil { +// t.Fatalf("Couldn't get ela deployment: %v", err) +// } + +// // First make a change that we don't expect the Revision controller to reconcile. +// var tmp int32 = 37 +// deployment.Spec.Replicas = &tmp +// // Then create a copy to compare against. +// want := deployment.DeepCopy() +// // Lastly, make an edit we expect the controller to revert. +// deployment.Spec.Template.Spec.Containers[0].Image = "busybox" + +// elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) +// kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + +// controller.Reconcile(KeyOrDie(rev)) + +// got, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) +// if err != nil { +// t.Fatalf("Couldn't get ela deployment: %v", err) +// } +// if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { +// t.Errorf("Unexpected revision conditions diff (-want +got): %v", diff) +// } + +// rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) +// if err != nil { +// t.Fatalf("Couldn't get revision: %v", err) +// } +// for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { +// got := rev2Inspect.Status.GetCondition(ct) +// want := &v1alpha1.RevisionCondition{ +// Type: ct, +// Status: corev1.ConditionUnknown, +// Reason: "Updating", +// LastTransitionTime: got.LastTransitionTime, +// } +// if diff := cmp.Diff(want, got); diff != "" { +// t.Errorf("Unexpected revision conditions diff (-want +got): %v", diff) +// } +// } +// } + func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) rev := getTestRevision() @@ -1308,7 +1377,8 @@ func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { Status: corev1.ConditionFalse, Reason: "ProgressDeadlineExceeded", }} - controller.SyncDeployment(deployment) + kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + controller.Reconcile(KeyOrDie(rev)) rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -1506,7 +1576,7 @@ func TestAuxiliaryEndpointDoesNotUpdateRev(t *testing.T) { } func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() // Create revision and 
verify that the k8s resources are created as @@ -1514,10 +1584,11 @@ func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { createRevision(elaClient, elaInformer, controller, rev) expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - _, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err != nil { t.Fatalf("Couldn't get ela deployment: %v", err) } + kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) // Now, update the revision serving state to Retired, and force another // run of the controller. @@ -1525,7 +1596,7 @@ func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { updateRevision(elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. - deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + deployment, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err == nil { t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment) @@ -1533,7 +1604,7 @@ func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { } func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() // Create revision and verify that the k8s resources are created as @@ -1541,10 +1612,11 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { createRevision(elaClient, elaInformer, controller, rev) expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - _, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err != nil { t.Fatalf("Couldn't get ela deployment: %v", err) } + kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) // Now, update the revision serving state to Reserve, and force another // run of the controller. @@ -1552,7 +1624,7 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { updateRevision(elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. 
- deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + deployment, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err == nil { t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment) } diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index aef32bd4619c..ce3a1b3f985c 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -28,6 +28,7 @@ import ( "github.com/josephburnett/k8sflag/pkg/k8sflag" corev1 "k8s.io/api/core/v1" v1beta1 "k8s.io/api/extensions/v1beta1" + "k8s.io/apimachinery/pkg/api/equality" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/runtime" @@ -177,7 +178,7 @@ func (c *Controller) updateRouteEvent(key string) error { ctx := logging.WithLogger(context.TODO(), logger) // Get the Route resource with this namespace/name - route, err := c.routeLister.Routes(namespace).Get(name) + original, err := c.routeLister.Routes(namespace).Get(name) if err != nil { // The resource may no longer exist, in which case we stop // processing. @@ -188,12 +189,17 @@ func (c *Controller) updateRouteEvent(key string) error { return err } // Don't modify the informers copy - route = route.DeepCopy() + route := original.DeepCopy() // Reconcile this copy of the route and then write back any status // updates regardless of whether the reconciliation errored out. err = c.reconcile(ctx, route) - if _, err := c.updateStatus(ctx, route); err != nil { + if equality.Semantic.DeepEqual(original.Status, route.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := c.updateStatus(ctx, route); err != nil { return err } return err diff --git a/pkg/controller/service/service.go b/pkg/controller/service/service.go index 4ea046361599..38990816666c 100644 --- a/pkg/controller/service/service.go +++ b/pkg/controller/service/service.go @@ -122,7 +122,7 @@ func (c *Controller) Reconcile(key string) error { ctx := logging.WithLogger(context.TODO(), logger) // Get the Service resource with this namespace/name - service, err := c.serviceLister.Services(namespace).Get(name) + original, err := c.serviceLister.Services(namespace).Get(name) if apierrs.IsNotFound(err) { // The resource may no longer exist, in which case we stop processing. runtime.HandleError(fmt.Errorf("service %q in work queue no longer exists", key)) @@ -132,12 +132,17 @@ func (c *Controller) Reconcile(key string) error { } // Don't modify the informers copy - service = service.DeepCopy() + service := original.DeepCopy() // Reconcile this copy of the service and then write back any status // updates regardless of whether the reconciliation errored out. err = c.reconcile(ctx, service) - if _, err := c.updateStatus(service); err != nil { + if equality.Semantic.DeepEqual(original.Status, service.Status) { + // If we didn't change anything then don't call updateStatus. + // This is important because the copy we loaded from the informer's + // cache may be stale and we don't want to overwrite a prior update + // to status with this stale state. + } else if _, err := c.updateStatus(service); err != nil { logger.Warn("Failed to update service status", zap.Error(err)) return err }
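
For readers skimming the diff, the one pattern this change applies uniformly across the Configuration, Revision, Route, and Service controllers is the guarded status write: reconcile a deep copy of the informer's object, then call updateStatus only if reconciliation actually changed the status. Below is a minimal, self-contained sketch of that pattern under simplified assumptions; fakeResource, fakeStatus, and reconcileAndMaybeUpdate are illustrative stand-ins and not part of the Knative codebase, though equality.Semantic.DeepEqual is the same k8s.io/apimachinery helper the patch uses.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/equality"
)

// fakeStatus is an illustrative stand-in for a resource's Status struct
// (e.g. RevisionStatus); it is not a real Knative type.
type fakeStatus struct {
	Conditions []string
}

// fakeResource is an illustrative stand-in for a lister-cached resource.
type fakeResource struct {
	Status fakeStatus
}

// DeepCopy mimics the generated DeepCopy used on informer copies.
func (r *fakeResource) DeepCopy() *fakeResource {
	out := *r
	out.Status.Conditions = append([]string(nil), r.Status.Conditions...)
	return &out
}

// reconcileAndMaybeUpdate shows the level-based shape of Reconcile: mutate only
// a deep copy, then write status back only when it actually changed, so a stale
// copy from the informer's cache never clobbers a newer status update.
func reconcileAndMaybeUpdate(
	original *fakeResource,
	reconcile func(*fakeResource) error,
	updateStatus func(*fakeResource) error,
) error {
	desired := original.DeepCopy()
	reconcileErr := reconcile(desired)
	if !equality.Semantic.DeepEqual(original.Status, desired.Status) {
		// Only hit the API server when the status actually changed.
		if err := updateStatus(desired); err != nil {
			return err
		}
	}
	return reconcileErr
}

func main() {
	orig := &fakeResource{Status: fakeStatus{Conditions: []string{"Ready=Unknown"}}}
	err := reconcileAndMaybeUpdate(orig,
		func(r *fakeResource) error { r.Status.Conditions = []string{"Ready=True"}; return nil },
		func(r *fakeResource) error { fmt.Println("updateStatus:", r.Status.Conditions); return nil },
	)
	fmt.Println("reconcile error:", err)
}

The same guard is why per-resource methods like reconcileDeployment can freely mark conditions (MarkDeploying, MarkProgressDeadlineExceeded, MarkInactive) on every pass: redundant passes converge to the same status and produce no extra API writes.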