Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func main() {
configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
coreServiceInformer := kubeInformerFactory.Core().V1().Services()
ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses()
vpaInformer := vpaInformerFactory.Poc().V1alpha1().VerticalPodAutoscalers()

Expand All @@ -175,7 +176,8 @@ func main() {
controllers := []controller.Interface{
configuration.NewController(opt, configurationInformer, revisionInformer, cfg),
revision.NewController(opt, vpaClient, revisionInformer, buildInformer, configMapInformer,
deploymentInformer, endpointsInformer, vpaInformer, cfg, &revControllerConfig),
deploymentInformer, coreServiceInformer, endpointsInformer, vpaInformer,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more Informers I see being added, the more I like passing in the map of them so each and every NewController method doesn't need to change when one of them needs something ;)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

benefits of this style:

  • type safety
  • it makes dependencies explicit

cfg, &revControllerConfig),
route.NewController(opt, routeInformer, configurationInformer, ingressInformer,
configMapInformer, cfg, autoscaleEnableScaleToZero),
service.NewController(opt, serviceInformer, configurationInformer, routeInformer, cfg),
Expand All @@ -197,6 +199,7 @@ func main() {
buildInformer.Informer().HasSynced,
configMapInformer.Informer().HasSynced,
deploymentInformer.Informer().HasSynced,
coreServiceInformer.Informer().HasSynced,
endpointsInformer.Informer().HasSynced,
ingressInformer.Informer().HasSynced,
} {
Expand Down
250 changes: 138 additions & 112 deletions pkg/controller/revision/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
"time"

// TODO(mattmoor): Used by the commented checkAndUpdateDeployment logic below.
// "github.com/google/go-cmp/cmp"
// "github.com/google/go-cmp/cmp/cmpopts"
// "k8s.io/apimachinery/pkg/api/resource"

"github.com/knative/serving/pkg"

"github.com/google/go-cmp/cmp"
"github.com/josephburnett/k8sflag/pkg/k8sflag"
"github.com/knative/serving/pkg/apis/serving"
"github.com/knative/serving/pkg/logging"
Expand All @@ -55,6 +55,7 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/runtime"
appsv1listers "k8s.io/client-go/listers/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -110,6 +111,8 @@ type Controller struct {
revisionLister listers.RevisionLister
buildLister buildlisters.BuildLister
deploymentLister appsv1listers.DeploymentLister
serviceLister corev1listers.ServiceLister
endpointsLister corev1listers.EndpointsLister

buildtracker *buildTracker

Expand Down Expand Up @@ -182,6 +185,7 @@ func NewController(
buildInformer buildinformers.BuildInformer,
configMapInformer corev1informers.ConfigMapInformer,
deploymentInformer appsv1informers.DeploymentInformer,
serviceInformer corev1informers.ServiceInformer,
endpointsInformer corev1informers.EndpointsInformer,
vpaInformer vpav1alpha1informers.VerticalPodAutoscalerInformer,
config *rest.Config,
Expand All @@ -198,6 +202,8 @@ func NewController(
revisionLister: revisionInformer.Lister(),
buildLister: buildInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
serviceLister: serviceInformer.Lister(),
endpointsLister: endpointsInformer.Lister(),
buildtracker: &buildTracker{builds: map[key]set{}},
resolver: &digestResolver{client: opt.KubeClientSet, transport: http.DefaultTransport},
controllerConfig: controllerConfig,
Expand All @@ -218,12 +224,8 @@ func NewController(
})

endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing the owner of the endpoints is the service - hence no filtering on the revision as an owner?

Does the Filter support label keys?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The filter is just a function, so we could potentially generalize this to something like our Controller kind filter that takes a label key. WDYT?

AddFunc: func(obj interface{}) {
c.SyncEndpoints(obj.(*corev1.Endpoints))
},
UpdateFunc: func(old, new interface{}) {
c.SyncEndpoints(new.(*corev1.Endpoints))
},
AddFunc: c.EnqueueEndpointsRevision,
UpdateFunc: controller.PassNew(c.EnqueueEndpointsRevision),
})

deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
Expand Down Expand Up @@ -340,6 +342,10 @@ func (c *Controller) reconcile(ctx context.Context, rev *v1alpha1.Revision) erro
logger.Error("Failed to create a deployment", zap.Error(err))
return err
}
if err := c.reconcileService(ctx, rev); err != nil {
logger.Error("Failed to create k8s service", zap.Error(err))
return err
}

switch rev.Spec.ServingState {
case v1alpha1.RevisionServingStateActive:
Expand Down Expand Up @@ -380,72 +386,19 @@ func (c *Controller) EnqueueBuildTrackers(obj interface{}) {
}
}

func (c *Controller) SyncEndpoints(endpoint *corev1.Endpoints) {
eName := endpoint.Name
namespace := endpoint.Namespace
// Lookup and see if this endpoints corresponds to a service that
// we own and hence the Revision that created this service.
revName := lookupServiceOwner(endpoint)
if revName == "" {
return
}
logger := loggerWithRevisionInfo(c.Logger, namespace, revName)

rev, err := c.revisionLister.Revisions(namespace).Get(revName)
if err != nil {
logger.Error("Error fetching revision", zap.Error(err))
return
}

// Check to see if endpoint is the service endpoint
if eName != controller.GetServingK8SServiceNameForRevision(rev) {
return
}

// Check to see if the revision has already been marked as ready or failed
// and if it is, then there's no need to do anything to it.
if c := rev.Status.GetCondition(v1alpha1.RevisionConditionReady); c != nil && c.Status != corev1.ConditionUnknown {
return
}

// Don't modify the informer's copy.
rev = rev.DeepCopy()

if getIsServiceReady(endpoint) {
logger.Infof("Endpoint %q is ready", eName)
rev.Status.MarkResourcesAvailable()
rev.Status.MarkContainerHealthy()
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error marking revision ready", zap.Error(err))
return
}
c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady", "Revision becomes ready upon endpoint %q becoming ready", endpoint.Name)
return
}

revisionAge := time.Now().Sub(getRevisionLastTransitionTime(rev))
if revisionAge < serviceTimeoutDuration {
return
func (c *Controller) EnqueueEndpointsRevision(obj interface{}) {
endpoints := obj.(*corev1.Endpoints)
// Use the label on the Endpoints (from Service) to determine whether it is
// owned by a Revision, and if so queue that Revision.
if revisionName, ok := endpoints.Labels[serving.RevisionLabelKey]; ok {
c.EnqueueKey(endpoints.Namespace + "/" + revisionName)
}

rev.Status.MarkServiceTimeout()
if _, err := c.updateStatus(rev); err != nil {
logger.Error("Error marking revision failed", zap.Error(err))
return
}
c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed", "Revision did not become ready due to endpoint %q", endpoint.Name)
return
}

func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)

// Delete the user resources
if err := c.deleteService(ctx, rev); err != nil {
logger.Error("Failed to delete k8s service", zap.Error(err))
return err
}

// Delete the resources we set up to autoscale the user resources.
if err := c.deleteAutoscalerDeployment(ctx, rev); err != nil {
logger.Error("Failed to delete autoscaler Deployment", zap.Error(err))
Expand All @@ -469,12 +422,6 @@ func (c *Controller) deleteK8SResources(ctx context.Context, rev *v1alpha1.Revis
func (c *Controller) createK8SResources(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)

// Set up the user resources
if err := c.reconcileService(ctx, rev); err != nil {
logger.Error("Failed to create k8s service", zap.Error(err))
return err
}

// Set up resources to autoscale the user resources.
if err := c.reconcileAutoscalerDeployment(ctx, rev); err != nil {
logger.Error("Failed to create autoscaler Deployment", zap.Error(err))
Expand Down Expand Up @@ -626,47 +573,138 @@ func (c *Controller) deleteDeployment(ctx context.Context, deployment *appsv1.De
return nil
}

func (c *Controller) deleteService(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
serviceName := controller.GetServingK8SServiceNameForRevision(rev)
ns := controller.GetServingNamespaceName(rev.Namespace)

err := c.KubeClientSet.CoreV1().Services(ns).Delete(serviceName, fgDeleteOptions)
if apierrs.IsNotFound(err) {
return nil
} else if err != nil {
logger.Errorf("service.Delete for %q failed: %s", serviceName, err)
return err
}
logger.Infof("Deleted service %q", serviceName)
return nil
}

func (c *Controller) reconcileService(ctx context.Context, rev *v1alpha1.Revision) error {
logger := logging.FromContext(ctx)
ns := controller.GetServingNamespaceName(rev.Namespace)
sc := c.KubeClientSet.CoreV1().Services(ns)
serviceName := controller.GetServingK8SServiceNameForRevision(rev)

rev.Status.ServiceName = serviceName

if _, err := sc.Get(serviceName, metav1.GetOptions{}); err != nil {
if !apierrs.IsNotFound(err) {
logger.Errorf("services.Get for %q failed: %s", serviceName, err)
service, err := c.serviceLister.Services(ns).Get(serviceName)
switch rev.Spec.ServingState {
case v1alpha1.RevisionServingStateActive:
// When Active, the Service should exist and have a particular specification.
if apierrs.IsNotFound(err) {
// If it does not exist, then create it.
rev.Status.MarkDeploying("Deploying")
service, err = c.createService(ctx, rev)
if err != nil {
logger.Errorf("Error creating Service %q: %v", serviceName, err)
return err
}
logger.Infof("Created Service %q", serviceName)
} else if err != nil {
logger.Errorf("Error reconciling Active Service %q: %v", serviceName, err)
return err
} else {
// If it exists, then make sure if looks as we expect.
// It may change if a user edits things around our controller, which we
// should not allow, or if our expectations of how the service should look
// changes (e.g. we update our controller with new sidecars).
var updated bool
service, updated, err = c.checkAndUpdateService(ctx, rev, service)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm guessing there are is no defaulting involved with the service hence why it's safe to perform checkAndUpdateService?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't seen the dueling controllers that I saw with Deployment, but if this becomes a problem we can relax it as well (until we find a better way of achieving this that works for both).

if err != nil {
logger.Errorf("Error updating Service %q: %v", serviceName, err)
return err
}
if updated {
logger.Infof("Updated Service %q", serviceName)
rev.Status.MarkDeploying("Updating")
}
}
logger.Infof("serviceName %q doesn't exist, creating", serviceName)
} else {
// TODO(vaikas): Check that the service is legit and matches what we expect
// to have there.
logger.Infof("Found existing service %q", serviceName)

// We cannot determine readiness from the Service directly. Instead, we look up
// the backing Endpoints resource and check it for healthy pods. The name of the
// Endpoints resource matches the Service it backs.
endpoints, err := c.endpointsLister.Endpoints(ns).Get(serviceName)
if apierrs.IsNotFound(err) {
// If it isn't found, then we need to wait for the Service controller to
// create it.
rev.Status.MarkDeploying("Deploying")
return nil
} else if err != nil {
logger.Errorf("Error checking Active Endpoints %q: %v", serviceName, err)
return err
}
// If the endpoints resource indicates that the Service it sits in front of is ready,
// then surface this in our Revision status as resources available (pods were scheduled)
// and container healthy (endpoints should be gated by any provided readiness checks).
if getIsServiceReady(endpoints) {
rev.Status.MarkResourcesAvailable()
rev.Status.MarkContainerHealthy()
// TODO(mattmoor): How to ensure this only fires once?
c.Recorder.Eventf(rev, corev1.EventTypeNormal, "RevisionReady",
"Revision becomes ready upon endpoint %q becoming ready", serviceName)
} else {
// If the endpoints is NOT ready, then check whether it is taking unreasonably
// long to become ready and if so mark our revision as having timed out waiting
// for the Service to become ready.
revisionAge := time.Now().Sub(getRevisionLastTransitionTime(rev))
if revisionAge >= serviceTimeoutDuration {
rev.Status.MarkServiceTimeout()
// TODO(mattmoor): How to ensure this only fires once?
c.Recorder.Eventf(rev, corev1.EventTypeWarning, "RevisionFailed",
"Revision did not become ready due to endpoint %q", serviceName)
}
}
return nil

case v1alpha1.RevisionServingStateReserve, v1alpha1.RevisionServingStateRetired:
// When Reserve or Retired, we remove the underlying Service.
if apierrs.IsNotFound(err) {
// If it does not exist, then we have nothing to do.
return nil
}
if err := c.deleteService(ctx, service); err != nil {
logger.Errorf("Error deleting Service %q: %v", serviceName, err)
return err
}
logger.Infof("Deleted Service %q", serviceName)
rev.Status.MarkInactive()
return nil

default:
logger.Errorf("Unknown serving state: %v", rev.Spec.ServingState)
return nil
}
}

// createService builds the Kubernetes Service for the Revision via
// MakeRevisionK8sService and creates it in the serving namespace,
// returning the created object.
func (c *Controller) createService(ctx context.Context, rev *v1alpha1.Revision) (*corev1.Service, error) {
	ns := controller.GetServingNamespaceName(rev.Namespace)

	// Create the service.
	service := MakeRevisionK8sService(rev)
	return c.KubeClientSet.CoreV1().Services(ns).Create(service)
}

func (c *Controller) checkAndUpdateService(ctx context.Context, rev *v1alpha1.Revision, service *corev1.Service) (*corev1.Service, bool, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit, but it might be good to document what the bool here is. I think it means if it an update was attempted or not?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use an enum - self documenting

logger := logging.FromContext(ctx)

desiredService := MakeRevisionK8sService(rev)

if equality.Semantic.DeepEqual(desiredService.Spec, service.Spec) {
return service, false, nil
}
logger.Infof("Reconciling service diff (-desired, +observed): %v",
cmp.Diff(desiredService.Spec, service.Spec))
service.Spec = desiredService.Spec

d, err := c.KubeClientSet.CoreV1().Services(service.Namespace).Update(service)
return d, true, err
}

// deleteService removes the given Kubernetes Service. A Service that is
// already gone is treated as success, making deletion idempotent.
func (c *Controller) deleteService(ctx context.Context, svc *corev1.Service) error {
	logger := logging.FromContext(ctx)

	switch err := c.KubeClientSet.CoreV1().Services(svc.Namespace).Delete(svc.Name, fgDeleteOptions); {
	case apierrs.IsNotFound(err):
		// Nothing to delete; someone beat us to it.
		return nil
	case err != nil:
		logger.Errorf("service.Delete for %q failed: %s", svc.Name, err)
		return err
	default:
		return nil
	}
}

func (c *Controller) reconcileFluentdConfigMap(ctx context.Context, rev *v1alpha1.Revision) error {
Expand Down Expand Up @@ -874,18 +912,6 @@ func (c *Controller) updateStatus(rev *v1alpha1.Revision) (*v1alpha1.Revision, e
return rev, nil
}

// Given an endpoint see if it's managed by us and return the
// revision that created it.
// TODO: Consider using OwnerReferences.
// https://github.com/kubernetes/sample-controller/blob/master/controller.go#L373-L384
func lookupServiceOwner(endpoint *corev1.Endpoints) string {
	// A missing label yields the map's zero value, which is exactly the
	// "" we return for objects that are not ours.
	return endpoint.Labels[serving.RevisionLabelKey]
}

func (c *Controller) addConfigMapEvent(obj interface{}) {
configMap := obj.(*corev1.ConfigMap)
if configMap.Namespace != pkg.GetServingSystemNamespace() || configMap.Name != controller.GetNetworkConfigMapName() {
Expand Down
Loading