60 changes: 52 additions & 8 deletions cmd/controller/main.go
@@ -36,10 +36,18 @@ import (
buildinformers "github.com/knative/build/pkg/client/informers/externalversions"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
"github.com/knative/serving/pkg/controller/build"
"github.com/knative/serving/pkg/controller/configuration"
"github.com/knative/serving/pkg/controller/deployment"
"github.com/knative/serving/pkg/controller/endpoint"
"github.com/knative/serving/pkg/controller/ingress"
"github.com/knative/serving/pkg/controller/revision"
"github.com/knative/serving/pkg/controller/route"
"github.com/knative/serving/pkg/controller/service"
cfgrcv "github.com/knative/serving/pkg/receiver/configuration"
revrcv "github.com/knative/serving/pkg/receiver/revision"
rtrcv "github.com/knative/serving/pkg/receiver/route"
svcrcv "github.com/knative/serving/pkg/receiver/service"
"github.com/knative/serving/pkg/signals"
"go.opencensus.io/exporter/prometheus"
"go.opencensus.io/stats/view"
@@ -134,7 +142,7 @@ func main() {
logger.Fatalf("Error loading controller config: %v", err)
}

revControllerConfig := revision.ControllerConfig{
revControllerConfig := revrcv.ControllerConfig{
AutoscaleConcurrencyQuantumOfTime: autoscaleConcurrencyQuantumOfTime,
AutoscaleEnableSingleConcurrency: autoscaleEnableSingleConcurrency,
AutoscalerImage: autoscalerImage,
@@ -149,15 +157,51 @@ func main() {
QueueProxyLoggingLevel: queueProxyLoggingLevel.Get(),
}

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
controllers := []controller.Interface{
configuration.NewController(kubeClient, elaClient, buildClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),
revision.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, buildInformerFactory, cfg, &revControllerConfig, logger),
route.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, autoscaleEnableScaleToZero, logger),
service.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),
// The receivers are what implement our core logic. Each of these subscribes to some subset of the resources for
// which we have controllers below.

Member

"Receiver" feels like a funny name for core logic.

What about "Implementation", "Director", "Manager", or "Producer"?

Member Author

Totally up for a naming 🚲🏠 once we settle on if/what we're naming :)

receivers := []interface{}{
revrcv.New(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, &revControllerConfig, logger),
rtrcv.New(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, autoscaleEnableScaleToZero, logger),
cfgrcv.New(kubeClient, elaClient, buildClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),

Member

Nit/note: it feels to me like a common "Options" struct would be useful here for configuring these receivers. In particular, it feels like there are small differences in the arguments (e.g. the configuration takes a buildClient as the 3rd argument, but the others take additional arguments around the 6th position, with logger always last) which makes this harder to use.

Member Author

Yeah, totally agree. We see the same thing with the controller constructors today (initially we were trying to use a common ctor and a loop to cut down on this boilerplate, but ultimately unrolled and diverged).

Following on @mdemirhan's initial refactoring, we should perhaps define controller.Options with the common arguments taken by controller.Base and a majority of these argument lists would shrink dramatically. I can open an issue to track this discussion.

svcrcv.New(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger),
}
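
Picking up the controller.Options idea from the thread above, here is a minimal sketch of what such a shared struct could look like; the field names and layout are hypothetical and not part of this PR:

```go
// Hypothetical sketch: bundle the constructor arguments shared by every
// controller/receiver into one struct in the controller package, so each
// NewController only adds its type-specific extras.
package controller

import (
	buildinformers "github.com/knative/build/pkg/client/informers/externalversions"
	clientset "github.com/knative/serving/pkg/client/clientset/versioned"
	informers "github.com/knative/serving/pkg/client/informers/externalversions"
	"go.uber.org/zap"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// Options carries the dependencies common to every constructor; Config is the
// existing controller.Config.
type Options struct {
	KubeClientSet          kubernetes.Interface
	ServingClientSet       clientset.Interface
	KubeInformerFactory    kubeinformers.SharedInformerFactory
	ServingInformerFactory informers.SharedInformerFactory
	BuildInformerFactory   buildinformers.SharedInformerFactory

	RestConfig *rest.Config
	Config     Config

	Logger *zap.SugaredLogger
}
```

Each constructor signature would then shrink to something like New(opts, receivers...), with per-type extras (the Build client, the scale-to-zero flag) passed explicitly.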

// Construct a collection of thin workqueue controllers. Each of these looks for entries in "receivers" that implements foo.Receiver,
// and on reconciliation events forwards the work to those receivers.
rtc, err := route.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Route controller: %v", err)
}
svc, err := service.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Service controller: %v", err)
}
config, err := configuration.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Configuration controller: %v", err)
}
rc, err := revision.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Revision controller: %v", err)
}
dc, err := deployment.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Deployment controller: %v", err)
}
ec, err := endpoint.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Endpoints controller: %v", err)
}
bc, err := build.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, buildInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Build controller: %v", err)
}
ic, err := ingress.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, *controllerConfig, logger, receivers...)
if err != nil {
logger.Fatalf("Error creating Ingress controller: %v", err)
}
controllers := []controller.Interface{rtc, svc, config, rc, dc, ec, bc, ic}

Member

It seems like there may be two different goals here:

  1. Reduce the number of informers/watches managed by the controller.
  2. Normalize the process of having changes delivered to the business logic.

I'm presuming the priority is on #2, and not on #1?


go kubeInformerFactory.Start(stopCh)
go elaInformerFactory.Start(stopCh)
go buildInformerFactory.Start(stopCh)
131 changes: 131 additions & 0 deletions pkg/controller/build/build.go
@@ -0,0 +1,131 @@
/*
Copyright 2018 Google LLC.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package build

import (
"fmt"

buildv1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1"
buildinformers "github.com/knative/build/pkg/client/informers/externalversions"
listers "github.com/knative/build/pkg/client/listers/build/v1alpha1"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
"github.com/knative/serving/pkg/controller"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)

const controllerAgentName = "build-controller"

// Receiver defines the interface that a receiver must implement to receive events
// from this Controller.
type Receiver interface {
SyncBuild(*buildv1alpha1.Build) error
}

// Controller implements the controller for Build resources
type Controller struct {
*controller.Base

// lister indexes properties about Build
lister listers.BuildLister
synced cache.InformerSynced

receivers []Receiver
}

// NewController creates a new Build controller
func NewController(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
buildInformerFactory buildinformers.SharedInformerFactory,
config *rest.Config,
controllerConfig controller.Config,
logger *zap.SugaredLogger,
receivers ...interface{}) (controller.Interface, error) {

// obtain references to a shared index informer for the Build type.
informer := buildInformerFactory.Build().V1alpha1().Builds()

controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet, kubeInformerFactory,
elaInformerFactory, informer.Informer(), controllerAgentName, "Builds", logger),
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
}

for _, rcv := range receivers {
if dr, ok := rcv.(Receiver); ok {
controller.receivers = append(controller.receivers, dr)
}
}

Member

I'd suggest doing this block above all the informer setup; that way there's no need to do any cleanup in the error case. (In the possible future where we wanted to be able to flag enable certain features/directors.)
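
For illustration, a sketch of the reordering suggested here, assuming the Receiver interface defined in this file; the helper name filterReceivers is hypothetical:

```go
// filterReceivers (hypothetical) validates the receivers before any informer
// or controller.Base construction, so the error path has nothing to clean up.
func filterReceivers(receivers []interface{}) ([]Receiver, error) {
	var out []Receiver
	for _, rcv := range receivers {
		if r, ok := rcv.(Receiver); ok {
			out = append(out, r)
		}
	}
	if len(out) == 0 {
		return nil, fmt.Errorf("none of the provided receivers implement build.Receiver")
	}
	return out, nil
}
```

NewController would call this first and only construct the informer and controller.Base once it succeeds.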

if len(controller.receivers) == 0 {
return nil, fmt.Errorf("None of the provided receivers implement build.Receiver. " +
"If the Build controller is no longer needed it should be removed.")
}
return controller, nil
}

// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
return c.RunController(threadiness, stopCh,
[]cache.InformerSynced{c.synced}, c.syncHandler, "Build")
}

// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the Build
// resource with the current status of the resource.

Member

Is this comment still accurate?

func (c *Controller) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil

Member

Why does this return nil?

}

// Get the Build resource with this namespace/name
original, err := c.lister.Builds(namespace).Get(name)
if err != nil {
// The resource may no longer exist, in which case we stop
// processing.
if errors.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("build %q in work queue no longer exists", key))
return nil
}

return err
}

for _, dr := range c.receivers {

Contributor

IIUC, the effects of this loop are:

  • All receivers must succeed to mark this item successful in the workqueue.
  • If a single receiver returns an error, all following receivers are skipped and the item is retried later (using the default workqueue retry rules).
  • Receivers that return success before a later receiver returns an error will be tried again with the same item (using the default workqueue retry rules).
  • Only one receiver may process an item at a time.

I'm concerned about the conflation of success/failure of all receivers into the failure state of a single queue item. Issues that come to mind include:

  • If a receiver consistently fails to process an event successfully, receivers later in the list never see that event.
  • Later receivers must wait until all prior receivers have finished before processing an event, leading to higher reconcile latencies for those receivers.

Randomizing or reversing the order of the list would mitigate these issues somewhat, but I suspect they'd still cause hard-to-debug problems occasionally.

Member Author

Yes, I specifically included this ~disclaimer in the pseudo-code of the body above for this reason:

// This is an example, we could also fan out goroutines with errgroup, or something else.
for _, fr := range c.receivers {

It would certainly mean a coarser grain of retries to go this route, but I'm unsure if that's a thing we should optimize for.

Contributor

Good point, errgroup would fix the starvation and latency issues. The retries thing feels vaguely incorrect but I agree it doesn't seem like a blocker right now.

It's possible that in larger clusters the additional reconciles would cause unacceptable apiserver load, but at that point there's nothing stopping the internals from changing to one workqueue per receiver. The Receiver interface remains the same.
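
To make the errgroup alternative discussed in this thread concrete, here is a minimal sketch, assuming the Controller and Receiver types from this file plus golang.org/x/sync/errgroup; it is illustrative, not part of the PR:

```go
import (
	buildv1alpha1 "github.com/knative/build/pkg/apis/build/v1alpha1"
	"golang.org/x/sync/errgroup"
)

// Fan the receivers out in parallel so a slow or failing receiver does not
// starve the ones after it; the workqueue item still fails (and is retried)
// if any receiver returns an error.
func (c *Controller) syncReceivers(original *buildv1alpha1.Build) error {
	var eg errgroup.Group
	for _, dr := range c.receivers {
		dr := dr                  // capture loop variables for the goroutine
		cp := original.DeepCopy() // each receiver gets its own fresh copy
		eg.Go(func() error {
			return dr.SyncBuild(cp)
		})
	}
	// Wait blocks until all goroutines return and yields the first error.
	return eg.Wait()
}
```

Retries stay per work item, so a single failing receiver still causes the others to re-run on the next attempt, as noted above.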

Another commenter

I'm not aware of any controller across the kube ecosystem that has succeeded in having shared workqueues, since it becomes really difficult to reason about the state of the caches relative to the action. Generally the most successful patterns are where you try to get all of your work into a single action (level driven) and queue up synchronizations of that. I've seen that scale to the largest kube clusters in the world, so you're unlikely to go too far.

// Don't modify the informer's copy, and give each receiver a fresh copy.
cp := original.DeepCopy()

Contributor

So is this really what we want here? Seems like if a receiver (say the first one) modifies the object, then by copying the original, we're now sending an out of date object to the next receiver (say the second one) and if they try to update it, seems like the update will be rejected by k8s because the object is out of date, or the receiver will have to explicitly fetch a newer version of the object?

Member Author

This is more important in the case of parallelism (see the other thread about using goroutines here).

Member

It feels like it would be more correct to give each receiver the resource key and a lookup/update function. The controller (that name seems incorrect given this new pattern -- maybe "observer"?) could then perform caching, fetches, and possibly even merging as needed.

Unfortunately, if all of our controllers are actually using GetControllerOf, they'll probably all need to fetch the object in order to figure out if they should be acting on it.
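
As a rough sketch of the key-plus-accessor shape suggested in this comment (all names here are hypothetical, not from the PR), the contract might look like:

```go
// BuildAccessor (hypothetical) lets the observer own caching and fetching;
// receivers get the key plus callbacks instead of a pre-fetched object.
type BuildAccessor struct {
	// Get fetches a fresh copy of the Build for this key.
	Get func() (*buildv1alpha1.Build, error)
	// Update writes back a modified Build and returns the stored result.
	Update func(*buildv1alpha1.Build) (*buildv1alpha1.Build, error)
}

// Receiver, in this variant, decides when (and whether) to fetch.
type Receiver interface {
	SyncBuild(key string, access BuildAccessor) error
}
```

As the comment notes, receivers that rely on GetControllerOf would likely still fetch the object up front to decide ownership, which limits how much this saves.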

if err := dr.SyncBuild(cp); err != nil {
return err
}
}
return nil
}