From 17825bfef66ec6d37ab494ec4d70f794b31ddb25 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Fri, 22 Jun 2018 21:07:05 +0000 Subject: [PATCH] This hoists the K8s Service reconciliation and makes it level-based. This is the next phase of our level-based migration of the Revision controller. It includes the K8s Service for the user Deployment. This is patterned in much the same way as the Deployment refactor in #1322. The most notable difference in this change is that we establish readiness from the Endpoints resource instead of the Service resource directly. In addition, I spent a bit of time cleaning up some of the `revision_test.go` code to more automatically update the informers as resources are reconciled. In particular, after `controller.Reconcile` we need to make sure all of the updates to the underlying objects are reflected in our informers, or a subsequent reconcile may act on stale state. To facilitate this, I added a helper that updates the informers with the Revision/Deployment/Service to be used after a `controller.Reconcile`. 
--- cmd/controller/main.go | 5 +- pkg/controller/revision/revision.go | 250 ++++++++------ pkg/controller/revision/revision_test.go | 421 +++++++++++------------ 3 files changed, 352 insertions(+), 324 deletions(-) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index cb7cb6cddf2e..c7165f928342 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -167,6 +167,7 @@ func main() { configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps() deploymentInformer := kubeInformerFactory.Apps().V1().Deployments() endpointsInformer := kubeInformerFactory.Core().V1().Endpoints() + coreServiceInformer := kubeInformerFactory.Core().V1().Services() ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses() vpaInformer := vpaInformerFactory.Poc().V1alpha1().VerticalPodAutoscalers() @@ -175,7 +176,8 @@ func main() { controllers := []controller.Interface{ configuration.NewController(opt, configurationInformer, revisionInformer, cfg), revision.NewController(opt, vpaClient, revisionInformer, buildInformer, configMapInformer, - deploymentInformer, endpointsInformer, vpaInformer, cfg, &revControllerConfig), + deploymentInformer, coreServiceInformer, endpointsInformer, vpaInformer, + cfg, &revControllerConfig), route.NewController(opt, routeInformer, configurationInformer, ingressInformer, configMapInformer, cfg, autoscaleEnableScaleToZero), service.NewController(opt, serviceInformer, configurationInformer, routeInformer, cfg), @@ -197,6 +199,7 @@ func main() { buildInformer.Informer().HasSynced, configMapInformer.Informer().HasSynced, deploymentInformer.Informer().HasSynced, + coreServiceInformer.Informer().HasSynced, endpointsInformer.Informer().HasSynced, ingressInformer.Informer().HasSynced, } { diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 6687e6e3d3b5..ddee7ba0ec0a 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -26,12 +26,12 @@ 
import ( "time" // TODO(mattmoor): Used by the commented checkAndUpdateDeployment logic below. - // "github.com/google/go-cmp/cmp" // "github.com/google/go-cmp/cmp/cmpopts" // "k8s.io/apimachinery/pkg/api/resource" "github.com/knative/serving/pkg" + "github.com/google/go-cmp/cmp" "github.com/josephburnett/k8sflag/pkg/k8sflag" "github.com/knative/serving/pkg/apis/serving" "github.com/knative/serving/pkg/logging" @@ -55,6 +55,7 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/runtime" appsv1listers "k8s.io/client-go/listers/apps/v1" + corev1listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/cache" @@ -110,6 +111,8 @@ type Controller struct { revisionLister listers.RevisionLister buildLister buildlisters.BuildLister deploymentLister appsv1listers.DeploymentLister + serviceLister corev1listers.ServiceLister + endpointsLister corev1listers.EndpointsLister buildtracker *buildTracker @@ -182,6 +185,7 @@ func NewController( buildInformer buildinformers.BuildInformer, configMapInformer corev1informers.ConfigMapInformer, deploymentInformer appsv1informers.DeploymentInformer, + serviceInformer corev1informers.ServiceInformer, endpointsInformer corev1informers.EndpointsInformer, vpaInformer vpav1alpha1informers.VerticalPodAutoscalerInformer, config *rest.Config, @@ -198,6 +202,8 @@ func NewController( revisionLister: revisionInformer.Lister(), buildLister: buildInformer.Lister(), deploymentLister: deploymentInformer.Lister(), + serviceLister: serviceInformer.Lister(), + endpointsLister: endpointsInformer.Lister(), buildtracker: &buildTracker{builds: map[key]set{}}, resolver: &digestResolver{client: opt.KubeClientSet, transport: http.DefaultTransport}, controllerConfig: controllerConfig, @@ -218,12 +224,8 @@ func NewController( }) endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - c.SyncEndpoints(obj.(*corev1.Endpoints)) - }, - UpdateFunc: 
func(old, new interface{}) { - c.SyncEndpoints(new.(*corev1.Endpoints)) - }, + AddFunc: c.EnqueueEndpointsRevision, + UpdateFunc: controller.PassNew(c.EnqueueEndpointsRevision), }) deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ @@ -340,6 +342,10 @@ func (c *Controller) reconcile(ctx context.Context, rev *v1alpha1.Revision) erro logger.Error("Failed to create a deployment", zap.Error(err)) return err } + if err := c.reconcileService(ctx, rev); err != nil { + logger.Error("Failed to create k8s service", zap.Error(err)) + return err + } switch rev.Spec.ServingState { case v1alpha1.RevisionServingStateActive: @@ -380,72 +386,19 @@ func (c *Controller) EnqueueBuildTrackers(obj interface{}) { } } -func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) { - eName := endpoint.Name - namespace := endpoint.Namespace - // Lookup and see if this endpoints corresponds to a service that - // we own and hence the Revision that created this service. - revName := lookupServiceOwner(endpoint) - if revName == "" { - return - } - logger := loggerWithRevisionInfo(c.Logger, namespace, revName) - - rev, err := c.revisionLister.Revisions(namespace).Get(revName) - if err != nil { - logger.Error("Error fetching revision", zap.Error(err)) - return - } - - // Check to see if endpoint is the service endpoint - if eName != controller.GetServingK8SServiceNameForRevision(rev) { - return - } - - // Check to see if the revision has already been marked as ready or failed - // and if it is, then there's no need to do anything to it. - if c := rev.Status.GetCondition(v1alpha1.RevisionConditionReady); c != nil && c.Status != corev1.ConditionUnknown { - return - } - - // Don't modify the informer's copy. 
- rev = rev.DeepCopy() - - if getIsServiceReady(endpoint) { - logger.Infof("Endpoint %q is ready", eName) - rev.Status.MarkResourcesAvailable() - rev.Status.MarkContainerHealthy() - if _, err := c.updateStatus(rev); err != nil { - logger.Error("Error marking revision ready", zap.Error(err)) - return - } - c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name) - return - } - - revisionAge := time.Now().Sub(getRevisionLastTransitionTime(rev)) - if revisionAge < serviceTimeoutDuration { - return +func (c *Controller) EnqueueEndpointsRevision(obj interface{}) { + endpoints := obj.(*corev1.Endpoints) + // Use the label on the Endpoints (from Service) to determine whether it is + // owned by a Revision, and if so queue that Revision. + if revisionName, ok := endpoints.Labels[serving.RevisionLabelKey]; ok { + c.EnqueueKey(endpoints.Namespace + "/" + revisionName) } - rev.Status.MarkServiceTimeout() - if _, err := c.updateStatus(rev); err != nil { - logger.Error("Error marking revision failed", zap.Error(err)) - return - } - c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name) - return } func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) - // Delete the user resources - if err := c.deleteService(ctx, rev); err != nil { - logger.Error("Failed to delete k8s service", zap.Error(err)) - return err - } - // Delete the resources we set up to autoscale the user resources. 
if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil { logger.Error("Failed to delete autoscaler Deployment", zap.Error(err)) @@ -469,12 +422,6 @@ func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revis func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) - // Set up the user resources - if err := c.reconcileService(ctx, rev); err != nil { - logger.Error("Failed to create k8s service", zap.Error(err)) - return err - } - // Set up resources to autoscale the user resources. if err := c.reconcileAutoscalerDeployment(ctx, rev); err != nil { logger.Error("Failed to create autoscaler Deployment", zap.Error(err)) @@ -626,47 +573,138 @@ func (c *Controller) deleteDeployment(ctx context.Context, deployment *appsv1.De return nil } -func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error { - logger := logging.FromContext(ctx) - serviceName := controller.GetServingK8SServiceNameForRevision(rev) - ns := controller.GetServingNamespaceName(rev.Namespace) - - err := c.KubeClientSet.CoreV1().Services(ns).Delete(serviceName, fgDeleteOptions) - if apierrs.IsNotFound(err) { - return nil - } else if err != nil { - logger.Errorf("service.Delete for %q failed: %s", serviceName, err) - return err - } - logger.Infof("Deleted service %q", serviceName) - return nil -} - func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) ns := controller.GetServingNamespaceName(rev.Namespace) - sc := c.KubeClientSet.CoreV1().Services(ns) serviceName := controller.GetServingK8SServiceNameForRevision(rev) rev.Status.ServiceName = serviceName - if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil { - if !apierrs.IsNotFound(err) { - logger.Errorf("services.Get for %q failed: %s", serviceName, err) + service, err := c.serviceLister.Services(ns).Get(serviceName) + switch rev.Spec.ServingState 
{ + case v1alpha1.RevisionServingStateActive: + // When Active, the Service should exist and have a particular specification. + if apierrs.IsNotFound(err) { + // If it does not exist, then create it. + rev.Status.MarkDeploying("Deploying") + service, err = c.createService(ctx, rev) + if err != nil { + logger.Errorf("Error creating Service %q: %v", serviceName, err) + return err + } + logger.Infof("Created Service %q", serviceName) + } else if err != nil { + logger.Errorf("Error reconciling Active Service %q: %v", serviceName, err) return err + } else { + // If it exists, then make sure it looks as we expect. + // It may change if a user edits things around our controller, which we + // should not allow, or if our expectations of how the service should look + // changes (e.g. we update our controller with new sidecars). + var updated bool + service, updated, err = c.checkAndUpdateService(ctx, rev, service) + if err != nil { + logger.Errorf("Error updating Service %q: %v", serviceName, err) + return err + } + if updated { + logger.Infof("Updated Service %q", serviceName) + rev.Status.MarkDeploying("Updating") + } } - logger.Infof("serviceName %q doesn't exist, creating", serviceName) - } else { - // TODO(vaikas): Check that the service is legit and matches what we expect - // to have there. - logger.Infof("Found existing service %q", serviceName) + + // We cannot determine readiness from the Service directly. Instead, we look up + // the backing Endpoints resource and check it for healthy pods. The name of the + // Endpoints resource matches the Service it backs. + endpoints, err := c.endpointsLister.Endpoints(ns).Get(serviceName) + if apierrs.IsNotFound(err) { + // If it isn't found, then we need to wait for the Service controller to + // create it. 
+ rev.Status.MarkDeploying("Deploying") + return nil + } else if err != nil { + logger.Errorf("Error checking Active Endpoints %q: %v", serviceName, err) + return err + } + // If the endpoints resource indicates that the Service it sits in front of is ready, + // then surface this in our Revision status as resources available (pods were scheduled) + // and container healthy (endpoints should be gated by any provided readiness checks). + if getIsServiceReady(endpoints) { + rev.Status.MarkResourcesAvailable() + rev.Status.MarkContainerHealthy() + // TODO(mattmoor): How to ensure this only fires once? + c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", + "Revision becomes ready upon endpoint %q becoming ready", serviceName) + } else { + // If the endpoints is NOT ready, then check whether it is taking unreasonably + // long to become ready and if so mark our revision as having timed out waiting + // for the Service to become ready. + revisionAge := time.Now().Sub(getRevisionLastTransitionTime(rev)) + if revisionAge >= serviceTimeoutDuration { + rev.Status.MarkServiceTimeout() + // TODO(mattmoor): How to ensure this only fires once? + c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", + "Revision did not become ready due to endpoint %q", serviceName) + } + } + return nil + + case v1alpha1.RevisionServingStateReserve, v1alpha1.RevisionServingStateRetired: + // When Reserve or Retired, we remove the underlying Service. + if apierrs.IsNotFound(err) { + // If it does not exist, then we have nothing to do. 
+ return nil + } + if err := c.deleteService(ctx, service); err != nil { + logger.Errorf("Error deleting Service %q: %v", serviceName, err) + return err + } + logger.Infof("Deleted Service %q", serviceName) + rev.Status.MarkInactive() + return nil + + default: + logger.Errorf("Unknown serving state: %v", rev.Spec.ServingState) return nil } +} +func (c *Controller) createService(ctx context.Context, rev *v1alpha1.Revision) (*corev1.Service, error) { + ns := controller.GetServingNamespaceName(rev.Namespace) + + // Create the service. service := MakeRevisionK8sService(rev) - logger.Infof("Creating service: %q", service.Name) - _, err := sc.Create(service) - return err + + return c.KubeClientSet.CoreV1().Services(ns).Create(service) +} + +func (c *Controller) checkAndUpdateService(ctx context.Context, rev *v1alpha1.Revision, service *corev1.Service) (*corev1.Service, bool, error) { + logger := logging.FromContext(ctx) + + desiredService := MakeRevisionK8sService(rev) + + if equality.Semantic.DeepEqual(desiredService.Spec, service.Spec) { + return service, false, nil + } + logger.Infof("Reconciling service diff (-desired, +observed): %v", + cmp.Diff(desiredService.Spec, service.Spec)) + service.Spec = desiredService.Spec + + d, err := c.KubeClientSet.CoreV1().Services(service.Namespace).Update(service) + return d, true, err +} + +func (c *Controller) deleteService(ctx context.Context, svc *corev1.Service) error { + logger := logging.FromContext(ctx) + + err := c.KubeClientSet.CoreV1().Services(svc.Namespace).Delete(svc.Name, fgDeleteOptions) + if apierrs.IsNotFound(err) { + return nil + } else if err != nil { + logger.Errorf("service.Delete for %q failed: %s", svc.Name, err) + return err + } + return nil } func (c *Controller) reconcileFluentdConfigMap(ctx context.Context, rev *v1alpha1.Revision) error { @@ -874,18 +912,6 @@ func (c *Controller) updateStatus(rev *v1alpha1.Revision) (*v1alpha1.Revision, e return rev, nil } -// Given an endpoint see if it's managed by us 
and return the -// revision that created it. -// TODO: Consider using OwnerReferences. -// https://github.com/kubernetes/sample-controller/blob/master/controller.go#L373-L384 -func lookupServiceOwner(endpoint *corev1.Endpoints) string { - // see if there's a label on this object marking it as ours. - if revisionName, ok := endpoint.Labels[serving.RevisionLabelKey]; ok { - return revisionName - } - return "" -} - func (c *Controller) addConfigMapEvent(obj interface{}) { configMap := obj.(*corev1.ConfigMap) if configMap.Namespace != pkg.GetServingSystemNamespace() || configMap.Name != controller.GetNetworkConfigMapName() { diff --git a/pkg/controller/revision/revision_test.go b/pkg/controller/revision/revision_test.go index b1e64f00bbdc..63c4d20dd0a1 100644 --- a/pkg/controller/revision/revision_test.go +++ b/pkg/controller/revision/revision_test.go @@ -258,6 +258,7 @@ func newTestControllerWithConfig(t *testing.T, controllerConfig *ControllerConfi buildInformer.Build().V1alpha1().Builds(), servingSystemInformer.Core().V1().ConfigMaps(), kubeInformer.Apps().V1().Deployments(), + kubeInformer.Core().V1().Services(), kubeInformer.Core().V1().Endpoints(), vpaInformer.Poc().V1alpha1().VerticalPodAutoscalers(), &rest.Config{}, @@ -284,18 +285,82 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) ( return newTestControllerWithConfig(t, &testControllerConfig, elaObjects...) 
} -func createRevision(elaClient *fakeclientset.Clientset, elaInformer informers.SharedInformerFactory, controller *Controller, rev *v1alpha1.Revision) { +func createRevision(t *testing.T, + kubeClient *fakekubeclientset.Clientset, kubeInformer kubeinformers.SharedInformerFactory, + elaClient *fakeclientset.Clientset, elaInformer informers.SharedInformerFactory, + controller *Controller, rev *v1alpha1.Revision) *v1alpha1.Revision { + t.Helper() elaClient.ServingV1alpha1().Revisions(rev.Namespace).Create(rev) // Since Reconcile looks in the lister, we need to add it to the informer elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - controller.Reconcile(KeyOrDie(rev)) + + if err := controller.Reconcile(KeyOrDie(rev)); err == nil { + rev, _, _ = addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + } + return rev } -func updateRevision(elaClient *fakeclientset.Clientset, elaInformer informers.SharedInformerFactory, controller *Controller, rev *v1alpha1.Revision) { +func updateRevision(t *testing.T, + kubeClient *fakekubeclientset.Clientset, kubeInformer kubeinformers.SharedInformerFactory, + elaClient *fakeclientset.Clientset, elaInformer informers.SharedInformerFactory, + controller *Controller, rev *v1alpha1.Revision) { + t.Helper() elaClient.ServingV1alpha1().Revisions(rev.Namespace).Update(rev) elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Update(rev) - controller.Reconcile(KeyOrDie(rev)) + if err := controller.Reconcile(KeyOrDie(rev)); err == nil { + addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + } +} + +func makeBackingEndpoints(t *testing.T, kubeClient *fakekubeclientset.Clientset, + kubeInformer kubeinformers.SharedInformerFactory, service *corev1.Service) *corev1.Endpoints { + endpoints := &corev1.Endpoints{ + ObjectMeta: service.ObjectMeta, + } + kubeClient.CoreV1().Endpoints(service.Namespace).Create(endpoints) + 
kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(endpoints) + return endpoints +} + +func addResourcesToInformers(t *testing.T, + kubeClient *fakekubeclientset.Clientset, kubeInformer kubeinformers.SharedInformerFactory, + elaClient *fakeclientset.Clientset, elaInformer informers.SharedInformerFactory, + rev *v1alpha1.Revision) (*v1alpha1.Revision, *appsv1.Deployment, *corev1.Service) { + t.Helper() + + rev, err := elaClient.ServingV1alpha1().Revisions(rev.Namespace).Get(rev.Name, metav1.GetOptions{}) + if err != nil { + t.Errorf("Revisions.Get(%v) = %v", rev.Name, err) + } + elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) + + haveBuild := (rev.Spec.BuildName != "") + inActive := (rev.Spec.ServingState != "Active") + + ns := ctrl.GetServingNamespaceName(rev.Namespace) + + deploymentName := ctrl.GetRevisionDeploymentName(rev) + deployment, err := kubeClient.AppsV1().Deployments(ns).Get(deploymentName, metav1.GetOptions{}) + if apierrs.IsNotFound(err) && (haveBuild || inActive) { + // If we're doing a Build this won't exist yet. + } else if err != nil { + t.Errorf("Deployments.Get(%v) = %v", deploymentName, err) + } else { + kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + } + + serviceName := ctrl.GetServingK8SServiceNameForRevision(rev) + service, err := kubeClient.CoreV1().Services(ns).Get(serviceName, metav1.GetOptions{}) + if apierrs.IsNotFound(err) && (haveBuild || inActive) { + // If we're doing a Build this won't exist yet. 
+ } else if err != nil { + t.Errorf("Services.Get(%v) = %v", serviceName, err) + } else { + kubeInformer.Core().V1().Services().Informer().GetIndexer().Add(service) + } + + return rev, deployment, service } type fixedResolver struct { @@ -312,7 +377,7 @@ func (r *fixedResolver) Resolve(deploy *appsv1.Deployment) error { func TestCreateRevCreatesStuff(t *testing.T) { controllerConfig := getTestControllerConfig() - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) // Resolve image references to this "digest" digest := "foo@sha256:deadbeef" @@ -325,7 +390,7 @@ func TestCreateRevCreatesStuff(t *testing.T) { *ctrl.NewConfigurationControllerRef(config), ) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // This function is used to verify pass through of container environment // variables. @@ -624,7 +689,7 @@ func (r *errorResolver) Resolve(deploy *appsv1.Deployment) error { } func TestResolutionFailed(t *testing.T) { - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) // Unconditionally return this error during resolution. errorMessage := "I am the expected error message, hear me ROAR!" 
@@ -637,7 +702,7 @@ func TestResolutionFailed(t *testing.T) { *ctrl.NewConfigurationControllerRef(config), ) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) rev, err := elaClient.ServingV1alpha1().Revisions(testNamespace).Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -663,7 +728,7 @@ func TestResolutionFailed(t *testing.T) { func TestCreateRevDoesNotSetUpFluentdSidecarIfVarLogCollectionDisabled(t *testing.T) { controllerConfig := getTestControllerConfig() controllerConfig.EnableVarLogCollection = false - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) rev := getTestRevision() config := getTestConfiguration() rev.OwnerReferences = append( @@ -671,7 +736,7 @@ func TestCreateRevDoesNotSetUpFluentdSidecarIfVarLogCollectionDisabled(t *testin *ctrl.NewConfigurationControllerRef(config), ) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Look for the revision deployment. 
expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) @@ -704,7 +769,7 @@ func TestCreateRevDoesNotSetUpFluentdSidecarIfVarLogCollectionDisabled(t *testin } func TestCreateRevUpdateConfigMap_NewData(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() fluentdConfigSource := makeFullFluentdConfig(testFluentdSidecarOutputConfig) @@ -719,7 +784,7 @@ func TestCreateRevUpdateConfigMap_NewData(t *testing.T) { } kubeClient.CoreV1().ConfigMaps(testNamespace).Create(existingConfigMap) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Look for the config map. configMap, err := kubeClient.CoreV1().ConfigMaps(testNamespace).Get(fluentdConfigMapName, metav1.GetOptions{}) @@ -733,7 +798,7 @@ func TestCreateRevUpdateConfigMap_NewData(t *testing.T) { } func TestCreateRevUpdateConfigMap_NewRevOwnerReference(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() revRef := *newRevisionNonControllerRef(rev) oldRev := getTestRevision() @@ -753,7 +818,7 @@ func TestCreateRevUpdateConfigMap_NewRevOwnerReference(t *testing.T) { } kubeClient.CoreV1().ConfigMaps(testNamespace).Create(existingConfigMap) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Look for the config map. 
configMap, err := kubeClient.CoreV1().ConfigMaps(testNamespace).Get(fluentdConfigMapName, metav1.GetOptions{}) @@ -769,11 +834,11 @@ func TestCreateRevUpdateConfigMap_NewRevOwnerReference(t *testing.T) { func TestCreateRevWithWithLoggingURL(t *testing.T) { controllerConfig := getTestControllerConfig() controllerConfig.LoggingURLTemplate = "http://logging.test.com?filter=${REVISION_UID}" - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) rev := getTestRevision() - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) createdRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -789,11 +854,11 @@ func TestCreateRevWithWithLoggingURL(t *testing.T) { func TestCreateRevWithVPA(t *testing.T) { controllerConfig := getTestControllerConfig() controllerConfig.AutoscaleEnableVerticalPodAutoscaling = k8sflag.Bool("", true) - _, _, elaClient, vpaClient, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, vpaClient, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) rev := getTestRevision() - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) createdVpa, err := vpaClient.PocV1alpha1().VerticalPodAutoscalers(testNamespace).Get(ctrl.GetRevisionVpaName(rev), metav1.GetOptions{}) if err != nil { @@ -813,15 +878,15 @@ func TestCreateRevWithVPA(t *testing.T) { func TestUpdateRevWithWithUpdatedLoggingURL(t *testing.T) { controllerConfig := 
getTestControllerConfig() controllerConfig.LoggingURLTemplate = "http://old-logging.test.com?filter=${REVISION_UID}" - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) rev := getTestRevision() - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Update controllers logging URL controllerConfig.LoggingURLTemplate = "http://new-logging.test.com?filter=${REVISION_UID}" - updateRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) updatedRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -835,7 +900,7 @@ func TestUpdateRevWithWithUpdatedLoggingURL(t *testing.T) { } func TestCreateRevPreservesAppLabel(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() rev.Labels[appLabelKey] = "app-label-that-should-stay-unchanged" elaClient.ServingV1alpha1().Revisions(testNamespace).Create(rev) @@ -844,12 +909,9 @@ func TestCreateRevPreservesAppLabel(t *testing.T) { controller.Reconcile(KeyOrDie(rev)) - // Look for the revision deployment. - expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get ela deployment: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ rev, deployment, service := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + expectedLabels := sumMaps( rev.Labels, map[string]string{ @@ -865,12 +927,6 @@ func TestCreateRevPreservesAppLabel(t *testing.T) { t.Errorf("Label not set correctly in pod template: expected %v got %v.", expectedLabels, labels) } - // Look for the revision service. - expectedServiceName := fmt.Sprintf("%s-service", rev.Name) - service, err := kubeClient.CoreV1().Services(testNamespace).Get(expectedServiceName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision service: %v", err) - } if labels := service.ObjectMeta.Labels; !reflect.DeepEqual(labels, expectedLabels) { t.Errorf("Label not set correctly for revision service: expected %v got %v.", expectedLabels, labels) @@ -901,7 +957,7 @@ func TestCreateRevPreservesAppLabel(t *testing.T) { } func TestCreateRevWithBuildNameWaits(t *testing.T) { - _, buildClient, elaClient, _, controller, _, buildInformer, elaInformer, _, _ := newTestController(t) + kubeClient, buildClient, elaClient, _, controller, kubeInformer, buildInformer, elaInformer, _, _ := newTestController(t) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) bld := &buildv1alpha1.Build{ @@ -932,7 +988,7 @@ func TestCreateRevWithBuildNameWaits(t *testing.T) { // Direct the Revision to wait for this build to complete. 
rev.Spec.BuildName = bld.Name - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) waitRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { @@ -955,7 +1011,7 @@ func TestCreateRevWithBuildNameWaits(t *testing.T) { } func TestCreateRevWithFailedBuildNameFails(t *testing.T) { - kubeClient, buildClient, elaClient, _, controller, _, buildInformer, elaInformer, _, _ := newTestController(t) + kubeClient, buildClient, elaClient, _, controller, kubeInformer, buildInformer, elaInformer, _, _ := newTestController(t) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) reason := "Foo" @@ -987,11 +1043,7 @@ func TestCreateRevWithFailedBuildNameFails(t *testing.T) { rev := getTestRevision() // Direct the Revision to wait for this build to complete. rev.Spec.BuildName = bld.Name - revClient.Create(rev) - // Since Reconcile looks in the lister, we need to add it to the informer - elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - - controller.Reconcile(KeyOrDie(rev)) + rev = createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // After the initial update to the revision, we should be // watching for this build to complete, so make it complete, but @@ -1010,6 +1062,9 @@ func TestCreateRevWithFailedBuildNameFails(t *testing.T) { controller.EnqueueBuildTrackers(bld) controller.Reconcile(KeyOrDie(rev)) + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ failedRev, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + failedRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("Couldn't get revision: %v", err) @@ -1039,8 +1094,7 @@ func TestCreateRevWithFailedBuildNameFails(t *testing.T) { } func TestCreateRevWithCompletedBuildNameCompletes(t *testing.T) { - kubeClient, buildClient, elaClient, _, controller, _, buildInformer, elaInformer, _, _ := newTestController(t) - revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) + kubeClient, buildClient, elaClient, _, controller, kubeInformer, buildInformer, elaInformer, _, _ := newTestController(t) h := NewHooks() // Look for the build complete event. Events are delivered asynchronously so @@ -1079,11 +1133,8 @@ func TestCreateRevWithCompletedBuildNameCompletes(t *testing.T) { rev := getTestRevision() // Direct the Revision to wait for this build to complete. rev.Spec.BuildName = bld.Name - revClient.Create(rev) - // Since Reconcile looks in the lister, we need to add it to the informer - elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - controller.Reconcile(KeyOrDie(rev)) + rev = createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // After the initial update to the revision, we should be // watching for this build to complete, so make it complete @@ -1101,10 +1152,8 @@ func TestCreateRevWithCompletedBuildNameCompletes(t *testing.T) { controller.EnqueueBuildTrackers(bld) controller.Reconcile(KeyOrDie(rev)) - completedRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. + completedRev, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) // The next update we receive should tell us that the build completed. 
for _, ct := range []v1alpha1.RevisionConditionType{"BuildSucceeded"} { @@ -1127,8 +1176,7 @@ func TestCreateRevWithCompletedBuildNameCompletes(t *testing.T) { } func TestCreateRevWithInvalidBuildNameFails(t *testing.T) { - _, buildClient, elaClient, _, controller, _, buildInformer, elaInformer, _, _ := newTestController(t) - revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) + kubeClient, buildClient, elaClient, _, controller, kubeInformer, buildInformer, elaInformer, _, _ := newTestController(t) reason := "Foo" errMessage := "a long human-readable error message." @@ -1155,10 +1203,8 @@ func TestCreateRevWithInvalidBuildNameFails(t *testing.T) { rev := getTestRevision() // Direct the Revision to wait for this build to complete. rev.Spec.BuildName = bld.Name - revClient.Create(rev) - // Since Reconcile looks in the lister, we need to add it to the informer - elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - controller.Reconcile(KeyOrDie(rev)) + + rev = createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // After the initial update to the revision, we should be // watching for this build to complete, so make it complete, but @@ -1177,10 +1223,8 @@ func TestCreateRevWithInvalidBuildNameFails(t *testing.T) { controller.EnqueueBuildTrackers(bld) controller.Reconcile(KeyOrDie(rev)) - failedRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ failedRev, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) for _, ct := range []v1alpha1.RevisionConditionType{"BuildSucceeded", "Ready"} { got := failedRev.Status.GetCondition(ct) @@ -1209,30 +1253,16 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) controller.Reconcile(KeyOrDie(rev)) - rev, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } - - // Look for revision's deployment. - deploymentNameToLook := ctrl.GetRevisionDeploymentName(rev) - - deployment, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get ela deployment: %v", err) - } - elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + // Make sure that the changes from the Reconcile are reflected in our Informers. + rev, deployment, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) if len(deployment.OwnerReferences) != 1 && rev.Name != deployment.OwnerReferences[0].Name { t.Errorf("expected owner references to have 1 ref with name %s", rev.Name) } controller.Reconcile(KeyOrDie(rev)) - rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ rev2Inspect, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { got := rev2Inspect.Status.GetCondition(ct) @@ -1251,7 +1281,7 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { // TODO(mattmoor): This is meant to test the checkAndUpdateDeployment logic in the, // Revision controller. However, this logic is commented out because in practice it // fights with the defaulting logic for a Deployment. -// func TestDeploymentCorrection(t *testing.T) { +// func TestDeploymentReconciliation(t *testing.T) { // kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) // revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) @@ -1263,18 +1293,9 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { // elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) // controller.Reconcile(KeyOrDie(rev)) -// rev, err := revClient.Get(rev.Name, metav1.GetOptions{}) -// if err != nil { -// t.Fatalf("Couldn't get revision: %v", err) -// } - -// // Look for revision's deployment. -// deploymentNameToLook := ctrl.GetRevisionDeploymentName(rev) - -// deployment, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) -// if err != nil { -// t.Fatalf("Couldn't get ela deployment: %v", err) -// } +// // Make sure that the changes from the Reconcile are reflected in our Informers. +// rev, deployment, service := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) +// makeBackingEndpoints(t, kubeClient, kubeInformer, service) // // First make a change that we don't expect the Revision controller to reconcile. // var tmp int32 = 37 @@ -1284,22 +1305,14 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { // // Lastly, make an edit we expect the controller to revert. 
// deployment.Spec.Template.Spec.Containers[0].Image = "busybox" -// elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) // kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) // controller.Reconcile(KeyOrDie(rev)) -// got, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) -// if err != nil { -// t.Fatalf("Couldn't get ela deployment: %v", err) -// } +// // Make sure that the changes from the Reconcile are reflected in our Informers. +// rev2Inspect, got, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) // if diff := cmp.Diff(want, got, cmpopts.IgnoreUnexported(resource.Quantity{})); diff != "" { -// t.Errorf("Unexpected revision conditions diff (-want +got): %v", diff) -// } - -// rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) -// if err != nil { -// t.Fatalf("Couldn't get revision: %v", err) +// t.Errorf("Unexpected deployment diff (-want +got): %v", diff) // } // for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { // got := rev2Inspect.Status.GetCondition(ct) @@ -1315,7 +1328,7 @@ func TestCreateRevWithProgressDeadlineSecondsStuff(t *testing.T) { // } // } -func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { +func TestReconciliation(t *testing.T) { kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) @@ -1327,14 +1340,54 @@ func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) controller.Reconcile(KeyOrDie(rev)) - // Look for revision's deployment. - deploymentNameToLook := ctrl.GetRevisionDeploymentName(rev) + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ rev, _, service := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + makeBackingEndpoints(t, kubeClient, kubeInformer, service) - deployment, err := kubeClient.Apps().Deployments(testNamespace).Get(deploymentNameToLook, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get ela deployment: %v", err) + want := service.DeepCopy() + // Make an edit we expect the controller to revert. + service.Spec.Selector = map[string]string{ + "not-the": "same", } + kubeInformer.Core().V1().Services().Informer().GetIndexer().Add(service) + controller.Reconcile(KeyOrDie(rev)) + + // Make sure that the changes from the Reconcile are reflected in our Informers. + rev2Inspect, _, got := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected service diff (-want +got): %v", diff) + } + for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { + got := rev2Inspect.Status.GetCondition(ct) + want := &v1alpha1.RevisionCondition{ + Type: ct, + Status: corev1.ConditionUnknown, + Reason: "Updating", + LastTransitionTime: got.LastTransitionTime, + } + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("Unexpected revision conditions diff (-want +got): %v", diff) + } + } +} + +func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) + revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) + + rev := getTestRevision() + + revClient.Create(rev) + + // Since Reconcile looks in the lister, we need to add it to the informer + elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) + controller.Reconcile(KeyOrDie(rev)) + + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ rev, deployment, service := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) + makeBackingEndpoints(t, kubeClient, kubeInformer, service) + if len(deployment.OwnerReferences) != 1 && rev.Name != deployment.OwnerReferences[0].Name { t.Errorf("expected owner references to have 1 ref with name %s", rev.Name) } @@ -1348,10 +1401,8 @@ func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) controller.Reconcile(KeyOrDie(rev)) - rev2Inspect, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. + rev2Inspect, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { got := rev2Inspect.Status.GetCondition(ct) @@ -1369,8 +1420,7 @@ func TestCreateRevWithProgressDeadlineExceeded(t *testing.T) { } func TestMarkRevReadyUponEndpointBecomesReady(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) - revClient := elaClient.ServingV1alpha1().Revisions(testNamespace) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() h := NewHooks() @@ -1379,15 +1429,7 @@ func TestMarkRevReadyUponEndpointBecomesReady(t *testing.T) { expectedMessage := "Revision becomes ready upon endpoint \"test-rev-service\" becoming ready" h.OnCreate(&kubeClient.Fake, "events", ExpectNormalEventDelivery(t, expectedMessage)) - revClient.Create(rev) - // Since Reconcile looks in the lister, we need to add it to the informer - elaInformer.Serving().V1alpha1().Revisions().Informer().GetIndexer().Add(rev) - controller.Reconcile(KeyOrDie(rev)) - - deployingRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - 
t.Fatalf("Couldn't get revision: %v", err) - } + deployingRev := createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // The revision is not marked ready until an endpoint is created. for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { @@ -1404,12 +1446,12 @@ func TestMarkRevReadyUponEndpointBecomesReady(t *testing.T) { } endpoints := getTestReadyEndpoints(rev.Name) - controller.SyncEndpoints(endpoints) + kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(endpoints) + controller.EnqueueEndpointsRevision(endpoints) + controller.Reconcile(KeyOrDie(rev)) - readyRev, err := revClient.Get(rev.Name, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get revision: %v", err) - } + // Make sure that the changes from the Reconcile are reflected in our Informers. + readyRev, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) // After reconciling the endpoint, the revision should be ready. for _, ct := range []v1alpha1.RevisionConditionType{"Ready"} { @@ -1431,49 +1473,17 @@ func TestMarkRevReadyUponEndpointBecomesReady(t *testing.T) { } func TestDoNotUpdateRevIfRevIsAlreadyReady(t *testing.T) { - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() - // Mark the revision already ready. - rev.Status.Conditions = []v1alpha1.RevisionCondition{{ - Type: "Ready", - Status: corev1.ConditionTrue, - Reason: "ServiceReady", - }} - - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Create endpoints owned by this Revision. endpoints := getTestReadyEndpoints(rev.Name) + kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(endpoints) + controller.Reconcile(KeyOrDie(rev)) - // No revision updates. 
- elaClient.Fake.PrependReactor("update", "revisions", - func(a kubetesting.Action) (bool, runtime.Object, error) { - t.Error("Revision was updated unexpectedly") - return true, nil, nil - }, - ) - - controller.SyncEndpoints(endpoints) -} - -func TestDoNotUpdateRevIfRevIsMarkedAsFailed(t *testing.T) { - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) - rev := getTestRevision() - // Mark the revision already ready. - rev.Status.Conditions = []v1alpha1.RevisionCondition{{ - Type: "ResourcesAvailable", - Status: corev1.ConditionFalse, - Reason: "ExceededReadinessChecks", - }, { - Type: "Ready", - Status: corev1.ConditionFalse, - Reason: "ExceededReadinessChecks", - }} - - createRevision(elaClient, elaInformer, controller, rev) - - // Create endpoints owned by this Revision. - endpoints := getTestReadyEndpoints(rev.Name) + // Make sure that the changes from the Reconcile are reflected in our Informers. + rev, _, _ = addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) // No revision updates. 
elaClient.Fake.PrependReactor("update", "revisions", @@ -1483,31 +1493,32 @@ func TestDoNotUpdateRevIfRevIsMarkedAsFailed(t *testing.T) { }, ) - controller.SyncEndpoints(endpoints) + controller.Reconcile(KeyOrDie(rev)) } func TestMarkRevAsFailedIfEndpointHasNoAddressesAfterSomeDuration(t *testing.T) { - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() + rev = createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) + creationTime := time.Now().Add(-10 * time.Minute) rev.ObjectMeta.CreationTimestamp = metav1.NewTime(creationTime) - rev.Status.Conditions = []v1alpha1.RevisionCondition{{ - Type: "Ready", - Status: corev1.ConditionUnknown, - Reason: "Deploying", - }} + for i := range rev.Status.Conditions { + rev.Status.Conditions[i].LastTransitionTime = rev.ObjectMeta.CreationTimestamp + } - createRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Create endpoints owned by this Revision. endpoints := getTestNotReadyEndpoints(rev.Name) - controller.SyncEndpoints(endpoints) + kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(endpoints) + controller.Reconcile(KeyOrDie(rev)) - currentRev, _ := elaClient.ServingV1alpha1().Revisions(testNamespace).Get(rev.Name, metav1.GetOptions{}) + // Make sure that the changes from the Reconcile are reflected in our Informers. 
+ currentRev, _, _ := addResourcesToInformers(t, kubeClient, kubeInformer, elaClient, elaInformer, rev) - // t.Errorf("GOT: %v", currentRev.Status.Conditions) for _, ct := range []v1alpha1.RevisionConditionType{"ResourcesAvailable", "Ready"} { got := currentRev.Status.GetCondition(ct) want := &v1alpha1.RevisionCondition{ @@ -1524,10 +1535,10 @@ func TestMarkRevAsFailedIfEndpointHasNoAddressesAfterSomeDuration(t *testing.T) } func TestAuxiliaryEndpointDoesNotUpdateRev(t *testing.T) { - _, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Create endpoints owned by this Revision. endpoints := getTestAuxiliaryReadyEndpoints(rev.Name) @@ -1540,7 +1551,8 @@ func TestAuxiliaryEndpointDoesNotUpdateRev(t *testing.T) { }, ) - controller.SyncEndpoints(endpoints) + kubeInformer.Core().V1().Endpoints().Informer().GetIndexer().Add(endpoints) + controller.Reconcile(KeyOrDie(rev)) } func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { @@ -1549,23 +1561,16 @@ func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) { // Create revision and verify that the k8s resources are created as // appropriate. - createRevision(elaClient, elaInformer, controller, rev) - - expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get ela deployment: %v", err) - } - kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Now, update the revision serving state to Retired, and force another // run of the controller. 
rev.Spec.ServingState = v1alpha1.RevisionServingStateRetired - updateRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. - deployment, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) - + expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) + deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err == nil { t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment) } @@ -1577,34 +1582,28 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { // Create revision and verify that the k8s resources are created as // appropriate. - createRevision(elaClient, elaInformer, controller, rev) - - expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) - if err != nil { - t.Fatalf("Couldn't get ela deployment: %v", err) - } - kubeInformer.Apps().V1().Deployments().Informer().GetIndexer().Add(deployment) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Now, update the revision serving state to Reserve, and force another // run of the controller. rev.Spec.ServingState = v1alpha1.RevisionServingStateReserve - updateRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. 
- deployment, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) + deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) if err == nil { t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment) } } func TestRetiredToActiveRevisionCreatesStuff(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() // Create revision. The k8s resources should not be created. rev.Spec.ServingState = v1alpha1.RevisionServingStateRetired - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) @@ -1616,7 +1615,7 @@ func TestRetiredToActiveRevisionCreatesStuff(t *testing.T) { // Now, update the revision serving state to Active, and force another // run of the controller. rev.Spec.ServingState = v1alpha1.RevisionServingStateActive - updateRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the resources to be created. _, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) @@ -1626,12 +1625,12 @@ func TestRetiredToActiveRevisionCreatesStuff(t *testing.T) { } func TestReserveToActiveRevisionCreatesStuff(t *testing.T) { - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestController(t) rev := getTestRevision() // Create revision. 
The k8s resources should not be created. rev.Spec.ServingState = v1alpha1.RevisionServingStateReserve - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the deployment to be gone. expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) @@ -1643,7 +1642,7 @@ func TestReserveToActiveRevisionCreatesStuff(t *testing.T) { // Now, update the revision serving state to Active, and force another // run of the controller. rev.Spec.ServingState = v1alpha1.RevisionServingStateActive - updateRevision(elaClient, elaInformer, controller, rev) + updateRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) // Expect the resources to be created. _, err = kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) @@ -1715,7 +1714,7 @@ func TestIstioOutboundIPRangesInjection(t *testing.T) { func getPodAnnotationsForConfig(t *testing.T, configMapValue string, configAnnotationOverride string, updateRandomConfigMap bool) map[string]string { controllerConfig := getTestControllerConfig() - kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) + kubeClient, _, elaClient, _, controller, kubeInformer, _, elaInformer, _, _ := newTestControllerWithConfig(t, &controllerConfig) // Resolve image references to this "digest" digest := "foo@sha256:deadbeef" @@ -1753,7 +1752,7 @@ func getPodAnnotationsForConfig(t *testing.T, configMapValue string, configAnnot *ctrl.NewConfigurationControllerRef(config), ) - createRevision(elaClient, elaInformer, controller, rev) + createRevision(t, kubeClient, kubeInformer, elaClient, elaInformer, controller, rev) expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{})