Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 27 additions & 21 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,20 @@ import (
"log"
"time"

"k8s.io/client-go/kubernetes"

"github.com/kelseyhightower/envconfig"
"go.opencensus.io/stats/view"
"go.uber.org/zap"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"

"knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/tracing"

"knative.dev/pkg/injection/sharedmain"
Expand All @@ -53,21 +54,10 @@ type envConfig struct {
}

func main() {
logConfig := channel.NewLoggingConfig()
logger := channel.NewProvisionerLoggerFromConfig(logConfig).Desugar()
flag.Parse()

defer flush(logger)

ctx := signals.NewContext()

logger.Info("Starting...")

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

// Report stats on Go memory usage every 30 seconds.
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)
Expand All @@ -80,21 +70,37 @@ func main() {
log.Fatal("Error building kubeconfig", err)
}

kubeClient := kubernetes.NewForConfigOrDie(cfg)
ctx, _ = injection.Default.SetupInformers(ctx, cfg)
kubeClient := kubeclient.Get(ctx)

loggingConfig, err := sharedmain.GetLoggingConfig(ctx)
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
}
sl, _ := logging.NewLoggerFromConfig(loggingConfig, "broker_filter")
logger := sl.Desugar()
defer flush(sl)

logger.Info("Starting the Broker Filter")

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

eventingClient := eventingv1alpha1.NewForConfigOrDie(cfg)
eventingFactory := eventinginformers.NewSharedInformerFactoryWithOptions(eventingClient,
controller.GetResyncPeriod(ctx),
eventinginformers.WithNamespace(env.Namespace))
triggerInformer := eventingFactory.Eventing().V1alpha1().Triggers()

cmw := configmap.NewInformedWatcher(kubeClient, system.Namespace())
configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace())

bin := tracing.BrokerFilterName(tracing.BrokerFilterNameArgs{
Namespace: env.Namespace,
BrokerName: env.Broker,
})
if err = tracing.SetupDynamicPublishing(logger.Sugar(), cmw, bin); err != nil {
if err = tracing.SetupDynamicPublishing(sl, configMapWatcher, bin); err != nil {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

Expand All @@ -111,10 +117,10 @@ func main() {

// TODO change the component name to trigger once Stackdriver metrics are approved.
// Watch the observability config map and dynamically update metrics exporter.
cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar()))
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", sl))

// configMapWatcher does not block, so start it first.
if err = cmw.Start(ctx.Done()); err != nil {
if err = configMapWatcher.Start(ctx.Done()); err != nil {
logger.Warn("Failed to start ConfigMap watcher", zap.Error(err))
}

Expand All @@ -134,7 +140,7 @@ func main() {
logger.Info("Exiting...")
}

func flush(logger *zap.Logger) {
logger.Sync()
func flush(logger *zap.SugaredLogger) {
_ = logger.Sync()
metrics.FlushExporter()
}
43 changes: 24 additions & 19 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

"go.opencensus.io/stats/view"
"knative.dev/eventing/pkg/broker/ingress"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/tracing"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
Expand Down Expand Up @@ -69,16 +69,8 @@ type envConfig struct {
}

func main() {
logConfig := channel.NewLoggingConfig()
logger := channel.NewProvisionerLoggerFromConfig(logConfig).Desugar()
defer flush(logger)
flag.Parse()

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

ctx := signals.NewContext()

// Report stats on Go memory usage every 30 seconds.
Expand All @@ -88,38 +80,51 @@ func main() {
log.Fatalf("Error exporting go memstats view: %v", err)
}

cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatal("Error building kubeconfig", err)
}

log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))

logger.Info("Starting...")
ctx, informers := injection.Default.SetupInformers(ctx, cfg)

cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig)
loggingConfig, err := sharedmain.GetLoggingConfig(ctx)
if err != nil {
log.Fatal("Error building kubeconfig", err)
log.Fatal("Error loading/parsing logging configuration:", err)
}
sl, _ := logging.NewLoggerFromConfig(loggingConfig, "broker_ingress")
logger := sl.Desugar()
defer flush(sl)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
logger.Info("Starting the Broker Ingress")

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatal("Failed to process env var", zap.Error(err))
}

channelURI := &url.URL{
Scheme: "http",
Host: env.Channel,
Path: "/",
}

cmw := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace())
configMapWatcher := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace())

// TODO watch logging config map.

// TODO change the component name to broker once Stackdriver metrics are approved.
// Watch the observability config map and dynamically update metrics exporter.
cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar()))
configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", sl))

bin := tracing.BrokerIngressName(tracing.BrokerIngressNameArgs{
Namespace: env.Namespace,
BrokerName: env.Broker,
})
if err = tracing.SetupDynamicPublishing(logger.Sugar(), cmw, bin); err != nil {
if err = tracing.SetupDynamicPublishing(sl, configMapWatcher, bin); err != nil {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

Expand Down Expand Up @@ -155,7 +160,7 @@ func main() {
}

// configMapWatcher does not block, so start it first.
if err = cmw.Start(ctx.Done()); err != nil {
if err = configMapWatcher.Start(ctx.Done()); err != nil {
logger.Warn("Failed to start ConfigMap watcher", zap.Error(err))
}

Expand All @@ -172,7 +177,7 @@ func main() {
logger.Info("Exiting...")
}

func flush(logger *zap.Logger) {
logger.Sync()
func flush(logger *zap.SugaredLogger) {
_ = logger.Sync()
metrics.FlushExporter()
}
63 changes: 0 additions & 63 deletions pkg/channel/logger.go

This file was deleted.

47 changes: 0 additions & 47 deletions pkg/channel/logger_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion pkg/defaultchannel/channel_defaulter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestChannelDefaulter_GetDefault(t *testing.T) {
}
channelTemplate := cd.GetDefault(tc.channel.Namespace)
if diff := cmp.Diff(tc.expectedChannelTemplate, channelTemplate); diff != "" {
t.Fatalf("Unexpected provisioner (-want, +got): %s", diff)
t.Fatalf("Unexpected channelTemplate (-want, +got): %s", diff)
}
})
}
Expand Down