Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,21 @@ func main() {
QueueProxyLoggingLevel: queueProxyLoggingLevel.Get(),
}

opt := controller.Options{
KubeClientSet: kubeClient,
ServingClientSet: elaClient,
Logger: logger,
}

// Build all of our controllers, with the clients constructed above.
// Add new controllers to this array.
controllers := []controller.Interface{
configuration.NewController(kubeClient, elaClient, buildClient, kubeInformerFactory, elaInformerFactory, cfg, logger),
revision.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory,
buildInformerFactory, servingSystemInformerFactory, cfg, &revControllerConfig, logger),
route.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory,
servingSystemInformerFactory, cfg, autoscaleEnableScaleToZero, logger),
service.NewController(kubeClient, elaClient, kubeInformerFactory, elaInformerFactory, cfg, logger),
configuration.NewController(opt, buildClient, elaInformerFactory, cfg),
revision.NewController(opt, kubeInformerFactory, elaInformerFactory,
buildInformerFactory, servingSystemInformerFactory, cfg, &revControllerConfig),
route.NewController(opt, kubeInformerFactory, elaInformerFactory,
servingSystemInformerFactory, cfg, autoscaleEnableScaleToZero),
service.NewController(opt, elaInformerFactory, cfg),
}

go kubeInformerFactory.Start(stopCh)
Expand Down
13 changes: 3 additions & 10 deletions pkg/controller/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
buildclientset "github.com/knative/build/pkg/client/clientset/versioned"
"github.com/knative/serving/pkg/apis/serving"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
listers "github.com/knative/serving/pkg/client/listers/serving/v1alpha1"
"github.com/knative/serving/pkg/controller"
Expand All @@ -33,8 +32,6 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
)
Expand All @@ -57,22 +54,18 @@ type Controller struct {

// NewController creates a new Configuration controller
func NewController(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
opt controller.Options,
buildClientSet buildclientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
config *rest.Config,
logger *zap.SugaredLogger) controller.Interface {
config *rest.Config) controller.Interface {

// obtain references to a shared index informer for the Configuration
// and Revision type.
informer := elaInformerFactory.Serving().V1alpha1().Configurations()
revisionInformer := elaInformerFactory.Serving().V1alpha1().Revisions()

controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet,
informer.Informer(), controllerAgentName, "Configurations", logger),
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Configurations"),
buildClientSet: buildClientSet,
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
Expand Down
9 changes: 5 additions & 4 deletions pkg/controller/configuration/configuration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,14 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) (
elaInformer = informers.NewSharedInformerFactory(elaClient, 0)

controller = NewController(
kubeClient,
elaClient,
ctrl.Options{
kubeClient,
elaClient,
zap.NewNop().Sugar(),
},
buildClient,
kubeInformer,
elaInformer,
&rest.Config{},
zap.NewNop().Sugar(),
).(*Controller)

return
Expand Down
28 changes: 16 additions & 12 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,29 +74,33 @@ type Base struct {
Logger *zap.SugaredLogger
}

// Options defines the common controller options passed to NewBase.
// We define this to reduce the boilerplate argument list when
// creating derivative controllers.
type Options struct {
KubeClientSet kubernetes.Interface
ServingClientSet clientset.Interface
Logger *zap.SugaredLogger
}

// NewBase instantiates a new instance of Base implementing
// the common & boilerplate code between our controllers.
func NewBase(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
informer cache.SharedIndexInformer,
controllerAgentName string,
workQueueName string,
logger *zap.SugaredLogger) *Base {
func NewBase(opt Options, informer cache.SharedIndexInformer, controllerAgentName string, workQueueName string) *Base {

// Enrich the logs with controller name
logger = logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName))
logger := opt.Logger.Named(controllerAgentName).With(zap.String(logkey.ControllerType, controllerAgentName))

// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: opt.KubeClientSet.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

base := &Base{
KubeClientSet: kubeClientSet,
ElaClientSet: elaClientSet,
KubeClientSet: opt.KubeClientSet,
ElaClientSet: opt.ServingClientSet,
Recorder: recorder,
WorkQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), workQueueName),
Logger: logger,
Expand Down
17 changes: 6 additions & 11 deletions pkg/controller/revision/revision.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/knative/serving/pkg/logging/logkey"
"go.uber.org/zap"

clientset "github.com/knative/serving/pkg/client/clientset/versioned"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
kubeinformers "k8s.io/client-go/informers"
Expand All @@ -46,7 +45,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

