diff --git a/Gopkg.lock b/Gopkg.lock index 6f3a66eaafb..2c7802032cd 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1382,7 +1382,6 @@ "go.opencensus.io/trace", "go.uber.org/atomic", "go.uber.org/zap", - "go.uber.org/zap/zapcore", "google.golang.org/grpc", "google.golang.org/grpc/codes", "google.golang.org/grpc/status", diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 96f0aeeb108..bf398788a50 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -21,19 +21,20 @@ import ( "log" "time" - "k8s.io/client-go/kubernetes" - "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" + + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/injection" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" "knative.dev/eventing/pkg/broker/filter" - "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/injection/sharedmain" @@ -53,21 +54,10 @@ type envConfig struct { } func main() { - logConfig := channel.NewLoggingConfig() - logger := channel.NewProvisionerLoggerFromConfig(logConfig).Desugar() flag.Parse() - defer flush(logger) - ctx := signals.NewContext() - logger.Info("Starting...") - - var env envConfig - if err := envconfig.Process("", &env); err != nil { - logger.Fatal("Failed to process env var", zap.Error(err)) - } - // Report stats on Go memory usage every 30 seconds. msp := metrics.NewMemStatsAll() msp.Start(ctx, 30*time.Second) @@ -80,7 +70,23 @@ func main() { log.Fatal("Error building kubeconfig", err) } - kubeClient := kubernetes.NewForConfigOrDie(cfg) + ctx, _ = injection.Default.SetupInformers(ctx, cfg) + kubeClient := kubeclient.Get(ctx) + + loggingConfig, err := sharedmain.GetLoggingConfig(ctx) + if err != nil { + log.Fatal("Error loading/parsing logging configuration:", err) + } + sl, _ := logging.NewLoggerFromConfig(loggingConfig, "broker_filter") + logger := sl.Desugar() + defer flush(sl) + + logger.Info("Starting the Broker Filter") + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + logger.Fatal("Failed to process env var", zap.Error(err)) + } eventingClient := eventingv1alpha1.NewForConfigOrDie(cfg) eventingFactory := eventinginformers.NewSharedInformerFactoryWithOptions(eventingClient, @@ -88,13 +94,13 @@ func main() { eventinginformers.WithNamespace(env.Namespace)) triggerInformer := eventingFactory.Eventing().V1alpha1().Triggers() - cmw := configmap.NewInformedWatcher(kubeClient, system.Namespace()) + configMapWatcher := configmap.NewInformedWatcher(kubeClient, system.Namespace()) bin := tracing.BrokerFilterName(tracing.BrokerFilterNameArgs{ Namespace: env.Namespace, BrokerName: env.Broker, }) - if err = tracing.SetupDynamicPublishing(logger.Sugar(), cmw, bin); err != nil { + if err = tracing.SetupDynamicPublishing(sl, configMapWatcher, bin); err != nil { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } @@ -111,10 +117,10 @@ func main() { // TODO change the component name to trigger once Stackdriver metrics are approved. // Watch the observability config map and dynamically update metrics exporter. - cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", logger.Sugar())) + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_filter", sl)) // configMapWatcher does not block, so start it first. - if err = cmw.Start(ctx.Done()); err != nil { + if err = configMapWatcher.Start(ctx.Done()); err != nil { logger.Warn("Failed to start ConfigMap watcher", zap.Error(err)) } @@ -134,7 +140,7 @@ func main() { logger.Info("Exiting...") } -func flush(logger *zap.Logger) { - logger.Sync() +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() metrics.FlushExporter() } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index ebddd3ecc8b..4db7f73163b 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -32,7 +32,6 @@ import ( "go.opencensus.io/stats/view" "knative.dev/eventing/pkg/broker/ingress" - "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" kubeclient "knative.dev/pkg/client/injection/kube/client" @@ -40,6 +39,7 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" "knative.dev/pkg/system" @@ -69,16 +69,8 @@ type envConfig struct { } func main() { - logConfig := channel.NewLoggingConfig() - logger := channel.NewProvisionerLoggerFromConfig(logConfig).Desugar() - defer flush(logger) flag.Parse() - var env envConfig - if err := envconfig.Process("", &env); err != nil { - logger.Fatal("Failed to process env var", zap.Error(err)) - } - ctx := signals.NewContext() // Report stats on Go memory usage every 30 seconds. @@ -88,18 +80,31 @@ func main() { log.Fatalf("Error exporting go memstats view: %v", err) } + cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig) + if err != nil { + log.Fatal("Error building kubeconfig", err) + } + log.Printf("Registering %d clients", len(injection.Default.GetClients())) log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories())) log.Printf("Registering %d informers", len(injection.Default.GetInformers())) - logger.Info("Starting...") + ctx, informers := injection.Default.SetupInformers(ctx, cfg) - cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig) + loggingConfig, err := sharedmain.GetLoggingConfig(ctx) if err != nil { - log.Fatal("Error building kubeconfig", err) + log.Fatal("Error loading/parsing logging configuration:", err) } + sl, _ := logging.NewLoggerFromConfig(loggingConfig, "broker_ingress") + logger := sl.Desugar() + defer flush(sl) - ctx, informers := injection.Default.SetupInformers(ctx, cfg) + logger.Info("Starting the Broker Ingress") + + var env envConfig + if err := envconfig.Process("", &env); err != nil { + logger.Fatal("Failed to process env var", zap.Error(err)) + } channelURI := &url.URL{ Scheme: "http", @@ -107,19 +112,19 @@ func main() { Path: "/", } - cmw := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace()) + configMapWatcher := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace()) // TODO watch logging config map. // TODO change the component name to broker once Stackdriver metrics are approved. // Watch the observability config map and dynamically update metrics exporter. - cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", logger.Sugar())) + configMapWatcher.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap("broker_ingress", sl)) bin := tracing.BrokerIngressName(tracing.BrokerIngressNameArgs{ Namespace: env.Namespace, BrokerName: env.Broker, }) - if err = tracing.SetupDynamicPublishing(logger.Sugar(), cmw, bin); err != nil { + if err = tracing.SetupDynamicPublishing(sl, configMapWatcher, bin); err != nil { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } @@ -155,7 +160,7 @@ func main() { } // configMapWatcher does not block, so start it first. - if err = cmw.Start(ctx.Done()); err != nil { + if err = configMapWatcher.Start(ctx.Done()); err != nil { logger.Warn("Failed to start ConfigMap watcher", zap.Error(err)) } @@ -172,7 +177,7 @@ func main() { logger.Info("Exiting...") } -func flush(logger *zap.Logger) { - logger.Sync() +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() metrics.FlushExporter() } diff --git a/pkg/channel/logger.go b/pkg/channel/logger.go deleted file mode 100644 index 0d45b487dd1..00000000000 --- a/pkg/channel/logger.go +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright 2018 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package channel - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "knative.dev/pkg/logging" -) - -const ( - provisionerLoggingComponent = "provisioner" -) - -// NewLoggingConfig creates a static logging configuration appropriate for a -// provisioner. All logging levels are set to Info. -func NewLoggingConfig() *logging.Config { - lc := &logging.Config{} - lc.LoggingConfig = `{ - "level": "info", - "development": false, - "outputPaths": ["stdout"], - "errorOutputPaths": ["stderr"], - "encoding": "json", - "encoderConfig": { - "timeKey": "ts", - "levelKey": "level", - "nameKey": "logger", - "callerKey": "caller", - "messageKey": "msg", - "stacktraceKey": "stacktrace", - "lineEnding": "", - "levelEncoder": "", - "timeEncoder": "iso8601", - "durationEncoder": "", - "callerEncoder": "" - } - }` - lc.LoggingLevel = make(map[string]zapcore.Level) - lc.LoggingLevel[provisionerLoggingComponent] = zapcore.InfoLevel - return lc -} - -// NewProvisionerLoggerFromConfig creates a new zap logger for the provisioner component based -// on the provided configuration -func NewProvisionerLoggerFromConfig(config *logging.Config) *zap.SugaredLogger { - logger, _ := logging.NewLoggerFromConfig(config, provisionerLoggingComponent) - return logger -} diff --git a/pkg/channel/logger_test.go b/pkg/channel/logger_test.go deleted file mode 100644 index 95ae7153320..00000000000 --- a/pkg/channel/logger_test.go +++ /dev/null @@ -1,47 +0,0 @@ -/* -Copyright 2018 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package channel - -import ( - "testing" - - "github.com/google/go-cmp/cmp" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - _ "knative.dev/pkg/system/testing" -) - -func TestNewLoggingConfig(t *testing.T) { - config := NewLoggingConfig() - expected := map[string]zapcore.Level{ - "provisioner": zap.InfoLevel, - } - actual := config.LoggingLevel - if cmp.Diff(expected, actual) != "" { - t.Errorf("%s expected: %+v got: %+v", "Logging level", expected, actual) - } -} - -func TestNewBusLoggerFromConfig(t *testing.T) { - config := NewLoggingConfig() - logger := NewProvisionerLoggerFromConfig(config) - expected := true - actual := logger.Desugar().Core().Enabled(zap.InfoLevel) - if expected != actual { - t.Errorf("%s expected: %+v got: %+v", "Logging level", expected, actual) - } -} diff --git a/pkg/defaultchannel/channel_defaulter_test.go b/pkg/defaultchannel/channel_defaulter_test.go index a5d861f7e21..55cbc03a4df 100644 --- a/pkg/defaultchannel/channel_defaulter_test.go +++ b/pkg/defaultchannel/channel_defaulter_test.go @@ -99,7 +99,7 @@ func TestChannelDefaulter_GetDefault(t *testing.T) { } channelTemplate := cd.GetDefault(tc.channel.Namespace) if diff := cmp.Diff(tc.expectedChannelTemplate, channelTemplate); diff != "" { - t.Fatalf("Unexpected provisioner (-want, +got): %s", diff) + t.Fatalf("Unexpected channelTemplate (-want, +got): %s", diff) } }) }