diff --git a/cmd/activator/main.go b/cmd/activator/main.go index 6430a2ba5755..79494e1ad5c0 100644 --- a/cmd/activator/main.go +++ b/cmd/activator/main.go @@ -34,30 +34,33 @@ import ( ) const ( - maxRetry = 60 + maxRetry = 30 retryInterval = 1 * time.Second ) type activationHandler struct { - act activator.Activator - logger *zap.SugaredLogger + act activator.Activator + logger *zap.SugaredLogger + transport http.RoundTripper + transport2 http.RoundTripper } // retryRoundTripper retries on 503's for up to 60 seconds. The reason is there is // a small delay for k8s to include the ready IP in service. // https://github.com/knative/serving/issues/660#issuecomment-384062553 type retryRoundTripper struct { - logger *zap.SugaredLogger + transport http.RoundTripper + transport2 http.RoundTripper + logger *zap.SugaredLogger } -func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { +func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) { var transport http.RoundTripper - - transport = http.DefaultTransport - if r.ProtoMajor == 2 { - transport = h2cutil.NewTransport() + if r.ProtoMajor == 1 { + transport = rrt.transport + } else { + transport = rrt.transport2 } - resp, err := transport.RoundTrip(r) // TODO: Activator should retry with backoff. // https://github.com/knative/serving/issues/1229 @@ -90,8 +93,10 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) { Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port), } proxy := httputil.NewSingleHostReverseProxy(target) - proxy.Transport = retryRoundTripper{ - logger: a.logger, + proxy.Transport = &retryRoundTripper{ + transport: a.transport, + transport2: a.transport2, + logger: a.logger, } // TODO: Clear the host to avoid 404's. 
@@ -123,7 +128,11 @@ func main() { a := activator.NewRevisionActivator(kubeClient, elaClient, logger) a = activator.NewDedupingActivator(a) - ah := &activationHandler{a, logger} + ah := &activationHandler{ + act: a, + logger: logger, + transport: http.DefaultTransport, + transport2: h2cutil.NewTransport()} // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() diff --git a/pkg/apis/serving/v1alpha1/revision_types.go b/pkg/apis/serving/v1alpha1/revision_types.go index 12ff1a868b9a..7fa0ba351e59 100644 --- a/pkg/apis/serving/v1alpha1/revision_types.go +++ b/pkg/apis/serving/v1alpha1/revision_types.go @@ -398,6 +398,14 @@ func (rs *RevisionStatus) MarkInactive() { }) } +func (rs *RevisionStatus) MarkDeactivating() { + rs.setCondition(&RevisionCondition{ + Type: RevisionConditionReady, + Status: corev1.ConditionFalse, + Reason: "Deactivating", + }) +} + func (rs *RevisionStatus) MarkContainerMissing(message string) { for _, cond := range []RevisionConditionType{ RevisionConditionContainerHealthy, diff --git a/pkg/apis/serving/v1alpha1/revision_types_test.go b/pkg/apis/serving/v1alpha1/revision_types_test.go index ff476684c4e9..13dc191456b1 100644 --- a/pkg/apis/serving/v1alpha1/revision_types_test.go +++ b/pkg/apis/serving/v1alpha1/revision_types_test.go @@ -452,6 +452,14 @@ func TestTypicalFlowWithSuspendResume(t *testing.T) { checkConditionSucceededRevision(r.Status, RevisionConditionContainerHealthy, t) checkConditionSucceededRevision(r.Status, RevisionConditionReady, t) + // Deactivate the revision to simulate scale to zero. 
+ r.Status.MarkDeactivating() + checkConditionSucceededRevision(r.Status, RevisionConditionResourcesAvailable, t) + checkConditionSucceededRevision(r.Status, RevisionConditionContainerHealthy, t) + if got := checkConditionFailedRevision(r.Status, RevisionConditionReady, t); got == nil || got.Reason != "Deactivating" { + t.Errorf("MarkDeactivating = %v, want Deactivating", got) + } + // From a Ready state, make the revision inactive to simulate scale to zero. r.Status.MarkInactive() checkConditionSucceededRevision(r.Status, RevisionConditionResourcesAvailable, t) diff --git a/pkg/controller/revision/resource.go b/pkg/controller/revision/resource.go index 99a32973c4db..c0ab810d5004 100644 --- a/pkg/controller/revision/resource.go +++ b/pkg/controller/revision/resource.go @@ -31,7 +31,14 @@ func MakeServingResourceLabels(revision *v1alpha1.Revision) map[string]string { labels[serving.RevisionUID] = string(revision.UID) for k, v := range revision.ObjectMeta.Labels { - labels[k] = v + // TODO: Use a fixed set for revision labels. If the set of labels changed, + // the deployment could end up with multiple replica sets. + // https://github.com/knative/serving/issues/1293 + // The route for a revision could change, therefore it is excluded + // in revision labels as a temporary solution. 
+ if k != serving.RouteLabelKey { + labels[k] = v + } } // If users don't specify an app: label we will automatically // populate it with the revision name to get the benefit of richer diff --git a/pkg/controller/revision/revision.go b/pkg/controller/revision/revision.go index 2555219f3e9e..e97df86b763c 100644 --- a/pkg/controller/revision/revision.go +++ b/pkg/controller/revision/revision.go @@ -78,7 +78,7 @@ const ( var ( elaPodReplicaCount = int32(1) - elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 1} + elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 0} elaPodMaxSurge = intstr.IntOrString{Type: intstr.Int, IntVal: 1} ) @@ -441,45 +441,44 @@ func (c *Controller) reconcileOnceBuilt(ctx context.Context, rev *v1alpha1.Revis logger.Info("Creating or reconciling resources for revision") return c.createK8SResources(ctx, rev) } - return c.deleteK8SResources(ctx, rev) + return c.teardownK8SResources(ctx, rev) } -func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { +func (c *Controller) teardownK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) - logger.Info("Deleting the resources for revision") - err := c.deleteDeployment(ctx, rev) - if err != nil { - logger.Error("Failed to delete a deployment", zap.Error(err)) - } - logger.Info("Deleted deployment") - - err = c.deleteAutoscalerDeployment(ctx, rev) - if err != nil { - logger.Error("Failed to delete autoscaler Deployment", zap.Error(err)) - } - logger.Info("Deleted autoscaler Deployment") - - err = c.deleteAutoscalerService(ctx, rev) - if err != nil { - logger.Error("Failed to delete autoscaler Service", zap.Error(err)) - } - logger.Info("Deleted autoscaler Service") - - err = c.deleteService(ctx, rev) - if err != nil { - logger.Error("Failed to delete k8s service", zap.Error(err)) + if rev.Spec.ServingState == v1alpha1.RevisionServingStateRetired { + // Delete the k8s deployment and 
revision service if serving state is Retired. + logger.Info("Deleting the resources for revision") + if err := c.deleteDeployment(ctx, rev); err != nil { + return err + } + if err := c.deleteService(ctx, rev); err != nil { + return err + } + } else { + // Serving state is RevisionServingStateReserve. Keep the revision service and update + // the deployment replicas to be 0. + if cond := rev.Status.GetCondition(v1alpha1.RevisionConditionReady); cond != nil && cond.Reason != "Inactive" { + if cond.Reason == "Deactivating" { + logger.Info("The revision is in Deactivating condition") + return nil + } + rev.Status.MarkDeactivating() + _, err := c.updateStatus(rev) + if err != nil { + logger.Error("Error updating revision condition to be deactivating", + zap.Error(err)) + } + return err + } + if err := c.deactivateDeployment(ctx, rev); err != nil { + return err + } } - logger.Info("Deleted service") - - // And the deployment is no longer ready, so update that - rev.Status.MarkInactive() - logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions) - if _, err := c.updateStatus(rev); err != nil { - logger.Error("Error recording inactivation of revision", zap.Error(err)) + if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil { return err } - - return nil + return c.deleteAutoscalerService(ctx, rev) } func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error { @@ -553,6 +552,32 @@ func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revis return nil } +func (c *Controller) deactivateDeployment(ctx context.Context, rev *v1alpha1.Revision) error { + logger := logging.FromContext(ctx) + deploymentName := controller.GetRevisionDeploymentName(rev) + dc := c.KubeClientSet.AppsV1().Deployments(rev.Namespace) + deployment, err := dc.Get(deploymentName, metav1.GetOptions{}) + if err != nil { + logger.Errorf("Failed to get deployment %q", deploymentName) + return err + } + if 
*deployment.Spec.Replicas == 0 { + logger.Infof("Deployment %s is scaled to 0 already.", deploymentName) + return nil + } + + logger.Infof("Deactivating deployment %q", deploymentName) + deployment.Spec.Replicas = new(int32) + *deployment.Spec.Replicas = int32(0) + _, err = dc.Update(deployment) + if err != nil { + logger.Errorf("Error deactivating deployment %q: %s", deploymentName, err) + return err + } + logger.Infof("Successfully scaled deployment %s to 0.", deploymentName) + return nil +} + func (c *Controller) deleteDeployment(ctx context.Context, rev *v1alpha1.Revision) error { logger := logging.FromContext(ctx) deploymentName := controller.GetRevisionDeploymentName(rev) @@ -581,24 +606,19 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revi // First, check if deployment exists already. deploymentName := controller.GetRevisionDeploymentName(rev) - if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil { + deploymentExists := true + _, err := dc.Get(deploymentName, metav1.GetOptions{}) + if err != nil { if !apierrs.IsNotFound(err) { logger.Errorf("deployments.Get for %q failed: %s", deploymentName, err) return err } - logger.Infof("Deployment %q doesn't exist, creating", deploymentName) - } else { - // TODO(mattmoor): Compare the deployments and update if it has changed - // out from under us. - logger.Infof("Found existing deployment %q", deploymentName) - return nil + deploymentExists = false } - // Create the deployment. deployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig) - // Resolve tag image references to digests. 
- if err := c.resolver.Resolve(deployment); err != nil { + if err = c.resolver.Resolve(deployment); err != nil { logger.Error("Error resolving deployment", zap.Error(err)) rev.Status.MarkContainerMissing(err.Error()) if _, err := c.updateStatus(rev); err != nil { @@ -608,10 +628,16 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revi return err } - logger.Infof("Creating Deployment: %q", deployment.Name) - _, createErr := dc.Create(deployment) - - return createErr + if deploymentExists { + // TODO(mattmoor): Compare the deployments and update if it has changed + // out from under us. + logger.Infof("Found existing deployment %q, updating", deploymentName) + _, err = dc.Update(deployment) + } else { + logger.Infof("Deployment %q doesn't exist, creating", deploymentName) + _, err = dc.Create(deployment) + } + return err } func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error { diff --git a/pkg/controller/revision/revision_test.go b/pkg/controller/revision/revision_test.go index aead14629d88..386efeb3cbe0 100644 --- a/pkg/controller/revision/revision_test.go +++ b/pkg/controller/revision/revision_test.go @@ -78,9 +78,8 @@ func getTestRevision() *v1alpha1.Revision { Name: "test-rev", Namespace: testNamespace, Labels: map[string]string{ - "testLabel1": "foo", - "testLabel2": "bar", - serving.RouteLabelKey: "test-route", + "testLabel1": "foo", + "testLabel2": "bar", }, Annotations: map[string]string{ "testAnnotation": "test", @@ -1503,8 +1502,8 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { // appropriate. 
createRevision(elaClient, elaInformer, controller, rev) - expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name) - _, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) + deploymentName := fmt.Sprintf("%s-deployment", rev.Name) + _, err := kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{}) if err != nil { t.Fatalf("Couldn't get ela deployment: %v", err) } @@ -1514,10 +1513,14 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) { rev.Spec.ServingState = v1alpha1.RevisionServingStateReserve updateRevision(elaClient, elaInformer, controller, rev) - // Expect the deployment to be gone. - deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{}) - if err == nil { - t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment) + // Expect the deployment to be there. + _, err = kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{}) + + if err != nil { + if apierrs.IsNotFound(err) { + t.Fatalf("Expected k8s deployment to be there but it was gone: %s/%s", testNamespace, deploymentName) + } + t.Fatalf("There was an error to get the deployment %s while it exists", deploymentName) } } diff --git a/pkg/controller/route/route.go b/pkg/controller/route/route.go index 61c0e3a5f289..6d949f43e80b 100644 --- a/pkg/controller/route/route.go +++ b/pkg/controller/route/route.go @@ -283,7 +283,7 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(ctx context.Context, // Then create the actual route rules. 
logger.Info("Creating Istio route rules") - revisionRoutes, err := c.createOrUpdateRouteRules(ctx, route, configMap, revMap) + revisionRoutes, inactiveRev, err := c.createOrUpdateRouteRules(ctx, route, configMap, revMap) if err != nil { logger.Error("Failed to create routes", zap.Error(err)) return nil, err } @@ -302,7 +302,27 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(ctx context.Context, route.Status.Traffic = traffic } route.Status.Domain = c.routeDomain(route) - return c.updateStatus(ctx, route) + newRoute, err := c.updateStatus(ctx, route) + if err != nil { + logger.Error("Failed to update routes", zap.Error(err)) + return nil, err + } + // After the route rules are updated, mark the revision inactive. In case of deactivation, + // the route rules need to point to activator-service before we tear down k8s resources. + if inactiveRev != "" { + revisionClient := c.ServingClientSet.ServingV1alpha1().Revisions(route.Namespace) + rev, err := revisionClient.Get(inactiveRev, metav1.GetOptions{}) + if err != nil { + logger.Errorf("Failed to fetch the inactive revision %s: %s", inactiveRev, err) + return nil, err + } + rev.Status.MarkInactive() + if _, err = revisionClient.Update(rev); err != nil { + logger.Errorf("Failed to update revision %s to be inactive: %s", inactiveRev, err) + return nil, err + } + } + return newRoute, nil } func (c *Controller) reconcilePlaceholderService(ctx context.Context, route *v1alpha1.Route) error { @@ -661,33 +681,35 @@ func (c *Controller) computeRevisionRoutes( return nil, "", err } - hasRouteRule := true + revisionWeight := tt.Percent cond := rev.Status.GetCondition(v1alpha1.RevisionConditionReady) if enableScaleToZero && cond != nil { - // A revision is considered inactive (yet) if it's in - // "Inactive" condition or "Activating" condition. + // A revision is considered inactive if it's in + // "Inactive" condition, "Activating" or "Deactivating" condition. + // TODO: consolidate revision conditions.
+ // https://github.com/knative/serving/issues/645 if (cond.Reason == "Inactive" && cond.Status == corev1.ConditionFalse) || - (cond.Reason == "Activating" && cond.Status == corev1.ConditionUnknown) { + (cond.Reason == "Activating" && cond.Status == corev1.ConditionUnknown) || + (cond.Reason == "Deactivating" && cond.Status == corev1.ConditionFalse) { // Let inactiveRev be the Reserve revision with the largest traffic weight. if tt.Percent > maxInactivePercent { maxInactivePercent = tt.Percent inactiveRev = rev.Name } totalInactivePercent += tt.Percent - hasRouteRule = false + revisionWeight = 0 } } - if hasRouteRule { - rr := RevisionRoute{ - Name: tt.Name, - RevisionName: rev.Name, - Service: rev.Status.ServiceName, - Namespace: elaNS, - Weight: tt.Percent, - } - ret = append(ret, rr) + rr := RevisionRoute{ + Name: tt.Name, + RevisionName: rev.Name, + Service: rev.Status.ServiceName, + Namespace: elaNS, + Weight: revisionWeight, } + logger.Infof("RevisionRoute: %v", rr) + ret = append(ret, rr) } // TODO: The ideal solution is to append different revision name as headers for each inactive revision. @@ -753,31 +775,31 @@ func (c *Controller) computeEmptyRevisionRoutes( } func (c *Controller) createOrUpdateRouteRules(ctx context.Context, route *v1alpha1.Route, - configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, error) { + configMap map[string]*v1alpha1.Configuration, revMap map[string]*v1alpha1.Revision) ([]RevisionRoute, string, error) { logger := logging.FromContext(ctx) // grab a client that's specific to RouteRule. 
ns := route.Namespace routeClient := c.ServingClientSet.ConfigV1alpha2().RouteRules(ns) if routeClient == nil { logger.Errorf("Failed to create resource client") - return nil, fmt.Errorf("Couldn't get a routeClient") + return nil, "", fmt.Errorf("Couldn't get a routeClient") } revisionRoutes, inactiveRev, err := c.computeRevisionRoutes(ctx, route, configMap, revMap) if err != nil { logger.Errorf("Failed to get routes for %s : %q", route.Name, err) - return nil, err + return nil, "", err } if len(revisionRoutes) == 0 { logger.Errorf("No routes were found for the service %q", route.Name) - return nil, nil + return nil, "", nil } // TODO: remove this once https://github.com/istio/istio/issues/5204 is fixed. emptyRoutes, err := c.computeEmptyRevisionRoutes(ctx, route, configMap, revMap) if err != nil { logger.Errorf("Failed to get empty routes for %s : %q", route.Name, err) - return nil, err + return nil, "", err } revisionRoutes = append(revisionRoutes, emptyRoutes...) // Create route rule for the route domain @@ -785,19 +807,19 @@ func (c *Controller) createOrUpdateRouteRules(ctx context.Context, route *v1alph routeRules, err := routeClient.Get(routeRuleName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { - return nil, err + return nil, "", err } routeRules = MakeIstioRoutes(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) - return nil, err + return nil, "", err } c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, nil, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { c.Recorder.Eventf(route, corev1.EventTypeWarning, "UpdateFailed", "Failed to update Istio route rule %q: 
%s", routeRules.Name, err) - return nil, err + return nil, "", err } c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } @@ -811,26 +833,26 @@ func (c *Controller) createOrUpdateRouteRules(ctx context.Context, route *v1alph routeRules, err := routeClient.Get(routeRuleName, metav1.GetOptions{}) if err != nil { if !apierrs.IsNotFound(err) { - return nil, err + return nil, "", err } routeRules = MakeIstioRoutes(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Create(routeRules); err != nil { c.Recorder.Eventf(route, corev1.EventTypeWarning, "CreationFailed", "Failed to create Istio route rule %q: %s", routeRules.Name, err) - return nil, err + return nil, "", err } c.Recorder.Eventf(route, corev1.EventTypeNormal, "Created", "Created Istio route rule %q", routeRules.Name) } else { routeRules.Spec = makeIstioRouteSpec(route, &tt, ns, revisionRoutes, c.routeDomain(route), inactiveRev) if _, err := routeClient.Update(routeRules); err != nil { - return nil, err + return nil, "", err } c.Recorder.Eventf(route, corev1.EventTypeNormal, "Updated", "Updated Istio route rule %q", routeRules.Name) } } if err := c.removeOutdatedRouteRules(ctx, route); err != nil { - return nil, err + return nil, "", err } - return revisionRoutes, nil + return revisionRoutes, inactiveRev, nil } func (c *Controller) updateStatus(ctx context.Context, route *v1alpha1.Route) (*v1alpha1.Route, error) { diff --git a/pkg/controller/route/route_test.go b/pkg/controller/route/route_test.go index 13b9cd9d8619..f9e4d874a836 100644 --- a/pkg/controller/route/route_test.go +++ b/pkg/controller/route/route_test.go @@ -430,7 +430,13 @@ func TestCreateRouteForOneReserveRevision(t *testing.T) { }, }, }, - Route: []v1alpha2.DestinationWeight{getActivatorDestinationWeight(100)}, + Route: []v1alpha2.DestinationWeight{{ + Destination: v1alpha2.IstioService{ + Name: "test-rev-service", + Namespace: testNamespace, + }, + 
Weight: 0, + }, getActivatorDestinationWeight(100)}, AppendHeaders: appendHeaders, } @@ -590,8 +596,7 @@ func TestCreateRouteWithMultipleTargets(t *testing.T) { Namespace: testNamespace, }, Weight: 10, - }, getActivatorDestinationWeight(0)}, - } + }, getActivatorDestinationWeight(0)}} if diff := cmp.Diff(expectedRouteSpec, routerule.Spec); diff != "" { t.Errorf("Unexpected rule spec diff (-want +got): %v", diff) @@ -666,10 +671,16 @@ func TestCreateRouteWithOneTargetReserve(t *testing.T) { }, Route: []v1alpha2.DestinationWeight{{ Destination: v1alpha2.IstioService{ - Name: fmt.Sprintf("%s-service", cfgrev.Name), + Name: "p-deadbeef-service", Namespace: testNamespace, }, Weight: 90, + }, { + Destination: v1alpha2.IstioService{ + Name: "test-rev-service", + Namespace: testNamespace, + }, + Weight: 0, }, getActivatorDestinationWeight(10)}, AppendHeaders: appendHeaders, } @@ -860,8 +871,7 @@ func TestCreateRouteWithNamedTargets(t *testing.T) { Namespace: testNamespace, }, Weight: 50, - }, getActivatorDestinationWeight(0)}, - }) + }, getActivatorDestinationWeight(0)}}) // Expects authority header to have the traffic target name prefixed to the // domain suffix. Also weights 100% of the traffic to the specified traffic