Merged
16 changes: 16 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

52 changes: 51 additions & 1 deletion cmd/controller-manager/main.go
@@ -25,10 +25,22 @@ import (
flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1"
"github.com/knative/eventing/pkg/controller/feed"
"github.com/knative/eventing/pkg/controller/flow"
"github.com/knative/eventing/pkg/logconfig"

"github.com/knative/pkg/configmap"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/logging/logkey"

istiov1alpha3 "github.com/knative/serving/pkg/apis/istio/v1alpha3"
servingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
"go.uber.org/zap"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/rest"

"github.com/knative/eventing/pkg/system"
"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -44,8 +56,45 @@ type ProvideFunc func(manager.Manager) (controller.Controller, error)

func main() {
flag.Parse()
Contributor
Not an issue w/ your PR, but we should consider renaming these binaries slightly; 'controller' runs the bus controllers and 'controller-manager' runs the flow, feed, etc controllers.

Contributor
I'll create an issue for this.

Contributor
I have some code that combines these into a single deployment, but haven't PR'd it: master...grantr:combine-controllers
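
For readers skimming the thread, a minimal sketch of the shape such a combined deployment might take — this is not the linked branch, and the two helper names below (registerManagedControllers, runBusControllers) are placeholders for the provider/constructor loops that the two existing binaries run today:

```go
// Sketch only: one process that runs both the controller-runtime manager
// (flow/feed controllers) and the handwritten bus/clusterbus/channel controllers,
// sharing a single stop channel.
package main

import (
	"log"

	"github.com/knative/eventing/pkg/signals"
	"sigs.k8s.io/controller-runtime/pkg/client/config"
	"sigs.k8s.io/controller-runtime/pkg/manager"
)

// registerManagedControllers stands in for the ProvideFunc loop in cmd/controller-manager.
func registerManagedControllers(mgr manager.Manager) error { return nil }

// runBusControllers stands in for the Constructor loop in cmd/controller.
func runBusControllers(stopCh <-chan struct{}) {}

func main() {
	stopCh := signals.SetupSignalHandler()

	mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{})
	if err != nil {
		log.Fatalf("Error creating manager: %v", err)
	}
	if err := registerManagedControllers(mgr); err != nil {
		log.Fatal(err)
	}

	// The bus-style controllers run in their own goroutines on the same stop channel.
	go runBusControllers(stopCh)

	log.Fatal(mgr.Start(stopCh))
}
```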

// Read the logging config and setup a logger.
cm, err := configmap.Load("/etc/config-logging")
if err != nil {
log.Fatalf("Error loading logging configuration: %v", err)
}
cfg, err := logging.NewConfigFromMap(cm, logconfig.ControllerManager)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(cfg, logconfig.ControllerManager)
defer logger.Sync()
logger = logger.With(zap.String(logkey.ControllerType, logconfig.ControllerManager))

logger.Info("Starting the Controller Manager")

// This tells controller-runtime to use zap to log internal messages.
logf.SetLogger(logf.ZapLogger(false))

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
logger.Fatal("Failed to get in cluster config", err)
}

kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
logger.Fatal("Failed to get the client set", err)
}

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace)
configMapWatcher.Watch(logconfig.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.ControllerManager, logconfig.ControllerManager))
Member
It would be nice to be able to encapsulate the 35 new lines in pkg/logging, but I think that's a follow-on PR.

Contributor Author
I agree. Had the same thought as I was adding this.
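
A minimal sketch of what that follow-on encapsulation might look like, reusing only the calls this PR already makes; the helper name NewLoggerFromConfigMap and its placement in eventing's pkg/logging are assumptions:

```go
// Sketch only: a possible helper in eventing's own pkg/logging package that wraps
// the per-binary setup above. Each main() would shrink to one call to this helper
// plus the existing configMapWatcher block.
package logging

import (
	"log"

	"github.com/knative/pkg/configmap"
	pkglogging "github.com/knative/pkg/logging"
	"github.com/knative/pkg/logging/logkey"
	"go.uber.org/zap"
)

// NewLoggerFromConfigMap loads /etc/config-logging, builds a logger for the named
// component, and tags it with the controller type. Callers keep the atomic level
// so they can wire up dynamic level updates.
func NewLoggerFromConfigMap(component string) (*zap.SugaredLogger, zap.AtomicLevel) {
	cm, err := configmap.Load("/etc/config-logging")
	if err != nil {
		log.Fatalf("Error loading logging configuration: %v", err)
	}
	cfg, err := pkglogging.NewConfigFromMap(cm, component)
	if err != nil {
		log.Fatalf("Error parsing logging configuration: %v", err)
	}
	logger, atomicLevel := pkglogging.NewLoggerFromConfig(cfg, component)
	return logger.With(zap.String(logkey.ControllerType, component)), atomicLevel
}
```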

if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("failed to start controller manager configmap watcher: %v", err)
}

// Setup a Manager
mrg, err := manager.New(config.GetConfigOrDie(), manager.Options{})
if err != nil {
@@ -72,11 +121,12 @@ func main() {
flow.ProvideController,
}

// TODO(n3wscott): Send the logger to the controllers.
for _, provider := range providers {
if _, err := provider(mrg); err != nil {
log.Fatal(err)
}
}

log.Fatal(mrg.Start(signals.SetupSignalHandler()))
log.Fatal(mrg.Start(stopCh))
}
49 changes: 38 additions & 11 deletions cmd/controller/main.go
@@ -22,7 +22,6 @@ import (
"net/http"
"time"

"github.com/golang/glog"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -41,7 +40,14 @@ import (
"github.com/knative/eventing/pkg/controller/clusterbus"
"github.com/knative/eventing/pkg/signals"

"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/system"
"github.com/knative/pkg/configmap"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/logging/logkey"
"github.com/prometheus/client_golang/prometheus/promhttp"
"go.uber.org/zap"
"log"
)

const (
@@ -58,27 +64,42 @@ var (
func main() {
flag.Parse()

// Read the logging config and setup a logger.
cm, err := configmap.Load("/etc/config-logging")
if err != nil {
log.Fatalf("Error loading logging configuration: %v", err)
}
config, err := logging.NewConfigFromMap(cm, logconfig.Controller)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.Controller)
defer logger.Sync()
logger = logger.With(zap.String(logkey.ControllerType, logconfig.Controller))

logger.Info("Starting the Controller")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
logger.Fatalf("Error building kubeconfig: %s", err.Error())
}

kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
logger.Fatalf("Error building kubernetes clientset: %s", err.Error())
}

client, err := clientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building clientset: %s", err.Error())
logger.Fatalf("Error building clientset: %s", err.Error())
}

servingClient, err := servingclientset.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building serving clientset: %s", err.Error())
logger.Fatalf("Error building serving clientset: %s", err.Error())
}

// TODO: Rip this out from all the controllers since we can get it
@@ -87,20 +108,28 @@ func main() {
// Kubernetes. Clients will use the Pod's ServiceAccount principal.
restConfig, err := rest.InClusterConfig()
if err != nil {
glog.Fatalf("Error building rest config: %v", err.Error())
logger.Fatalf("Error building rest config: %v", err.Error())
}

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
informerFactory := informers.NewSharedInformerFactory(client, time.Second*30)
servingInformerFactory := servinginformers.NewSharedInformerFactory(servingClient, time.Second*30)

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace)
configMapWatcher.Watch(logconfig.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Controller, logconfig.Controller))
Contributor
That is very cool and will be extremely useful :)

Member
I'm glad you like it :)
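
For context on what the watcher callback buys: the mechanism underneath is zap's AtomicLevel, which lets the log level change at runtime without rebuilding the logger. A small standalone illustration in plain zap (not the knative helper itself):

```go
// Standalone illustration: an AtomicLevel shared with the logger can be adjusted
// at runtime, which is what the ConfigMap watcher callback does when the map changes.
package main

import (
	"go.uber.org/zap"
	"go.uber.org/zap/zapcore"
)

func main() {
	atomicLevel := zap.NewAtomicLevelAt(zapcore.InfoLevel)

	cfg := zap.NewProductionConfig()
	cfg.Level = atomicLevel
	logger, _ := cfg.Build()
	defer logger.Sync()

	logger.Debug("dropped: level is Info")

	// Simulate what an UpdateLevelFromConfigMap-style callback does on a change.
	atomicLevel.SetLevel(zapcore.DebugLevel)
	logger.Debug("now emitted: level was lowered at runtime")
}
```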

if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("failed to start controller config map watcher: %v", err)
}

// Add new controllers here.
ctors := []controller.Constructor{
bus.NewController,
clusterbus.NewController,
channel.NewController,
}

// TODO(n3wscott): Send the logger to the controllers.
// Build all of our controllers, with the clients constructed above.
controllers := make([]controller.Interface, 0, len(ctors))
for _, ctor := range ctors {
@@ -118,7 +147,7 @@
// We don't expect this to return until stop is called,
// but if it does, propagate it back.
if err := ctrlr.Run(threadsPerController, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
logger.Fatalf("Error running controller: %s", err.Error())
}
}(ctrlr)
}
@@ -127,9 +156,9 @@
srv := &http.Server{Addr: metricsScrapeAddr}
http.Handle(metricsScrapePath, promhttp.Handler())
go func() {
glog.Info("Starting metrics listener at %s", metricsScrapeAddr)
logger.Info("Starting metrics listener at %s", metricsScrapeAddr)
if err := srv.ListenAndServe(); err != nil {
glog.Infof("Httpserver: ListenAndServe() finished with error: %s", err)
logger.Infof("Httpserver: ListenAndServe() finished with error: %s", err)
}
}()

@@ -139,8 +168,6 @@
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
srv.Shutdown(ctx)

glog.Flush()
Member
Hah, I think you caught a bug here. Nice!

Contributor Author
💃

}

func init() {
44 changes: 35 additions & 9 deletions cmd/webhook/main.go
@@ -17,45 +17,71 @@ package main

import (
"flag"
"log"

"github.com/golang/glog"
"github.com/knative/eventing/pkg/system"
"github.com/knative/eventing/pkg/logconfig"
"github.com/knative/eventing/pkg/signals"
"github.com/knative/eventing/pkg/system"
"github.com/knative/eventing/pkg/webhook"

"github.com/knative/pkg/configmap"
"github.com/knative/pkg/logging"
"github.com/knative/pkg/logging/logkey"

"go.uber.org/zap"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)

func main() {
flag.Parse()
defer glog.Flush()

glog.Info("Starting the Configuration Webhook")
// Read the logging config and setup a logger.
cm, err := configmap.Load("/etc/config-logging")
if err != nil {
log.Fatalf("Error loading logging configuration: %v", err)
}
config, err := logging.NewConfigFromMap(cm, logconfig.Webhook)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.Webhook)
defer logger.Sync()
logger = logger.With(zap.String(logkey.ControllerType, logconfig.Webhook))

logger.Info("Starting the Eventing Webhook")

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
glog.Fatal("Failed to get in cluster config", err)
logger.Fatal("Failed to get in cluster config", err)
}

clientset, err := kubernetes.NewForConfig(clusterConfig)
kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
glog.Fatal("Failed to get the client set", err)
logger.Fatal("Failed to get the client set", err)
}

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewDefaultWatcher(kubeClient, system.Namespace)
configMapWatcher.Watch(logconfig.ConfigName, logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.Webhook, logconfig.Webhook))
if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("failed to start webhook configmap watcher: %v", err)
}

// TODO(n3wscott): Send the logger to the controller.
options := webhook.ControllerOptions{
ServiceName: "eventing-webhook",
ServiceNamespace: system.Namespace,
Port: 443,
SecretName: "eventing-webhook-certs",
WebhookName: "webhook.eventing.knative.dev",
}
controller, err := webhook.NewAdmissionController(clientset, options)
controller, err := webhook.NewAdmissionController(kubeClient, options)
if err != nil {
glog.Fatal("Failed to create the admission controller", err)
logger.Fatal("Failed to create the admission controller", err)
}
controller.Run(stopCh)
}
7 changes: 7 additions & 0 deletions config/500-controller-manager.yaml
@@ -31,3 +31,10 @@ spec:
"-logtostderr",
"-stderrthreshold", "INFO",
]
volumeMounts:
- name: config-logging
mountPath: /etc/config-logging
volumes:
- name: config-logging
configMap:
name: config-logging
7 changes: 7 additions & 0 deletions config/500-controller.yaml
@@ -33,3 +33,10 @@ spec:
"-logtostderr",
"-stderrthreshold", "INFO",
]
volumeMounts:
- name: config-logging
mountPath: /etc/config-logging
volumes:
Member
It looks like we're both mounting a ConfigMap, and using the k8s watcher on the ConfigMap object (because you're feeding in a k8s client). We should just be able to use inotify on the file:

https://kubernetes.io/docs/tasks/configure-pod-container/configure-pod-configmap/#mounted-configmaps-are-updated-automatically

https://github.com/fsnotify/fsnotify looks promising.

Contributor
The autoscaling group found that inotify on configmaps is very slow, normally taking 30+ seconds to propagate updates to volumes. I think we can probably drop the volume mount now that we have the configmap watch.

Contributor Author
do tell more !

Contributor Author
I vote we make this an issue and do a follow-up pass. Having loggers is more valuable than optimizing, I think.

Member
Yes, this was intended to be a followup, not a blocker.
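
For reference, a minimal sketch of the file-watch alternative discussed in this thread — hypothetical, not what this PR does, and subject to the propagation-delay caveat noted above:

```go
// Sketch only: watch the mounted ConfigMap file with fsnotify instead of watching
// the ConfigMap through the API server. The kubelet updates mounted ConfigMaps by
// swapping a ..data symlink, so the mount directory is watched and the config is
// re-read on any event.
package main

import (
	"log"

	"github.com/fsnotify/fsnotify"
)

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		log.Fatalf("Error creating file watcher: %v", err)
	}
	defer watcher.Close()

	// Watch the mount directory rather than a single file, since the whole
	// directory contents are replaced on update.
	if err := watcher.Add("/etc/config-logging"); err != nil {
		log.Fatalf("Error watching config directory: %v", err)
	}

	for {
		select {
		case event := <-watcher.Events:
			log.Printf("config volume changed (%s); re-reading logging config", event.Op)
			// Re-load the map and adjust the zap atomic level here.
		case err := <-watcher.Errors:
			log.Printf("file watcher error: %v", err)
		}
	}
}
```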

- name: config-logging
configMap:
name: config-logging
9 changes: 8 additions & 1 deletion config/500-webhook.yaml
@@ -32,4 +32,11 @@ spec:
- name: eventing-webhook
# This is the Go import path for the binary that is containerized
# and substituted here.
image: github.com/knative/eventing/cmd/webhook
image: github.com/knative/eventing/cmd/webhook
volumeMounts:
- name: config-logging
mountPath: /etc/config-logging
volumes:
- name: config-logging
configMap:
name: config-logging