35 changes: 22 additions & 13 deletions cmd/activator/main.go
@@ -34,30 +34,33 @@ import (
 )
 
 const (
-	maxRetry      = 60
+	maxRetry      = 30
 	retryInterval = 1 * time.Second
 )
 
 type activationHandler struct {
-	act    activator.Activator
-	logger *zap.SugaredLogger
+	act        activator.Activator
+	logger     *zap.SugaredLogger
+	transport  http.RoundTripper
+	transport2 http.RoundTripper
 }

 // retryRoundTripper retries on 503's for up to 30 seconds. The reason is that there is
 // a small delay for k8s to include the ready IP in the service.
 // https://github.com/knative/serving/issues/660#issuecomment-384062553
 type retryRoundTripper struct {
-	logger *zap.SugaredLogger
+	transport  http.RoundTripper
+	transport2 http.RoundTripper
+	logger     *zap.SugaredLogger
 }

-func (rrt retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
+func (rrt *retryRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
 	var transport http.RoundTripper
 
-	transport = http.DefaultTransport
-	if r.ProtoMajor == 2 {
-		transport = h2cutil.NewTransport()
+	if r.ProtoMajor == 1 {
+		transport = rrt.transport
+	} else {
+		transport = rrt.transport2
 	}
 
 	resp, err := transport.RoundTrip(r)
 	// TODO: Activator should retry with backoff.
 	// https://github.com/knative/serving/issues/1229
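The rest of RoundTrip is collapsed in this view. Going by the maxRetry and retryInterval constants above, the current behavior is a fixed one-second poll; the TODO asks for backoff instead. A minimal sketch of what that could look like — an illustration under assumptions, not the PR's actual code:

```go
// Hypothetical backoff variant of the retry loop; the helper's name and shape
// are assumptions, and it reuses the file's maxRetry/retryInterval constants.
// Note: requests with bodies would also need GetBody handling to retry safely.
func (rrt *retryRoundTripper) roundTripWithBackoff(r *http.Request, transport http.RoundTripper) (*http.Response, error) {
	interval := retryInterval
	for i := 0; i < maxRetry; i++ {
		resp, err := transport.RoundTrip(r)
		// Only a 503 means the revision's ready IP has not reached the
		// k8s service yet; anything else is returned to the caller.
		if err != nil || resp.StatusCode != http.StatusServiceUnavailable {
			return resp, err
		}
		resp.Body.Close()
		time.Sleep(interval)
		interval *= 2 // exponential backoff instead of a fixed interval
	}
	return nil, fmt.Errorf("no ready endpoints for %s after %d retries", r.URL.Host, maxRetry)
}
```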
@@ -90,8 +93,10 @@ func (a *activationHandler) handler(w http.ResponseWriter, r *http.Request) {
 		Host: fmt.Sprintf("%s:%d", endpoint.FQDN, endpoint.Port),
 	}
 	proxy := httputil.NewSingleHostReverseProxy(target)
-	proxy.Transport = retryRoundTripper{
-		logger: a.logger,
+	proxy.Transport = &retryRoundTripper{
+		transport:  a.transport,
+		transport2: a.transport2,
+		logger:     a.logger,
 	}
 
 	// TODO: Clear the host to avoid 404's.
@@ -123,7 +128,11 @@ func main() {

 	a := activator.NewRevisionActivator(kubeClient, elaClient, logger)
 	a = activator.NewDedupingActivator(a)
-	ah := &activationHandler{a, logger}
+	ah := &activationHandler{
+		act:        a,
+		logger:     logger,
+		transport:  http.DefaultTransport,
+		transport2: h2cutil.NewTransport()}
 
 	// set up signals so we handle the first shutdown signal gracefully
 	stopCh := signals.SetupSignalHandler()
8 changes: 8 additions & 0 deletions pkg/apis/serving/v1alpha1/revision_types.go
@@ -398,6 +398,14 @@ func (rs *RevisionStatus) MarkInactive() {
 	})
 }
 
+func (rs *RevisionStatus) MarkDeactivating() {

[Review — Member] This needs unit test coverage.

+	rs.setCondition(&RevisionCondition{
+		Type:   RevisionConditionReady,
+		Status: corev1.ConditionFalse,
+		Reason: "Deactivating",
+	})
+}
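Regarding the review note above: the PR exercises this through the suspend/resume flow test below; a direct unit test could be as small as the following sketch (assuming only this package's existing GetCondition helper):

```go
func TestMarkDeactivating(t *testing.T) {
	rs := &RevisionStatus{}
	rs.MarkDeactivating()

	// Ready must be False with reason "Deactivating".
	got := rs.GetCondition(RevisionConditionReady)
	if got == nil || got.Status != corev1.ConditionFalse || got.Reason != "Deactivating" {
		t.Errorf("MarkDeactivating() = %v, want Ready=False with reason Deactivating", got)
	}
}
```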

 func (rs *RevisionStatus) MarkContainerMissing(message string) {
 	for _, cond := range []RevisionConditionType{
 		RevisionConditionContainerHealthy,
8 changes: 8 additions & 0 deletions pkg/apis/serving/v1alpha1/revision_types_test.go
@@ -452,6 +452,14 @@ func TestTypicalFlowWithSuspendResume(t *testing.T) {
 	checkConditionSucceededRevision(r.Status, RevisionConditionContainerHealthy, t)
 	checkConditionSucceededRevision(r.Status, RevisionConditionReady, t)
 
+	// Deactivate the revision to simulate scale to zero.
+	r.Status.MarkDeactivating()
+	checkConditionSucceededRevision(r.Status, RevisionConditionResourcesAvailable, t)
+	checkConditionSucceededRevision(r.Status, RevisionConditionContainerHealthy, t)
+	if got := checkConditionFailedRevision(r.Status, RevisionConditionReady, t); got == nil || got.Reason != "Deactivating" {
+		t.Errorf("MarkDeactivating = %v, want Deactivating", got)
+	}
+
 	// From a Ready state, make the revision inactive to simulate scale to zero.
 	r.Status.MarkInactive()
 	checkConditionSucceededRevision(r.Status, RevisionConditionResourcesAvailable, t)
9 changes: 8 additions & 1 deletion pkg/controller/revision/resource.go
@@ -31,7 +31,14 @@ func MakeServingResourceLabels(revision *v1alpha1.Revision) map[string]string {
 	labels[serving.RevisionUID] = string(revision.UID)
 
 	for k, v := range revision.ObjectMeta.Labels {
-		labels[k] = v
+		// TODO: Use a fixed set for revision labels. If the set of labels changed,
+		// the deployment could end up with multiple replica sets.
+		// https://github.com/knative/serving/issues/1293
+		// The route for a revision could change, therefore it is excluded
+		// from the revision labels as a temporary solution.
+		if k != serving.RouteLabelKey {
+			labels[k] = v
+		}
 	}
 	// If users don't specify an app: label we will automatically
 	// populate it with the revision name to get the benefit of richer
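The TODO in this hunk points at the longer-term fix: pin revision labels to a fixed set instead of excluding keys case by case. Purely as an illustration of that direction (the actual set would be settled in knative/serving#1293):

```go
// Hypothetical allowlist; the real membership is an open question in #1293.
var allowedRevisionLabels = map[string]bool{
	"app": true,
}

// copyAllowedLabels propagates only allowlisted user labels, so label churn
// (e.g. a route change) cannot alter the deployment's selector and leave
// multiple replica sets behind.
func copyAllowedLabels(src, dst map[string]string) {
	for k, v := range src {
		if allowedRevisionLabels[k] {
			dst[k] = v
		}
	}
}
```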
122 changes: 74 additions & 48 deletions pkg/controller/revision/revision.go
@@ -78,7 +78,7 @@ const (

 var (
 	elaPodReplicaCount   = int32(1)
-	elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 1}
+	elaPodMaxUnavailable = intstr.IntOrString{Type: intstr.Int, IntVal: 0}
 	elaPodMaxSurge       = intstr.IntOrString{Type: intstr.Int, IntVal: 1}
 )
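Presumably the intent of dropping maxUnavailable from 1 to 0: with a single replica, a rolling update that is allowed one unavailable pod can briefly leave zero ready pods; maxUnavailable=0 forces the surge pod to come up before the old one is taken down.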

@@ -441,45 +441,44 @@ func (c *Controller) reconcileOnceBuilt(ctx context.Context, rev *v1alpha1.Revision) error {
logger.Info("Creating or reconciling resources for revision")
return c.createK8SResources(ctx, rev)
}
return c.deleteK8SResources(ctx, rev)
return c.teardownK8SResources(ctx, rev)
}

func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
func (c *Controller) teardownK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
logger.Info("Deleting the resources for revision")
err := c.deleteDeployment(ctx, rev)
if err != nil {
logger.Error("Failed to delete a deployment", zap.Error(err))
}
logger.Info("Deleted deployment")

err = c.deleteAutoscalerDeployment(ctx, rev)
if err != nil {
logger.Error("Failed to delete autoscaler Deployment", zap.Error(err))
}
logger.Info("Deleted autoscaler Deployment")

err = c.deleteAutoscalerService(ctx, rev)
if err != nil {
logger.Error("Failed to delete autoscaler Service", zap.Error(err))
}
logger.Info("Deleted autoscaler Service")

err = c.deleteService(ctx, rev)
if err != nil {
logger.Error("Failed to delete k8s service", zap.Error(err))
if rev.Spec.ServingState == v1alpha1.RevisionServingStateRetired {

[Review — Member] @josephburnett I thought we just added a comment indicating this state was unused? Should we just delete the code?

[Review — Contributor Author] I agree it's cleaner to delete Retired state. I added this comment to issue #645.


+		// Delete the k8s deployment and revision service if serving state is Retired.
+		logger.Info("Deleting the resources for revision")
+		if err := c.deleteDeployment(ctx, rev); err != nil {
+			return err
+		}
+		if err := c.deleteService(ctx, rev); err != nil {
+			return err
+		}
+	} else {
+		// Serving state is RevisionServingStateReserve. Keep the revision service and update
+		// the deployment replicas to be 0.
+		if cond := rev.Status.GetCondition(v1alpha1.RevisionConditionReady); cond != nil && cond.Reason != "Inactive" {
+			if cond.Reason == "Deactivating" {

[Review — Member] We should not be predicating logic on cond.Reason; to me this is an indicator that our model for the lifecycle of inactive revisions is incomplete. This is clearly an extension of the logic we currently have in here, but we need to prioritize fixing this.

[Review — Contributor] Yes, we want to do this (#645 (comment)), but before we pivot to the new model, I want what we have at head to not throw 500's. This fixes the last of the 500's that happens just during deactivation.

[Review — Contributor Author] Yeah, that's an issue out of the scope of this PR.

[Review — Member] Ack. Does that mean that cleaning this up is the top priority after this goes in?

[Review — Contributor Author] I can take it after this PR and oncall.

[Review — Contributor] +1. After we fix this 503 issue, the top priority is migrating us to the new model you described and (hopefully) getting rid of serving state.

logger.Info("The revision is in Deactivating condition")
return nil
}
rev.Status.MarkDeactivating()
_, err := c.updateStatus(rev)
if err != nil {
logger.Error("Error updating revision condition to be deactivating",
zap.Error(err))
}
return err
}
if err := c.deactivateDeployment(ctx, rev); err != nil {
return err
}
}
logger.Info("Deleted service")

// And the deployment is no longer ready, so update that
rev.Status.MarkInactive()
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording inactivation of revision", zap.Error(err))
if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil {
return err
}

return nil
return c.deleteAutoscalerService(ctx, rev)
}

func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
@@ -553,6 +552,32 @@ func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
 	return nil
 }
 
+func (c *Controller) deactivateDeployment(ctx context.Context, rev *v1alpha1.Revision) error {
+	logger := logging.FromContext(ctx)
+	deploymentName := controller.GetRevisionDeploymentName(rev)
+	dc := c.KubeClientSet.AppsV1().Deployments(rev.Namespace)
+	deployment, err := dc.Get(deploymentName, metav1.GetOptions{})
+	if err != nil {
+		logger.Errorf("Failed to get deployment %q", deploymentName)
+		return err
+	}
+	if *deployment.Spec.Replicas == 0 {
+		logger.Infof("Deployment %s is scaled to 0 already.", deploymentName)
+		return nil
+	}
+
+	logger.Infof("Deactivating deployment %q", deploymentName)
+	deployment.Spec.Replicas = new(int32)
+	*deployment.Spec.Replicas = int32(0)
+	_, err = dc.Update(deployment)
+	if err != nil {
+		logger.Errorf("Error deactivating deployment %q: %s", deploymentName, err)

[Review — Contributor] return err? (Or, if that isn't desirable, we should still return to avoid the success log.Infof below.)

[Review — Contributor Author] Ah, thanks!

+		return err
+	}
+	logger.Infof("Successfully scaled deployment %s to 0.", deploymentName)
+	return nil
+}
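A design note on deactivateDeployment: Get-then-Update is open to conflicts with concurrent writers. A merge patch on replicas is one alternative; the sketch below is an assumption (same clientset, client-go of this era), not something this PR does:

```go
// Hypothetical variant: scale to zero via a strategic merge patch, skipping
// the read-modify-write cycle. types is k8s.io/apimachinery/pkg/types.
func (c *Controller) deactivateDeploymentByPatch(rev *v1alpha1.Revision) error {
	dc := c.KubeClientSet.AppsV1().Deployments(rev.Namespace)
	deploymentName := controller.GetRevisionDeploymentName(rev)
	patch := []byte(`{"spec":{"replicas":0}}`)
	_, err := dc.Patch(deploymentName, types.StrategicMergePatchType, patch)
	return err
}
```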

 func (c *Controller) deleteDeployment(ctx context.Context, rev *v1alpha1.Revision) error {
 	logger := logging.FromContext(ctx)
 	deploymentName := controller.GetRevisionDeploymentName(rev)
@@ -581,24 +606,19 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revision) error {
 	// First, check if deployment exists already.
 	deploymentName := controller.GetRevisionDeploymentName(rev)
 
-	if _, err := dc.Get(deploymentName, metav1.GetOptions{}); err != nil {
+	deploymentExists := true
+	_, err := dc.Get(deploymentName, metav1.GetOptions{})
+	if err != nil {
 		if !apierrs.IsNotFound(err) {
 			logger.Errorf("deployments.Get for %q failed: %s", deploymentName, err)
 			return err
 		}
-		logger.Infof("Deployment %q doesn't exist, creating", deploymentName)
-	} else {
-		// TODO(mattmoor): Compare the deployments and update if it has changed
-		// out from under us.
-		logger.Infof("Found existing deployment %q", deploymentName)
-		return nil
+		deploymentExists = false
 	}
 
 	// Create the deployment.
 	deployment := MakeServingDeployment(logger, rev, c.getNetworkConfig(), c.controllerConfig)
 
 	// Resolve tag image references to digests.
-	if err := c.resolver.Resolve(deployment); err != nil {
+	if err = c.resolver.Resolve(deployment); err != nil {
 		logger.Error("Error resolving deployment", zap.Error(err))
 		rev.Status.MarkContainerMissing(err.Error())
 		if _, err := c.updateStatus(rev); err != nil {
@@ -608,10 +628,16 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revision) error {
 			return err
 		}
 
-	logger.Infof("Creating Deployment: %q", deployment.Name)
-	_, createErr := dc.Create(deployment)
-
-	return createErr
+	if deploymentExists {
+		// TODO(mattmoor): Compare the deployments and update if it has changed
+		// out from under us.
+		logger.Infof("Found existing deployment %q, updating", deploymentName)
+		_, err = dc.Update(deployment)
+	} else {
+		logger.Infof("Deployment %q doesn't exist, creating", deploymentName)
+		_, err = dc.Create(deployment)
+	}
+	return err
 }
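On the TODO(mattmoor) above — gating the Update on an actual spec change could look roughly like the sketch below (assuming k8s.io/apimachinery/pkg/api/equality; not part of the PR):

```go
// Hypothetical pre-check before dc.Update: skip the write when the live
// spec already matches what MakeServingDeployment produced.
// (In practice, API-server defaulting makes a naive DeepEqual fragile.)
if existing, getErr := dc.Get(deploymentName, metav1.GetOptions{}); getErr == nil &&
	equality.Semantic.DeepEqual(existing.Spec, deployment.Spec) {
	logger.Infof("Deployment %q is up to date", deploymentName)
	return nil
}
```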

 func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error {
21 changes: 12 additions & 9 deletions pkg/controller/revision/revision_test.go
@@ -78,9 +78,8 @@ func getTestRevision() *v1alpha1.Revision {
Name: "test-rev",
Namespace: testNamespace,
Labels: map[string]string{
"testLabel1": "foo",
"testLabel2": "bar",
serving.RouteLabelKey: "test-route",
"testLabel1": "foo",
"testLabel2": "bar",
},
Annotations: map[string]string{
"testAnnotation": "test",
@@ -1503,8 +1502,8 @@ func TestActiveToReserveRevisionDeletesStuff(t *testing.T) {
 	// appropriate.
 	createRevision(elaClient, elaInformer, controller, rev)
 
-	expectedDeploymentName := fmt.Sprintf("%s-deployment", rev.Name)
-	_, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{})
+	deploymentName := fmt.Sprintf("%s-deployment", rev.Name)
+	_, err := kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("Couldn't get ela deployment: %v", err)
 	}
@@ -1514,10 +1513,14 @@
 	rev.Spec.ServingState = v1alpha1.RevisionServingStateReserve
 	updateRevision(elaClient, elaInformer, controller, rev)
 
-	// Expect the deployment to be gone.
-	deployment, err := kubeClient.AppsV1().Deployments(testNamespace).Get(expectedDeploymentName, metav1.GetOptions{})
-	if err == nil {
-		t.Fatalf("Expected ela deployment to be missing but it was really here: %v", deployment)
+	// Expect the deployment to be there.
+	_, err = kubeClient.AppsV1().Deployments(testNamespace).Get(deploymentName, metav1.GetOptions{})
+
+	if err != nil {
+		if apierrs.IsNotFound(err) {
+			t.Fatalf("Expected k8s deployment to be there but it was gone: %s/%s", testNamespace, deploymentName)
+		}
+		t.Fatalf("Error getting deployment %s while it should exist: %v", deploymentName, err)
 	}
 }