Expand Down Expand Up @@ -164,36 +162,33 @@ type ControllerConfig struct {
// si - informer factory shared across all controllers for listening to events and indexing resource properties
// queue - message queue for handling new events. unique to this controller.
func NewController(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
opt controller.Options,
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
buildInformerFactory buildinformers.SharedInformerFactory,
servingSystemInformerFactory kubeinformers.SharedInformerFactory,
config *rest.Config,
controllerConfig *ControllerConfig,
logger *zap.SugaredLogger) controller.Interface {
controllerConfig *ControllerConfig) controller.Interface {

// obtain references to a shared index informer for the Revision and Endpoint type.
informer := elaInformerFactory.Serving().V1alpha1().Revisions()
endpointsInformer := kubeInformerFactory.Core().V1().Endpoints()
deploymentInformer := kubeInformerFactory.Apps().V1().Deployments()
configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps().Informer()

networkConfig, err := NewNetworkConfig(kubeClientSet)
networkConfig, err := NewNetworkConfig(opt.KubeClientSet)
if err != nil {
logger.Fatalf("Error loading network config: %v", err)
opt.Logger.Fatalf("Error loading network config: %v", err)
}

controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet,
informer.Informer(), controllerAgentName, "Revisions", logger),
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Revisions"),
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
endpointsSynced: endpointsInformer.Informer().HasSynced,
configMapSynced: configMapInformer.HasSynced,
buildtracker: &buildTracker{builds: map[key]set{}},
resolver: &digestResolver{client: kubeClientSet, transport: http.DefaultTransport},
resolver: &digestResolver{client: opt.KubeClientSet, transport: http.DefaultTransport},
controllerConfig: controllerConfig,
networkConfig: networkConfig,
}
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/revision/revision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,17 @@ func newTestControllerWithConfig(t *testing.T, controllerConfig *ControllerConfi
servingSystemInformer = kubeinformers.NewFilteredSharedInformerFactory(kubeClient, 0, pkg.GetServingSystemNamespace(), nil)

controller = NewController(
kubeClient,
elaClient,
ctrl.Options{
kubeClient,
elaClient,
zap.NewNop().Sugar(),
},
kubeInformer,
elaInformer,
buildInformer,
servingSystemInformer,
&rest.Config{},
controllerConfig,
zap.NewNop().Sugar(),
).(*Controller)

controller.resolver = &nopResolver{}
Expand Down
15 changes: 5 additions & 10 deletions pkg/controller/route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,11 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"

"github.com/knative/serving/pkg/apis/serving"
"github.com/knative/serving/pkg/apis/serving/v1alpha1"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
listers "github.com/knative/serving/pkg/client/listers/serving/v1alpha1"
"github.com/knative/serving/pkg/controller"
Expand Down Expand Up @@ -106,14 +104,12 @@ type Controller struct {
// si - informer factory shared across all controllers for listening to events and indexing resource properties
// reconcileKey - function for mapping queue keys to resource names
func NewController(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
opt controller.Options,
kubeInformerFactory kubeinformers.SharedInformerFactory,
elaInformerFactory informers.SharedInformerFactory,
servingSystemInformerFactory kubeinformers.SharedInformerFactory,
config *rest.Config,
enableScaleToZero *k8sflag.BoolFlag,
logger *zap.SugaredLogger) controller.Interface {
enableScaleToZero *k8sflag.BoolFlag) controller.Interface {

// obtain references to a shared index informer for the Routes and
// Configurations type.
Expand All @@ -122,16 +118,15 @@ func NewController(
ingressInformer := kubeInformerFactory.Extensions().V1beta1().Ingresses()
configMapInformer := servingSystemInformerFactory.Core().V1().ConfigMaps().Informer()

domainConfig, err := NewDomainConfig(kubeClientSet)
domainConfig, err := NewDomainConfig(opt.KubeClientSet)
if err != nil {
logger.Fatalf("Error loading domain config: %v", err)
opt.Logger.Fatalf("Error loading domain config: %v", err)
}

// No need to lock domainConfigMutex yet since the informers that can modify
// domainConfig haven't started yet.
controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet,
informer.Informer(), controllerAgentName, "Routes", logger),
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Routes"),
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
configSynced: configInformer.Informer().HasSynced,
Expand Down
8 changes: 5 additions & 3 deletions pkg/controller/route/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,14 +191,16 @@ func newTestController(t *testing.T, elaObjects ...runtime.Object) (
servingSystemInformer = kubeinformers.NewFilteredSharedInformerFactory(kubeClient, 0, pkg.GetServingSystemNamespace(), nil)

controller = NewController(
kubeClient,
elaClient,
ctrl.Options{
kubeClient,
elaClient,
testLogger,
},
kubeInformer,
elaInformer,
servingSystemInformer,
&rest.Config{},
k8sflag.Bool("enable-scale-to-zero", false),
testLogger,
).(*Controller)

return
Expand Down
13 changes: 3 additions & 10 deletions pkg/controller/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,12 @@ import (
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"

"github.com/knative/serving/pkg/apis/serving/v1alpha1"
clientset "github.com/knative/serving/pkg/client/clientset/versioned"
informers "github.com/knative/serving/pkg/client/informers/externalversions"
listers "github.com/knative/serving/pkg/client/listers/serving/v1alpha1"
"github.com/knative/serving/pkg/controller"
Expand Down Expand Up @@ -75,19 +72,15 @@ type Controller struct {
// NewController initializes the controller and is called by the generated code
// Registers eventhandlers to enqueue events
func NewController(
kubeClientSet kubernetes.Interface,
elaClientSet clientset.Interface,
kubeInformerFactory kubeinformers.SharedInformerFactory,
opt controller.Options,
elaInformerFactory informers.SharedInformerFactory,
config *rest.Config,
logger *zap.SugaredLogger) controller.Interface {
config *rest.Config) controller.Interface {

// obtain references to a shared index informer for the Services.
informer := elaInformerFactory.Serving().V1alpha1().Services()

controller := &Controller{
Base: controller.NewBase(kubeClientSet, elaClientSet,
informer.Informer(), controllerAgentName, "Services", logger),
Base: controller.NewBase(opt, informer.Informer(), controllerAgentName, "Services"),
lister: informer.Lister(),
synced: informer.Informer().HasSynced,
}
Expand Down