Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 72 additions & 52 deletions cmd/webhook/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,35 +16,42 @@ limitations under the License.
package main

import (
"context"
"flag"
"log"
"strconv"
"time"

"knative.dev/eventing/pkg/defaultchannel"

"go.uber.org/zap"

"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1"
"knative.dev/eventing/pkg/defaultchannel"
"knative.dev/eventing/pkg/logconfig"
"knative.dev/pkg/configmap"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/clients/kubeclient"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/logging/logkey"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
"knative.dev/pkg/version"
"knative.dev/pkg/webhook"
)

type envConfig struct {
RegistrationDelayTime string `envconfig:"REG_DELAY_TIME" required:"false"`
}

var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)

func getRegistrationDelayTime(rdt string) time.Duration {
var RegistrationDelay time.Duration

Expand All @@ -62,55 +69,59 @@ func getRegistrationDelayTime(rdt string) time.Duration {

func main() {
flag.Parse()
// Read the logging config and setup a logger.
cm, err := configmap.Load("/etc/config-logging")

// Set up signals so we handle the first shutdown signal gracefully.
ctx := signals.NewContext()

cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatalf("Error loading logging configuration: %v", err)
log.Fatal("Failed to get cluster config:", err)
}
config, err := logging.NewConfigFromMap(cm)

log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

ctx, _ = injection.Default.SetupInformers(ctx, cfg)
kubeClient := kubeclient.Get(ctx)

config, err := sharedmain.GetLoggingConfig(ctx)
if err != nil {
log.Fatalf("Error parsing logging configuration: %v", err)
log.Fatal("Error loading/parsing logging configuration:", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(config, logconfig.WebhookName())
defer logger.Sync()
logger = logger.With(zap.String(logkey.ControllerType, logconfig.WebhookName()))

logger.Infow("Starting the Eventing Webhook")

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}
RegistrationDelay := getRegistrationDelayTime(env.RegistrationDelayTime)

// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()

clusterConfig, err := rest.InClusterConfig()
if err != nil {
logger.Fatalw("Failed to get in cluster config", zap.Error(err))
if err := version.CheckMinimumVersion(kubeClient.Discovery()); err != nil {
logger.Fatalw("Version check failed", err)
}

kubeClient, err := kubernetes.NewForConfig(clusterConfig)
if err != nil {
logger.Fatalw("Failed to get the client set", zap.Error(err))
}
logger.Infow("Starting the Eventing Webhook")

// Watch the logging config map and dynamically update logging levels.
configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace())

configMapWatcher.Watch(logconfig.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.WebhookName()))
// Watch the observability config map and dynamically update metrics exporter.
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap(logconfig.WebhookName(), logger))
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, logconfig.WebhookName()))

// Watch the default-ch-webhook ConfigMap and dynamically update the default
// Channel CRD.
chDefaulter := defaultchannel.New(logger.Desugar())
eventingduckv1alpha1.ChannelDefaulterSingleton = chDefaulter
configMapWatcher.Watch(defaultchannel.ConfigMapName, chDefaulter.UpdateConfigMap)

if err = configMapWatcher.Start(stopCh); err != nil {
logger.Fatalf("failed to start webhook configmap watcher: %v", zap.Error(err))
if err = configMapWatcher.Start(ctx.Done()); err != nil {
logger.Fatalw("Failed to start the ConfigMap watcher", zap.Error(err))
}

var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", zap.Error(err))
}
registrationDelay := getRegistrationDelayTime(env.RegistrationDelayTime)

