Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions pkg/controller/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package configuration

import (
"context"
"fmt"
"reflect"

Expand All @@ -26,6 +27,7 @@ import (
servinginformers "github.com/knative/serving/pkg/client/informers/externalversions/serving/v1alpha1"
listers "github.com/knative/serving/pkg/client/listers/serving/v1alpha1"
"github.com/knative/serving/pkg/controller"
"github.com/knative/serving/pkg/logging"
"github.com/knative/serving/pkg/logging/logkey"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -102,6 +104,7 @@ func (c *Controller) Reconcile(key string) error {
}
// Wrap our logger with the additional context of the configuration that we are reconciling.
logger := loggerWithConfigInfo(c.Logger, namespace, name)
ctx := logging.WithLogger(context.TODO(), logger)

// Get the Configuration resource with this namespace/name
config, err := c.configurationLister.Configurations(namespace).Get(name)
Expand All @@ -115,6 +118,19 @@ func (c *Controller) Reconcile(key string) error {

// Don't modify the informer's copy.
config = config.DeepCopy()

// Reconcile this copy of the configuration and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, config)
if _, err := c.updateStatus(config); err != nil {
logger.Warn("Failed to update configuration status", zap.Error(err))
return err
}
return err
}

func (c *Controller) reconcile(ctx context.Context, config *v1alpha1.Configuration) error {
logger := logging.FromContext(ctx)
config.Status.InitializeConditions()

// First, fetch the revision that should exist for the current generation
Expand Down Expand Up @@ -173,12 +189,6 @@ func (c *Controller) Reconcile(key string) error {
return err
}

// Reflect any status changes that occurred during reconciliation.
if _, err := c.updateStatus(config); err != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious what you think about not splitting these methods but having this as a top level defer call

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this, but this felt clearer (less magical) to me. One thing that makes hard is returning an error on updateStatus (you need to use named return values).

LMK if you feel strongly (the naming could also be more original, but nothing else felt appreciably better).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't feel strongly - was just wondering if you considered it

logger.Error("Error updating configuration", zap.Error(err))
return err
}

return nil
}

Expand Down
81 changes: 36 additions & 45 deletions pkg/controller/revision/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package revision
import (
"context"
"fmt"
"log"
"net/http"
"reflect"
"strings"
Expand Down Expand Up @@ -274,14 +273,28 @@ func (c *Controller) Reconcile(key string) error {
}
// Don't modify the informer's copy.
rev = rev.DeepCopy()

// Reconcile this copy of the revision and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, rev)
if _, err := c.updateStatus(rev); err != nil {
logger.Warn("Failed to update revision status", zap.Error(err))
return err
}
return err
}

func (c *Controller) reconcile(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)

rev.Status.InitializeConditions()
c.updateRevisionLoggingURL(rev)

if rev.Spec.BuildName != "" {
rev.Status.InitializeBuildCondition()
build, err := c.buildLister.Builds(rev.Namespace).Get(rev.Spec.BuildName)
if err != nil {
logger.Errorf("Error fetching Build %q for Revision %q: %v", rev.Spec.BuildName, key, err)
logger.Errorf("Error fetching Build %q for Revision %q: %v", rev.Spec.BuildName, rev.Name, err)
return err
}
before := rev.Status.GetCondition(v1alpha1.RevisionConditionBuildSucceeded)
Expand All @@ -305,15 +318,6 @@ func (c *Controller) Reconcile(key string) error {
}
}

// TODO(mattmoor): Remove this comment.
// In the level-based reconciliation we've started moving to in #1208, this should be the last thing we do.
// This controller is substantial enough that we will slowly move pieces above this line until it is the
// last thing, and then remove this comment.
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error updating Revision status", zap.Error(err))
return err
}

bc := rev.Status.GetCondition(v1alpha1.RevisionConditionBuildSucceeded)
if bc == nil || bc.Status == corev1.ConditionTrue {
// There is no build, or the build completed successfully.
Expand Down Expand Up @@ -426,7 +430,6 @@ func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) {
logger.Infof("Endpoint %q is ready", eName)
rev.Status.MarkResourcesAvailable()
rev.Status.MarkContainerHealthy()
log.Printf("UPDATING STATUS TO: %v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error marking revision ready", zap.Error(err))
return
Expand All @@ -452,44 +455,41 @@ func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) {
func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
logger.Info("Deleting the resources for revision")
err := c.deleteDeployment(ctx, rev)
if err != nil {

if err := c.deleteDeployment(ctx, rev); err != nil {
logger.Error("Failed to delete a deployment", zap.Error(err))
return err
}
logger.Info("Deleted deployment")

err = c.deleteAutoscalerDeployment(ctx, rev)
if err != nil {
if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler Deployment", zap.Error(err))
return err
}
logger.Info("Deleted autoscaler Deployment")

err = c.deleteAutoscalerService(ctx, rev)
if err != nil {
if err := c.deleteAutoscalerService(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler Service", zap.Error(err))
return err
}
logger.Info("Deleted autoscaler Service")

if c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() {
if err := c.deleteVpa(ctx, rev); err != nil {
logger.Error("Failed to delete VPA", zap.Error(err))
return err
}
logger.Info("Deleted VPA")
}

err = c.deleteService(ctx, rev)
if err != nil {
if err := c.deleteService(ctx, rev); err != nil {
logger.Error("Failed to delete k8s service", zap.Error(err))
return err
}
logger.Info("Deleted service")

// And the deployment is no longer ready, so update that
rev.Status.MarkInactive()
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording inactivation of revision", zap.Error(err))
return err
}

return nil
}
Expand All @@ -505,29 +505,31 @@ func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revis
// Autoscale the service
if err := c.reconcileAutoscalerDeployment(ctx, rev); err != nil {
logger.Error("Failed to create autoscaler Deployment", zap.Error(err))
return err
}
if err := c.reconcileAutoscalerService(ctx, rev); err != nil {
logger.Error("Failed to create autoscaler Service", zap.Error(err))
return err
}
if c.controllerConfig.EnableVarLogCollection {
if err := c.reconcileFluentdConfigMap(ctx, rev); err != nil {
logger.Error("Failed to create fluent config map", zap.Error(err))
return err
}
}

// Vertically autoscale the revision pods
if c.controllerConfig.AutoscaleEnableVerticalPodAutoscaling.Get() {
if err := c.reconcileVpa(ctx, rev); err != nil {
logger.Error("Failed to create the vertical pod autoscaler for Deployment", zap.Error(err))
return err
}
}

// Create k8s service
serviceName, err := c.reconcileService(ctx, rev)
if err != nil {
if err := c.reconcileService(ctx, rev); err != nil {
logger.Error("Failed to create k8s service", zap.Error(err))
} else {
rev.Status.ServiceName = serviceName
return err
}

// Check to see if the revision has already been marked as ready and
Expand All @@ -551,14 +553,6 @@ func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revis
}
rev.Status.MarkDeploying(reason)

// By updating our deployment status we will trigger a Reconcile()
// that will watch for service to become ready for serving traffic.
logger.Infof("Updating status with the following conditions %+v", rev.Status.Conditions)
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording build completion", zap.Error(err))
return err
}

return nil
}

Expand Down Expand Up @@ -605,10 +599,6 @@ func (c *Controller) reconcileDeployment(ctx context.Context, rev *v1alpha1.Revi
if err := c.resolver.Resolve(deployment); err != nil {
logger.Error("Error resolving deployment", zap.Error(err))
rev.Status.MarkContainerMissing(err.Error())
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error recording resolution problem", zap.Error(err))
return err
}
return err
}

Expand All @@ -634,29 +624,30 @@ func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision)
return nil
}

func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revision) (string, error) {
func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
ns := controller.GetServingNamespaceName(rev.Namespace)
sc := c.KubeClientSet.CoreV1().Services(ns)
serviceName := controller.GetServingK8SServiceNameForRevision(rev)
rev.Status.ServiceName = serviceName

if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil {
if !apierrs.IsNotFound(err) {
logger.Errorf("services.Get for %q failed: %s", serviceName, err)
return "", err
return err
}
logger.Infof("serviceName %q doesn't exist, creating", serviceName)
} else {
// TODO(vaikas): Check that the service is legit and matches what we expect
// to have there.
logger.Infof("Found existing service %q", serviceName)
return serviceName, nil
return nil
}

service := MakeRevisionK8sService(rev)
logger.Infof("Creating service: %q", service.Name)
_, err := sc.Create(service)
return serviceName, err
return err
}

func (c *Controller) reconcileFluentdConfigMap(ctx context.Context, rev *v1alpha1.Revision) error {
Expand Down
42 changes: 20 additions & 22 deletions pkg/controller/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,18 @@ func (c *Controller) updateRouteEvent(key string) error {
}
// Don't modify the informers copy
route = route.DeepCopy()

// Reconcile this copy of the route and then write back any status
// updates regardless of whether the reconciliation errored out.
err = c.reconcile(ctx, route)
if _, err := c.updateStatus(ctx, route); err != nil {
return err
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

route's updateStatus has logging so this is great

}
return err
}

func (c *Controller) reconcile(ctx context.Context, route *v1alpha1.Route) error {
logger := logging.FromContext(ctx)
route.Status.InitializeConditions()

logger.Infof("Reconciling route :%v", route)
Expand All @@ -204,10 +216,10 @@ func (c *Controller) updateRouteEvent(key string) error {
return err
}

// Call syncTrafficTargetsAndUpdateRouteStatus, which also updates the Route.Status
// Call syncTrafficTargets, which also updates the Route.Status
// to contain the domain we will use for Ingress creation.

if _, err := c.syncTrafficTargetsAndUpdateRouteStatus(ctx, route); err != nil {
if _, err := c.syncTrafficTargets(ctx, route); err != nil {
return err
}

Expand All @@ -217,10 +229,7 @@ func (c *Controller) updateRouteEvent(key string) error {
logger.Error("Failed to create or update ingress rule", zap.Error(err))
return err
}

logger.Info("Route successfully synced")
_, err = c.updateStatus(ctx, route)
return err
return nil
}

func (c *Controller) getDomainConfig() *DomainConfig {
Expand All @@ -240,10 +249,10 @@ func (c *Controller) routeDomain(route *v1alpha1.Route) string {
return fmt.Sprintf("%s.%s.%s", route.Name, route.Namespace, domain)
}

// syncTrafficTargetsAndUpdateRouteStatus attempts to converge the actual state and desired state
// syncTrafficTargets attempts to converge the actual state and desired state
// according to the traffic targets in Spec field for Route resource. It then updates the Status
// block of the Route and returns the updated one.
func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(ctx context.Context, route *v1alpha1.Route) (*v1alpha1.Route, error) {
func (c *Controller) syncTrafficTargets(ctx context.Context, route *v1alpha1.Route) (*v1alpha1.Route, error) {
logger := logging.FromContext(ctx)
c.consolidateTrafficTargets(ctx, route)
configMap, revMap, err := c.getDirectTrafficTargets(ctx, route)
Expand Down Expand Up @@ -292,7 +301,7 @@ func (c *Controller) syncTrafficTargetsAndUpdateRouteStatus(ctx context.Context,
route.Status.Traffic = traffic
}
route.Status.Domain = c.routeDomain(route)
return c.updateStatus(ctx, route)
return route, nil
}

func (c *Controller) reconcilePlaceholderService(ctx context.Context, route *v1alpha1.Route) error {
Expand Down Expand Up @@ -358,10 +367,6 @@ func (c *Controller) getDirectTrafficTargets(ctx context.Context, route *v1alpha
logger.Infof("Failed to fetch Configuration %q: %v", configName, err)
if apierrs.IsNotFound(err) {
route.Status.MarkTrafficNotAssigned("Configuration", configName)
if _, err := c.updateStatus(ctx, route); err != nil {
// If we failed to update the status, return that error instead of the "config not found" error.
return nil, nil, err
}
}
return nil, nil, err
}
Expand All @@ -373,10 +378,6 @@ func (c *Controller) getDirectTrafficTargets(ctx context.Context, route *v1alpha
logger.Infof("Failed to fetch Revision %q: %v", revName, err)
if apierrs.IsNotFound(err) {
route.Status.MarkTrafficNotAssigned("Revision", revName)
if _, err := c.updateStatus(ctx, route); err != nil {
// If we failed to update the status, return that error instead of the "revision not found" error.
return nil, nil, err
}
}
return nil, nil, err
}
Expand Down Expand Up @@ -446,10 +447,6 @@ func (c *Controller) extendRevisionsWithIndirectTrafficTargets(
logger.Errorf("Failed to fetch Revision %s: %s", revName, err)
if apierrs.IsNotFound(err) {
route.Status.MarkTrafficNotAssigned("Revision", revName)
if _, err := c.updateStatus(ctx, route); err != nil {
// If we failed to update the status, return that error instead of the "revision not found" error.
return err
}
}
return err
}
Expand Down Expand Up @@ -969,9 +966,10 @@ func (c *Controller) SyncConfiguration(config *v1alpha1.Configuration) {

// Don't modify the informers copy
route = route.DeepCopy()
if _, err := c.syncTrafficTargetsAndUpdateRouteStatus(ctx, route); err != nil {
if _, err := c.syncTrafficTargets(ctx, route); err != nil {
logger.Error("Error updating route upon configuration becoming ready", zap.Error(err))
}
c.updateStatus(ctx, route)
}

func (c *Controller) SyncIngress(ingress *v1beta1.Ingress) {
Expand Down
Loading