Merged
10 changes: 8 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

153 changes: 126 additions & 27 deletions injection/sharedmain/main.go
@@ -32,15 +32,22 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/tools/record"

"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
kle "knative.dev/pkg/leaderelection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
@@ -94,6 +101,20 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
return logging.NewConfigFromConfigMap(loggingConfigMap)
}

// GetLeaderElectionConfig gets the leader election config.
func GetLeaderElectionConfig(ctx context.Context) (*kle.Config, error) {
leaderElectionConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(kle.ConfigMapName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return kle.NewConfigFromMap(nil)
}

return nil, err
}

return kle.NewConfigFromConfigMap(leaderElectionConfigMap)
}

Contributor: Does this method need to be public?

Contributor (Author): It is used in controllers that do not use sharedmain, as are other methods from this file that are exported.
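Since this helper is exported precisely so that controllers which do not go through sharedmain.Main can reuse it (per the exchange above), here is a minimal sketch of such a caller. The component name, the run body, and the assumption that injection has already populated the context with a kube client (and that SYSTEM_NAMESPACE is set) are illustrative, not part of this PR.

package example

import (
    "context"

    "knative.dev/pkg/injection/sharedmain"
    "knative.dev/pkg/logging"
)

// startComponent is a hypothetical entry point for a controller binary that
// does not use sharedmain.Main but reuses its exported helpers.
func startComponent(ctx context.Context, component string) {
    logger := logging.FromContext(ctx)

    // Load the leader election ConfigMap from the system namespace; if it is
    // missing, GetLeaderElectionConfig falls back to kle.NewConfigFromMap(nil)
    // as shown above.
    leaderElectionConfig, err := sharedmain.GetLeaderElectionConfig(ctx)
    if err != nil {
        logger.Fatalf("Error loading leader election configuration: %v", err)
    }
    leConfig := leaderElectionConfig.GetComponentConfig(component)

    run := func(ctx context.Context) {
        // Start informers and controllers here, then block until shutdown.
        <-ctx.Done()
    }

    if !leConfig.LeaderElect {
        run(ctx)
    } else {
        sharedmain.RunLeaderElected(ctx, logger, run, component, leConfig)
    }
}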

// Main runs the generic main flow for non-webhook controllers with a new
// context. Use WebhookMainWith* if you need to serve webhooks.
func Main(component string, ctors ...injection.ControllerConstructor) {
@@ -127,36 +148,53 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
go func() {
// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()

profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
logger.Errorw("Error while running server", zap.Error(err))
}
}()

Member: Just wondering why this is a goroutine, rather than a defer?

Contributor (Author): The method won't exit until the server is done or crashes. Is there a downside to using a goroutine here?

Member: It was just a bit unusual, but it looks like the only leader election option is RunOrDie, so this is fine.

CheckK8sClientMinimumVersionOrDie(ctx, logger)

run := func(ctx context.Context) {
cmw := SetupConfigMapWatchOrDie(ctx, logger)
controllers, _ := ControllersAndWebhooksFromCtors(ctx, cmw, ctors...)
WatchLoggingConfigOrDie(ctx, cmw, logger, atomicLevel, component)

Member: I'm still wondering why we only want to configure logging and monitoring after being elected leader, rather than applying the logging and monitoring configs continuously.

Contributor (Author): I was following the pattern in the k8s controller manager, which is to basically do absolutely nothing until you're the leader. I don't see an advantage to watching these configs until you're the leader, but if you want me to change it, I will. Do you want me to change it?

Member: The idea would be to have logs and metrics potentially exporting to a remote destination even for non-leader controllers (so you could count the number of standby working controllers, for example). But this can be done later, so I'll approve as-is.

WatchObservabilityConfigOrDie(ctx, cmw, profilingHandler, logger, component)

logger.Info("Starting configuration manager...")
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start configuration manager", zap.Error(err))
}
logger.Info("Starting informers...")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", zap.Error(err))
}
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)

<-ctx.Done()
}

// Set up leader election config
leaderElectionConfig, err := GetLeaderElectionConfig(ctx)
if err != nil {
logger.Fatalf("Error loading leader election configuration: %v", err)
}
leConfig := leaderElectionConfig.GetComponentConfig(component)

if !leConfig.LeaderElect {
logger.Infof("%v will not run in leader-elected mode", component)
run(ctx)
} else {
RunLeaderElected(ctx, logger, run, component, leConfig)
}
}
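To illustrate the goroutine-versus-defer question from the review thread above: a defer only runs when the enclosing function returns, and the leader-election callback in RunLeaderElected below exits the process via logger.Fatal on lost leadership, so deferred cleanup might never execute. A small hedged sketch of the pattern, with assumed names and the server typed as *http.Server for illustration:

package example

import (
    "context"
    "net/http"

    "go.uber.org/zap"
)

// shutdownWhenDone mirrors the shape used in MainWithConfig: the profiling
// server is torn down from a goroutine watching ctx.Done() rather than from a
// defer, so shutdown happens as soon as the context is cancelled even while
// the main goroutine is still blocked inside leader election.
func shutdownWhenDone(ctx context.Context, logger *zap.SugaredLogger, srv *http.Server) {
    go func() {
        <-ctx.Done()
        if err := srv.Shutdown(context.Background()); err != nil {
            logger.Errorw("Error shutting down server", zap.Error(err))
        }
    }()
}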

@@ -186,6 +224,7 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
profilingHandler := profiling.NewHandler(logger, false)
profilingServer := profiling.NewServer(profilingHandler)

CheckK8sClientMinimumVersionOrDie(ctx, logger)
cmw := SetupConfigMapWatchOrDie(ctx, logger)
@@ -204,8 +243,6 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)

eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)

@@ -369,3 +406,65 @@ func ControllersAndWebhooksFromCtors(ctx context.Context,

return controllers, webhooks
}

// RunLeaderElected runs the given function in leader elected mode. The function
// will be run only once the leader election lock is obtained.
func RunLeaderElected(ctx context.Context, logger *zap.SugaredLogger, run func(context.Context), component string, leConfig kle.ComponentConfig) {
recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartRecordingToSink(
&typedcorev1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}),
}
recorder = eventBroadcaster.NewRecorder(
scheme.Scheme, corev1.EventSource{Component: component})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
}()
}

// Create a unique identifier so that two controllers on the same host don't
// race.
id, err := kle.UniqueID()
if err != nil {
logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err))
}
logger.Infof("%v will run in leader-elected mode with id %v", component, id)

Member: Not for this PR. It seems like we might want to include a metric here indicating the leader-election status. Can you drop a // TODO: add monitoring for leader election status to this bit of the code (and maybe file a good-first-issue for it)?

Contributor (Author): Sure

// rl is the resource used to hold the leader election lock.
rl, err := resourcelock.New(leConfig.ResourceLock,
system.Namespace(), // use namespace we are running in
component, // component is used as the resource name
kubeclient.Get(ctx).CoreV1(),
kubeclient.Get(ctx).CoordinationV1(),
resourcelock.ResourceLockConfig{
Identity: id,
EventRecorder: recorder,
})
if err != nil {
logger.Fatalw("Error creating lock", zap.Error(err))
}

// Execute the `run` function when we have the lock.
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: leConfig.LeaseDuration,
RenewDeadline: leConfig.RenewDeadline,
RetryPeriod: leConfig.RetryPeriod,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStoppedLeading: func() {
logger.Fatal("leaderelection lost")
},
},
// TODO: use health check watchdog, knative/pkg#1048
Name: component,
})
}
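On the reviewer's note about a leader-election status metric (left as a TODO for a follow-up issue), one possible shape, sketched here purely as an illustration and not part of this PR, is a last-value gauge recorded from the leadership callbacks. This uses OpenCensus, which knative.dev/pkg/metrics is built on; the measure name and wiring are assumptions.

package example

import (
    "context"

    "go.opencensus.io/stats"
    "go.opencensus.io/stats/view"
)

// leaderStatus is a hypothetical measure: 1 while this replica holds the
// leader-election lock, 0 otherwise.
var leaderStatus = stats.Int64(
    "leader_election_status",
    "1 if this process is currently the leader, 0 otherwise",
    stats.UnitDimensionless)

func init() {
    // Export the most recently recorded value; exporter setup is assumed to be
    // handled by the component's existing metrics configuration.
    if err := view.Register(&view.View{
        Name:        "leader_election_status",
        Description: "Leader election status of this replica",
        Measure:     leaderStatus,
        Aggregation: view.LastValue(),
    }); err != nil {
        panic(err)
    }
}

// recordLeadership would be called with true from OnStartedLeading and false
// from OnStoppedLeading.
func recordLeadership(ctx context.Context, leading bool) {
    var v int64
    if leading {
        v = 1
    }
    stats.Record(ctx, leaderStatus.M(v))
}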