stats, err := webhook.NewStatsReporter()
if err != nil {
logger.Fatalw("failed to initialize the stats reporter", zap.Error(err))
Expand All @@ -124,29 +135,38 @@ func main() {
SecretName: "eventing-webhook-certs",
WebhookName: "webhook.eventing.knative.dev",
StatsReporter: stats,
RegistrationDelay: RegistrationDelay * time.Second,
RegistrationDelay: registrationDelay * time.Second,
}
controller := webhook.AdmissionController{
Client: kubeClient,
Options: options,
Handlers: map[schema.GroupVersionKind]webhook.GenericCRD{
// For group eventing.knative.dev,
eventingv1alpha1.SchemeGroupVersion.WithKind("Broker"): &eventingv1alpha1.Broker{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &eventingv1alpha1.Subscription{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{},
eventingv1alpha1.SchemeGroupVersion.WithKind("EventType"): &eventingv1alpha1.EventType{},
// For group messaging.knative.dev.
messagingv1alpha1.SchemeGroupVersion.WithKind("InMemoryChannel"): &messagingv1alpha1.InMemoryChannel{},
messagingv1alpha1.SchemeGroupVersion.WithKind("Sequence"): &messagingv1alpha1.Sequence{},
messagingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &messagingv1alpha1.Channel{},
},
Logger: logger,

handlers := map[schema.GroupVersionKind]webhook.GenericCRD{
// For group eventing.knative.dev,
eventingv1alpha1.SchemeGroupVersion.WithKind("Broker"): &eventingv1alpha1.Broker{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Subscription"): &eventingv1alpha1.Subscription{},
eventingv1alpha1.SchemeGroupVersion.WithKind("Trigger"): &eventingv1alpha1.Trigger{},
eventingv1alpha1.SchemeGroupVersion.WithKind("EventType"): &eventingv1alpha1.EventType{},
// For group messaging.knative.dev.
messagingv1alpha1.SchemeGroupVersion.WithKind("InMemoryChannel"): &messagingv1alpha1.InMemoryChannel{},
messagingv1alpha1.SchemeGroupVersion.WithKind("Sequence"): &messagingv1alpha1.Sequence{},
messagingv1alpha1.SchemeGroupVersion.WithKind("Choice"): &messagingv1alpha1.Choice{},
messagingv1alpha1.SchemeGroupVersion.WithKind("Channel"): &messagingv1alpha1.Channel{},
Comment thread
n3wscott marked this conversation as resolved.
}

// Decorate contexts with the current state of the config.
ctxFunc := func(ctx context.Context) context.Context {
// TODO: implement upgrades when eventing needs it:
// return v1beta1.WithUpgradeViaDefaulting(store.ToContext(ctx))
return ctx
}

controller, err := webhook.NewAdmissionController(kubeClient, options, handlers, logger, ctxFunc, true)

if err != nil {
logger.Fatalw("Failed to create the admission controller", zap.Error(err))
logger.Fatalw("Failed to create admission controller", zap.Error(err))
}
if err = controller.Run(stopCh); err != nil {
logger.Errorw("controller.Run() failed", zap.Error(err))

if err = controller.Run(ctx.Done()); err != nil {
logger.Fatalw("Failed to start the admission controller", zap.Error(err))
}

logger.Infow("Webhook stopping")
}
2 changes: 2 additions & 0 deletions config/500-webhook.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ spec:
fieldPath: metadata.namespace
- name: CONFIG_LOGGING_NAME
value: config-logging
- name: METRICS_DOMAIN
value: knative.dev/eventing
- name: WEBHOOK_NAME
value: eventing-webhook
- name: REG_DELAY_TIME
Expand Down
33 changes: 33 additions & 0 deletions third_party/VENDOR-LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -5713,6 +5713,39 @@ CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.



===========================================================
Import: knative.dev/eventing/vendor/github.com/rogpeppe/go-internal

Copyright (c) 2018 The Go Authors. All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are
met:

* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following disclaimer
in the documentation and/or other materials provided with the
distribution.
* Neither the name of Google Inc. nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.



===========================================================
Import: knative.dev/eventing/vendor/github.com/spf13/pflag

Expand Down
27 changes: 27 additions & 0 deletions vendor/github.com/rogpeppe/go-internal/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading