Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 128 additions & 41 deletions pkg/controller/revision/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ const (

var (
elaPodReplicaCount = int32(1)
elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 1}
elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 0}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious to know what your motivation was to change this to 0?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the same question.

elaPodMaxSurge = intstr.IntOrString{Type: intstr.Int, IntVal: 1}
foregroundDeletion = metav1.DeletePropagationForeground
fgDeleteOptions = &metav1.DeleteOptions{
Expand Down Expand Up @@ -324,7 +324,7 @@ func (c *Controller) Reconcile(key string) error {
return c.createK8SResources(ctx, rev)

case v1alpha1.RevisionServingStateReserve:
return c.deleteK8SResources(ctx, rev)
return c.scaleRevisionResourcesToZero(ctx, rev)

// TODO(mattmoor): Nothing sets this state, and it should be removed.
case v1alpha1.RevisionServingStateRetired:
Expand Down Expand Up @@ -449,48 +449,134 @@ func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) {
return
}

// deleteK8SResources deletes autoscaler resources, and deletes the revision service and deployment.
// It is used when the revision serving state is Retired.
func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
logger.Info("Deleting the resources for revision")
err := c.deleteDeployment(ctx, rev)
if err != nil {
if err := c.deleteDeployment(ctx, rev); err != nil {
logger.Error("Failed to delete a deployment", zap.Error(err))
return err
}
logger.Info("Deleted deployment")

err = c.deleteAutoscalerDeployment(ctx, rev)
if err != nil {
if err := c.deleteService(ctx, rev); err != nil {
logger.Error("Failed to delete k8s service", zap.Error(err))
return err
}

if err := c.deleteAutoscalerResources(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler resources", zap.Error(err))
return err
}

// And the deployment is no longer ready, so update that
rev.Status.MarkInactive()
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you rebase you won't need the updateStatus call anymore due to the PR #1321

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole function will be gone after #1334

logger.Error("Error recording inactivation of revision", zap.Error(err))
return err
}

return nil
}

// scaleRevisionResourcesToZero scales the revision deployment down to zero replicas and
// tears down the autoscaler resources, while keeping the revision service and deployment objects.
// It is used when the revision serving state is Reserve.
func (c *Controller) scaleRevisionResourcesToZero(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
logger.Info("Scaling the deployment to 0 for the revision")
if err := c.scaleRevisionDeploymentToZero(ctx, rev); err != nil {
logger.Error("Failed to scale the deployment to 0", zap.Error(err))
return err
}

if err := c.deleteAutoscalerResources(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler resources", zap.Error(err))
return err
}

// And the deployment is no longer ready, so update that
rev.Status.MarkInactive()
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise see: #1321

logger.Error("Error recording inactivation of revision", zap.Error(err))
return err
}

return nil
}

func (c *Controller) deleteAutoscalerResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
if err := c.scaleAutoscalerDeploymentToZero(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler Deployment", zap.Error(err))
return err
}
logger.Info("Deleted autoscaler Deployment")

err = c.deleteAutoscalerService(ctx, rev)
if err != nil {
if err := c.deleteAutoscalerService(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler Service", zap.Error(err))
return err
}
logger.Info("Deleted autoscaler Service")

if c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() {
if err := c.deleteVpa(ctx, rev); err != nil {
logger.Error("Failed to delete VPA", zap.Error(err))
return err
}
logger.Info("Deleted VPA")
}
return nil
}

err = c.deleteService(ctx, rev)
func (c *Controller) scaleAutoscalerDeploymentToZero(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
deploymentName := controller.GetRevisionAutoscalerName(rev)

dc := c.KubeClientSet.AppsV1().Deployments(pkg.GetServingSystemNamespace())
deployment, err := dc.Get(deploymentName, metav1.GetOptions{})
if err != nil {
logger.Error("Failed to delete k8s service", zap.Error(err))
logger.Errorf("Failed to get deployment %q", deploymentName)
return err
}
if *deployment.Spec.Replicas == 0 {
logger.Infof("Deployment %s is scaled to 0 already.", deploymentName)
return nil
}
logger.Info("Deleted service")

// And the deployment is no longer ready, so update that
rev.Status.MarkInactive()
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording inactivation of revision", zap.Error(err))
logger.Infof("Setting deployment %q replicas to 0", deploymentName)
deployment.Spec.Replicas = new(int32)
*deployment.Spec.Replicas = int32(0)
_, err = dc.Update(deployment)
if err != nil {
logger.Errorf("Error scaling deployment %q to 0: %s", deploymentName, err)
return err
}
logger.Infof("Successfully scaled deployment %s to 0.", deploymentName)
return nil
}

func (c *Controller) scaleRevisionDeploymentToZero(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
deploymentName := controller.GetRevisionDeploymentName(rev)
dc := c.KubeClientSet.AppsV1().Deployments(rev.Namespace)
deployment, err := dc.Get(deploymentName, metav1.GetOptions{})
if err != nil {
logger.Errorf("Failed to get deployment %q", deploymentName)
return err
}
if *deployment.Spec.Replicas == 0 {
logger.Infof("Deployment %s is scaled to 0 already.", deploymentName)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

%q

return nil
}

logger.Infof("Setting deployment %q replicas to 0", deploymentName)
deployment.Spec.Replicas = new(int32)
*deployment.Spec.Replicas = int32(0)
_, err = dc.Update(deployment)
if err != nil {
logger.Errorf("Error scaling deployment %q to 0: %s", deploymentName, err)
return err
}
logger.Infof("Successfully scaled deployment %s to 0.", deploymentName)
return nil
}

Expand Down Expand Up @@ -585,37 +671,38 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revi
// First, check if deployment exists already.
deploymentName := controller.GetRevisionDeploymentName(rev)

if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
desiredDeployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig)
// Resolve tag image references to digests.
if err := c.resolver.Resolve(desiredDeployment); err != nil {
logger.Error("Error resolving deployment", zap.Error(err))
rev.Status.MarkContainerMissing(err.Error())
if _, updateErr := c.updateStatus(rev); updateErr != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Likewise see: #1321

logger.Error("Error recording resolution problem", zap.Error(updateErr))
return updateErr
}
return err
}

if existingDeployment, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
if !apierrs.IsNotFound(err) {
logger.Errorf("deployments.Get for %q failed: %s", deploymentName, err)
return err
}
logger.Infof("Deployment %q doesn't exist, creating", deploymentName)
_, err := dc.Create(desiredDeployment)
return err
} else {
// TODO(mattmoor): Compare the deployments and update if it has changed
// out from under us.
logger.Infof("Found existing deployment %q", deploymentName)
return nil
}

// Create the deployment.
deployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig)

// Resolve tag image references to digests.
if err := c.resolver.Resolve(deployment); err != nil {
logger.Error("Error resolving deployment", zap.Error(err))
rev.Status.MarkContainerMissing(err.Error())
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording resolution problem", zap.Error(err))
return err
// TODO(mattmoor): Compare the deployments and update if it has changed
// out from under us. So far the deployment could only be updated for replicas field.
if *existingDeployment.Spec.Replicas == *desiredDeployment.Spec.Replicas {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something happened to my comments. I think I accidentally deleted them. Here it goes again:

This code will reset the replica count set by the autoscaler every 30 seconds, which seems wrong. If a deployment already exists, we should just return.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this seems strange to me too. I think the controller should be overwriting everything except the replicas count. We leave that to the Activator and the Autoscaler. And the update is punted for now by the TODO(mattmoor) above.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure, but I think the reason deployments are not updated every 30 seconds today is that updating a deployment, even without changes, might cause its pods to restart. (I don't think that actually happens unless the pod spec changes, but I am not 100% sure.) @mattmoor is that the reason that deployments are never reconciled today?

logger.Infof("The existing deployment %q replicas count %d is expected", deploymentName, *existingDeployment.Spec.Replicas)
return nil
}
logger.Infof("Updating deployment %q", deploymentName)
_, err := dc.Update(desiredDeployment)
return err
}

logger.Infof("Creating Deployment: %q", deployment.Name)
_, createErr := dc.Create(deployment)

return createErr
}

func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error {
Expand Down
23 changes: 13 additions & 10 deletions pkg/controller/revision/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,8 @@ func getTestRevision() *v1alpha1.Revision {
Name: "test-rev",
Namespace: testNamespace,
Labels: map[string]string{
"testLabel1": "foo",
"testLabel2": "bar",
serving.RouteLabelKey: "test-route",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the constant serving.RouteLabelKey used anymore?

"testLabel1": "foo",
"testLabel2": "bar",
},
Annotations: map[string]string{
"testAnnotation": "test",
Expand Down Expand Up @@ -1532,16 +1531,16 @@ func TestActiveToRetiredRevisionDeletesStuff(t *testing.T) {
}
}

func TestActiveToReserveRevisionDeletesStuff(t *testing.T) {
func TestActiveToReserveRevisionDeactivateDeployment(t *testing.T) {
kubeClient, _, elaClient, _, controller, _, _, elaInformer, _, _ := newTestController(t)
rev := getTestRevision()

// Create revision and verify that the k8s resources are created as
// appropriate.
createRevision(elaClient, elaInformer, controller, rev)

expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name)
_, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{})
deploymentName := fmt.Sprintf("%s-deployment", rev.Name)
_, err := kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Couldn't get ela deployment: %v", err)
}
Expand All @@ -1551,10 +1550,14 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) {
rev.Spec.ServingState = v1alpha1.RevisionServingStateReserve
updateRevision(elaClient, elaInformer, controller, rev)

// Expect the deployment to be gone.
deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{})
if err == nil {
t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment)
// Expect the deployment to be there.
_, err = kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{})

if err != nil {
if apierrs.IsNotFound(err) {
t.Fatalf("Expected k8s deployment to be there but it was gone: %s/%s", testNamespace, deploymentName)
}
t.Fatalf("There was an error to get the deployment %s while it exists", deploymentName)
}
}

Expand Down
82 changes: 0 additions & 82 deletions pkg/controller/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,13 +274,6 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(ctx context.Context,
return nil, err
}

if err := c.deleteLabelForOutsideOfGivenRevisions(ctx, route, revMap); err != nil {
return nil, err
}
if err := c.setLabelForGivenRevisions(ctx, route, revMap); err != nil {
return nil, err
}

// Then create the actual route rules.
logger.Info("Creating Istio route rules")
revisionRoutes, err := c.createOrUpdateRouteRules(ctx, route, configMap, revMap)
Expand Down Expand Up @@ -509,51 +502,6 @@ func (c *Controller) setLabelForGivenConfigurations(
return nil
}

// setLabelForGivenRevisions stamps every revision in revMap with the route's
// name under serving.RouteLabelKey.
//
// All revisions are checked before anything is written: if one already carries
// the route label with a different route's name, a RevisionInUse warning event
// is recorded on the route and an error is returned. Revisions that already
// carry the label are otherwise left untouched. It returns the first error hit
// while fetching or updating a revision, or nil on success.
func (c *Controller) setLabelForGivenRevisions(
	ctx context.Context, route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error {
	logger := logging.FromContext(ctx)
	revisionClient := c.ServingClientSet.ServingV1alpha1().Revisions(route.Namespace)

	// Validation pass: refuse to proceed if any revision is claimed by another route.
	for _, rev := range revMap {
		owner, labeled := rev.Labels[serving.RouteLabelKey]
		if !labeled || owner == route.Name {
			continue
		}
		errMsg := fmt.Sprintf("Revision %q is already in use by %q, and cannot be used by %q",
			rev.Name, owner, route.Name)
		c.Recorder.Event(route, corev1.EventTypeWarning, "RevisionInUse", errMsg)
		logger.Error(errMsg)
		return errors.New(errMsg)
	}

	// Labeling pass.
	for _, rev := range revMap {
		// Indexing a nil map simply reports "absent", so no nil guard is required.
		if _, labeled := rev.Labels[serving.RouteLabelKey]; labeled {
			continue
		}

		// Fetch the latest version of the revision to label, to narrow the window for
		// optimistic concurrency failures.
		latestRev, err := revisionClient.Get(rev.Name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		// The fresh copy may have been labeled since our snapshot was taken.
		if _, labeled := latestRev.Labels[serving.RouteLabelKey]; labeled {
			continue
		}
		if latestRev.Labels == nil {
			latestRev.Labels = make(map[string]string)
		}
		latestRev.Labels[serving.RouteLabelKey] = route.Name

		if _, err := revisionClient.Update(latestRev); err != nil {
			logger.Errorf("Failed to add route label to Revision %s: %s", rev.Name, err)
			return err
		}
	}

	return nil
}

func (c *Controller) deleteLabelForOutsideOfGivenConfigurations(
ctx context.Context, route *v1alpha1.Route, configMap map[string]*v1alpha1.Configuration) error {
logger := logging.FromContext(ctx)
Expand Down Expand Up @@ -584,36 +532,6 @@ func (c *Controller) deleteLabelForOutsideOfGivenConfigurations(
return nil
}

// deleteLabelForOutsideOfGivenRevisions removes the serving.RouteLabelKey label
// from revisions in the route's namespace that are currently labeled with this
// route's name but do not appear in revMap.
//
// It returns the error from listing the labeled revisions, or the first error
// from updating one of them; nil when every stale label was removed.
func (c *Controller) deleteLabelForOutsideOfGivenRevisions(
	ctx context.Context, route *v1alpha1.Route, revMap map[string]*v1alpha1.Revision) error {
	logger := logging.FromContext(ctx)
	revClient := c.ServingClientSet.ServingV1alpha1().Revisions(route.Namespace)

	// Select only the revisions that currently carry this route's label.
	selector := fmt.Sprintf("%s=%s", serving.RouteLabelKey, route.Name)
	oldRevList, err := revClient.List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		logger.Errorf("Failed to fetch revisions with label '%s=%s': %s",
			serving.RouteLabelKey, route.Name, err)
		return err
	}

	// Delete label for newly removed revisions as traffic target.
	for i := range oldRevList.Items {
		rev := oldRevList.Items[i]
		if _, stillTargeted := revMap[rev.Name]; stillTargeted {
			continue
		}
		delete(rev.Labels, serving.RouteLabelKey)
		if _, err := revClient.Update(&rev); err != nil {
			logger.Errorf("Failed to remove route label from Revision %s: %s", rev.Name, err)
			return err
		}
	}

	return nil
}

// computeRevisionRoutes computes RevisionRoute for a route object. If there is one or more inactive revisions and enableScaleToZero
// is true, a route rule with the activator service as the destination will be added. It returns the revision routes, the inactive
// revision name to which the activator should forward requests to, and error if there is any.
Expand Down
Loading