diff --git a/Gopkg.lock b/Gopkg.lock index b7a41ecb942..49487686875 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1235,6 +1235,7 @@ "metrics", "metrics/metricskey", "metrics/metricstest", + "metrics/testing", "profiling", "reconciler/testing", "signals", @@ -1388,6 +1389,7 @@ "knative.dev/pkg/metrics", "knative.dev/pkg/metrics/metricskey", "knative.dev/pkg/metrics/metricstest", + "knative.dev/pkg/metrics/testing", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", "knative.dev/pkg/system", diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 04b083391c7..9f931a15202 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -18,14 +18,14 @@ package main import ( "flag" - "log" + "fmt" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // Uncomment the following line to load the gcp plugin + // (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -33,21 +33,39 @@ import ( "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" + "knative.dev/eventing/pkg/utils" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" ) +const ( + component = "apiserversource" +) + var ( masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") ) type envConfig struct { - Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` - Mode string `envconfig:"MODE"` - SinkURI string `split_words:"true" required:"true"` - ApiVersion []string `split_words:"true" required:"true"` - Kind []string `required:"true"` - Controller []bool `required:"true"` + Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` + Mode string `envconfig:"MODE"` + SinkURI string `split_words:"true" required:"true"` + ApiVersion []string `split_words:"true" required:"true"` + Kind []string `required:"true"` + Controller []bool `required:"true"` + ApiServerImporter string `envconfig:"APISERVERIMPORTER" required:"true"` + // MetricsConfigBase64 is a base64 encoded json string of + // metrics.ExporterOptions. This is used to configure the metrics exporter + // options, the config is stored in a config map inside the controllers + // namespace and copied here. + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` } // TODO: the controller should take the list of GVR @@ -55,18 +73,41 @@ type envConfig struct { func main() { flag.Parse() - logCfg := zap.NewProductionConfig() - logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - dlogger, err := logCfg.Build() + var env envConfig + err := envconfig.Process("", &env) if err != nil { - log.Fatalf("Error building logger: %v", err) + panic(fmt.Sprintf("Error processing env var: %s", err)) } - logger := dlogger.Sugar() - var env envConfig - err = envconfig.Process("", &env) + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig, err := utils.Base64ToLoggingConfig( + env.LoggingConfigBase64) + if err != nil { + fmt.Printf("[ERROR] filed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } + } + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) + + // Convert base64 encoded json metrics.ExporterOptions to + // metrics.ExporterOptions. + metricsConfig, err := utils.Base64ToMetricsOptions( + env.MetricsConfigBase64) if err != nil { - logger.Fatalw("Error processing environment", zap.Error(err)) + logger.Errorf("failed to process metrics options: %s", err.Error()) + } + + if err := metrics.UpdateExporter(*metricsConfig, logger); err != nil { + logger.Fatalf("Failed to create the metrics exporter: %v", err) + } + + reporter, err := apiserver.NewStatsReporter() + if err != nil { + logger.Fatalw("Error building statsreporter", zap.Error(err)) } // set up signals so we handle the first shutdown signal gracefully @@ -77,17 +118,16 @@ func main() { logger.Fatalw("Error building kubeconfig", zap.Error(err)) } - logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - client, err := dynamic.NewForConfig(cfg) if err != nil { logger.Fatalw("Error building dynamic client", zap.Error(err)) } - if err = tracing.SetupStaticPublishing(logger, "apiserversource", tracing.OnePercentSampling); err != nil { - // If tracing doesn't work, we will log an error, but allow the importer to continue to - // start. + if err = tracing.SetupStaticPublishing(logger, "apiserversource", + tracing.OnePercentSampling); err != nil { + // If tracing doesn't work, we will log an error, but allow the importer + // to continue to start. logger.Error("Error setting up trace publishing", zap.Error(err)) } @@ -107,7 +147,10 @@ func main() { logger.Fatalw("Error parsing APIVersion", zap.Error(err)) } // TODO: pass down the resource and the kind so we do not have to guess. - gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) + gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{ + Kind: kind, + Group: gv.Group, + Version: gv.Version}) gvrcs = append(gvrcs, apiserver.GVRC{ GVR: gvr, Controller: controlled, @@ -120,9 +163,15 @@ func main() { GVRCs: gvrcs, } - a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt) - logger.Info("starting kubernetes api adapter") + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt, + reporter, env.ApiServerImporter) + logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) } } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index 02641e717f9..b75ff61b8aa 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -109,3 +109,21 @@ - source_labels: [__meta_kubernetes_service_name] target_label: service +# apiserver-source +- job_name: apiserver-source + scrape_interval: 3s + scrape_timeout: 3s + kubernetes_sd_configs: + - role: pod + relabel_configs: + # Scrape only the the targets matching the following metadata + - source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name] + action: keep + regex: apiserver-source-controller;metrics + # Rename metadata labels to be reader friendly + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_service_name] + target_label: service diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index d91985a09a7..1206d4cc2ae 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -64,11 +64,15 @@ type adapter struct { namespace string logger *zap.SugaredLogger - mode string - delegate eventDelegate + mode string + delegate eventDelegate + reporter StatsReporter + apiServerImporter string } -func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter { +func NewAdaptor(source string, k8sClient dynamic.Interface, + ceClient cloudevents.Client, logger *zap.SugaredLogger, + opt Options, reporter StatsReporter, apiServerImporter string) Adapter { mode := opt.Mode switch mode { case ResourceMode, RefMode: @@ -80,13 +84,15 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents } a := &adapter{ - k8s: k8sClient, - ce: ceClient, - source: source, - logger: logger, - gvrcs: opt.GVRCs, - namespace: opt.Namespace, - mode: mode, + k8s: k8sClient, + ce: ceClient, + source: source, + logger: logger, + gvrcs: opt.GVRCs, + namespace: opt.Namespace, + mode: mode, + reporter: reporter, + apiServerImporter: apiServerImporter, } return a } @@ -101,21 +107,26 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { stop := make(chan struct{}) resyncPeriod := time.Duration(10 * time.Hour) - var d eventDelegate switch a.mode { case ResourceMode: d = &resource{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + apiServerImporter: a.apiServerImporter, } case RefMode: d = &ref{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + apiServerImporter: a.apiServerImporter, } default: diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index df3f4bfa27b..f4bc1404eec 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -108,8 +108,11 @@ func TestNewAdaptor(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - - a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt) + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r, "test-importer") got, ok := a.(*adapter) if !ok { @@ -147,10 +150,14 @@ func TestAdapter_StartRef(t *testing.T) { }, }}, } + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } - a := NewAdaptor(source, k8s, ce, logger, opt) + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") - err := errors.New("test never ran") + err = errors.New("test never ran") stopCh := make(chan struct{}) done := make(chan struct{}) go func() { @@ -182,10 +189,13 @@ func TestAdapter_StartResource(t *testing.T) { }, }}, } + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") - a := NewAdaptor(source, k8s, ce, logger, opt) - - err := errors.New("test never ran") + err = errors.New("test never ran") stopCh := make(chan struct{}) done := make(chan struct{}) go func() { @@ -264,26 +274,34 @@ func validateNotSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want s } } -func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClient) { +func makeResourceAndTestingClient(t *testing.T) (*resource, *kncetesting.TestCloudEventsClient) { ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } return &resource{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } -func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { +func makeRefAndTestingClient(t *testing.T) (*ref, *kncetesting.TestCloudEventsClient) { ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } return &ref{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index d5f5ff8b5ad..5812423bf3c 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -34,7 +34,8 @@ func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { } object := obj.(*unstructured.Unstructured) - return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType, object, object) + return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType, + object, object) } func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) { @@ -43,7 +44,8 @@ func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) } object := obj.(*unstructured.Unstructured) - return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType, object, object) + return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType, + object, object) } func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { diff --git a/pkg/adapter/apiserver/metrics.go b/pkg/adapter/apiserver/metrics.go new file mode 100644 index 00000000000..2ae9dc2ee03 --- /dev/null +++ b/pkg/adapter/apiserver/metrics.go @@ -0,0 +1,26 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package apiserver + +// TODO: use HTTP status codes +// Type gets the eventtype. +func Result(err error) string { + if err != nil { + return "error" + } + return "success" +} diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 9fcbcede367..154a2b48f31 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -31,11 +31,15 @@ import ( ) type ref struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger - - controlledGVRs []schema.GroupVersionResource + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger + + controlledGVRs []schema.GroupVersionResource + reporter StatsReporter + namespace string + apiServerImporter string } var _ cache.Store = (*ref)(nil) @@ -67,11 +71,7 @@ func (a *ref) Add(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -82,11 +82,7 @@ func (a *ref) Update(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -97,11 +93,7 @@ func (a *ref) Delete(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { @@ -112,6 +104,24 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { a.controlledGVRs = append(a.controlledGVRs, gvr) } +func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + apiServerImporter: a.apiServerImporter, + } + a.reporter.ReportEventCount(reportArgs, nil) + + _, err := a.ce.Send(ctx, *event) + if err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + a.reporter.ReportEventCount(reportArgs, err) + return err + } + return nil +} + // Stub cache.Store impl // Implements cache.Store diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go index 60a95b10aeb..7c4cc44296e 100644 --- a/pkg/adapter/apiserver/ref_test.go +++ b/pkg/adapter/apiserver/ref_test.go @@ -8,43 +8,43 @@ import ( ) func TestRefAddEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) } func TestRefUpdateEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) } func TestRefDeleteEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) } func TestRefAddEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) } func TestRefUpdateEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) } func TestRefDeleteEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) } func TestRefAddEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -55,7 +55,7 @@ func TestRefAddEventAsController(t *testing.T) { } func TestRefUpdateEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -66,7 +66,7 @@ func TestRefUpdateEventAsController(t *testing.T) { } func TestRefDeleteEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -78,7 +78,7 @@ func TestRefDeleteEventAsController(t *testing.T) { // HACKHACKHACK For test coverage. func TestRefStub(t *testing.T) { - d, _ := makeRefAndTestingClient() + d, _ := makeRefAndTestingClient(t) d.List() d.ListKeys() diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 043327df91a..2e8fef9cbdc 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -27,9 +27,13 @@ import ( ) type resource struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger + reporter StatsReporter + namespace string + apiServerImporter string } var _ cache.Store = (*resource)(nil) @@ -41,12 +45,7 @@ func (a *resource) Add(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - - return nil + return a.sendEvent(context.Background(), event, a.reporter) } func (a *resource) Update(obj interface{}) error { @@ -56,10 +55,7 @@ func (a *resource) Update(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } + return a.sendEvent(context.Background(), event, a.reporter) return nil } @@ -71,17 +67,32 @@ func (a *resource) Delete(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { + return a.sendEvent(context.Background(), event, a.reporter) +} + +func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, + reporter StatsReporter) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + apiServerImporter: a.apiServerImporter, + } + a.reporter.ReportEventCount(reportArgs, nil) + + _, err := a.ce.Send(ctx, *event) + if err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + a.reporter.ReportEventCount(reportArgs, err) return err } - return nil } func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { // not supported for resource. - a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) + a.logger.Warn("ignored controller watch request on gvr.", + zap.String("gvr", gvr.String())) } // Stub cache.Store impl @@ -97,12 +108,14 @@ func (a *resource) ListKeys() []string { } // Implements cache.Store -func (a *resource) Get(obj interface{}) (item interface{}, exists bool, err error) { +func (a *resource) Get(obj interface{}) (item interface{}, exists bool, + err error) { return nil, false, nil } // Implements cache.Store -func (a *resource) GetByKey(key string) (item interface{}, exists bool, err error) { +func (a *resource) GetByKey(key string) (item interface{}, exists bool, + err error) { return nil, false, nil } diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go index 0a7d75151fd..fedc1219059 100644 --- a/pkg/adapter/apiserver/resource_test.go +++ b/pkg/adapter/apiserver/resource_test.go @@ -8,49 +8,49 @@ import ( ) func TestResourceAddEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) } func TestResourceUpdateEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) } func TestResourceDeleteEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) } func TestResourceAddEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) } func TestResourceUpdateEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) } func TestResourceDeleteEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) } func TestResourceCoverageHacks(t *testing.T) { - d, _ := makeResourceAndTestingClient() + d, _ := makeResourceAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{}) // for coverage. } // HACKHACKHACK For test coverage. func TestResourceStub(t *testing.T) { - d, _ := makeResourceAndTestingClient() + d, _ := makeResourceAndTestingClient(t) d.List() d.ListKeys() diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go new file mode 100644 index 00000000000..3193975fa8f --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -0,0 +1,150 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package apiserver + +import ( + "context" + + "knative.dev/pkg/metrics/metricskey" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + metricsKeyEventing "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics" +) + +var ( + // eventCountM is a counter which records the number of events sent + // by an Importer. + eventCountM = stats.Int64( + "event_count", + "Number of events created", + stats.UnitDimensionless, + ) + _ StatsReporter = (*reporter)(nil) +) + +type ReportArgs struct { + ns string + eventType string + eventSource string + apiServerImporter string +} + +const ( + importerResourceGroupValue = "apiserversources.sources.eventing.knative.dev" +) + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, err error) error +} + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { + namespaceTagKey tag.Key + eventTypeTagKey tag.Key + eventSourceTagKey tag.Key + importerNameTagKey tag.Key + importerResourceGroupTagKey tag.Key + resultKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports apiserversource +// metrics. +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + + eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) + if err != nil { + return nil, err + } + r.eventTypeTagKey = eventTypeTag + + eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) + if err != nil { + return nil, err + } + r.eventSourceTagKey = eventSourceTag + + importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) + if err != nil { + return nil, err + } + r.importerNameTagKey = importerNameTag + + importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) + if err != nil { + return nil, err + } + r.importerResourceGroupTagKey = importerResourceGroupTag + + resultTag, err := tag.NewKey(metricsKeyEventing.LabelResult) + if err != nil { + return nil, err + } + r.resultKey = resultTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceTagKey, + r.eventTypeTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey}, + }, + ) + if err != nil { + return nil, err + } + + return r, nil +} + +// ReportEventCount captures the event count. +func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, Result(err))) + if err != nil { + return err + } + ctx, err = r.generateTag(args, tag.Insert(r.importerResourceGroupTagKey, + importerResourceGroupValue)) + if err != nil { + return err + } + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) { + return tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.eventSourceTagKey, args.eventSource), + tag.Insert(r.eventTypeTagKey, args.eventType), + tag.Insert(r.importerNameTagKey, args.apiServerImporter), + t) +} diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go new file mode 100644 index 00000000000..140eee65c74 --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "fmt" + "testing" + + metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count") +} + +func TestStatsReporter(t *testing.T) { + args := &ReportArgs{ + ns: "testns", + eventType: "dev.knative.apiserver.resource.update", + eventSource: "unit-test", + apiServerImporter: "testimporter", + } + + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskey.LabelEventType: "dev.knative.apiserver.resource.update", + metricskey.LabelEventSource: "unit-test", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) + +} + +func TestReporterForErrorTag(t *testing.T) { + r, err := NewStatsReporter() + defer unregister() + + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + + args := &ReportArgs{ + ns: "testns", + eventType: "eventtype", + eventSource: "eventsource", + apiServerImporter: "testimporter", + } + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskeyEventing.LabelFilterResult: "success", + metricskey.LabelEventType: "eventtype", + metricskey.LabelEventSource: "eventsource", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", + } + e := fmt.Errorf("test error") + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, e) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, e) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index d3c319f7eb0..2c788c8df31 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -43,6 +43,8 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/apiserversource/resources" "knative.dev/eventing/pkg/utils" + pkgLogging "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" ) const ( @@ -56,6 +58,8 @@ const ( // raImageEnvVar is the name of the environment variable that contains the receive adapter's // image. It must be defined. raImageEnvVar = "APISERVER_RA_IMAGE" + + component = "apiserversource" ) var ( @@ -87,6 +91,9 @@ type Reconciler struct { source string sinkReconciler *duck.SinkReconciler + context context.Context + loggingConfig *pkgLogging.Config + metricsConfig *metrics.ExporterOptions } // Reconcile compares the actual state with the desired, and attempts to @@ -187,10 +194,12 @@ func (r *Reconciler) getReceiveAdapterImage() string { func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) { adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.getReceiveAdapterImage(), - Source: src, - Labels: resources.Labels(src.Name), - SinkURI: sinkURI, + Image: r.getReceiveAdapterImage(), + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + LoggingConfig: utils.LoggingConfigToBase64(r.loggingConfig), + MetricsConfig: utils.MetricsOptionsToBase64(r.metricsConfig), } expected := resources.MakeReceiveAdapter(&adapterArgs) @@ -374,3 +383,30 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.ApiServ return cj, err } + +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) + if err != nil { + logging.FromContext(r.context).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + logging.FromContext(r.context).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + logging.FromContext(r.context).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) +} diff --git a/pkg/reconciler/apiserversource/controller.go b/pkg/reconciler/apiserversource/controller.go index df8dc477de9..9cb6a7165eb 100644 --- a/pkg/reconciler/apiserversource/controller.go +++ b/pkg/reconciler/apiserversource/controller.go @@ -25,10 +25,12 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/metrics" eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" apiserversourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/apiserversource" deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" + "knative.dev/pkg/logging" ) const ( @@ -57,6 +59,7 @@ func NewController( apiserversourceLister: apiServerSourceInformer.Lister(), deploymentLister: deploymentInformer.Lister(), source: GetCfgHost(ctx), + context: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) @@ -76,5 +79,8 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) + return impl } diff --git a/pkg/reconciler/apiserversource/controller_test.go b/pkg/reconciler/apiserversource/controller_test.go index e1895f92518..1f18800a706 100644 --- a/pkg/reconciler/apiserversource/controller_test.go +++ b/pkg/reconciler/apiserversource/controller_test.go @@ -17,11 +17,13 @@ limitations under the License. package apiserversource import ( + "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "knative.dev/pkg/configmap" - logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" @@ -35,8 +37,26 @@ func TestNew(t *testing.T) { defer logtesting.ClearAll() ctx, _ := SetupFakeContext(t) ctx = withCfgHost(ctx, &rest.Config{Host: "unit_test"}) - - c := NewController(ctx, configmap.NewFixedWatcher()) + os.Setenv("METRICS_DOMAIN", "knative.dev/eventing") + c := NewController(ctx, configmap.NewFixedWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + })) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index a4b961bf2ee..48a82737b04 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -30,10 +30,12 @@ import ( // ReceiveAdapterArgs are the arguments needed to create a ApiServer Receive Adapter. // Every field is required. type ReceiveAdapterArgs struct { - Image string - Source *v1alpha1.ApiServerSource - Labels map[string]string - SinkURI string + Image string + Source *v1alpha1.ApiServerSource + Labels map[string]string + SinkURI string + MetricsConfig string + LoggingConfig string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -43,8 +45,9 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, - Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("apiserversource-%s", args.Source.Name)), - Labels: args.Labels, + Name: utils.GenerateFixedName(args.Source, + fmt.Sprintf("apiserversource-%s", args.Source.Name)), + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), }, @@ -67,7 +70,13 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, - Env: makeEnv(args.SinkURI, &args.Source.Spec), + Env: makeEnv(args.SinkURI, args.LoggingConfig, + args.MetricsConfig, &args.Source.Spec, + args.Source.ObjectMeta.Name), + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, }, }, }, @@ -76,7 +85,8 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { } } -func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { +func makeEnv(sinkURI, loggingConfig, metricsConfig string, + spec *v1alpha1.ApiServerSourceSpec, name string) []corev1.EnvVar { apiversions := "" kinds := "" controlled := "" @@ -115,5 +125,17 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar FieldPath: "metadata.namespace", }, }, + }, { + Name: "APISERVERIMPORTER", + Value: name, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: metricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: loggingConfig, }} } diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 9a99c94cfee..bf23ac31d29 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -24,12 +25,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" + _ "knative.dev/pkg/metrics/testing" ) func TestMakeReceiveAdapter(t *testing.T) { + name := "source-name" src := &v1alpha1.ApiServerSource{ ObjectMeta: metav1.ObjectMeta{ - Name: "source-name", + Name: name, Namespace: "source-namespace", UID: "1234", }, @@ -61,10 +64,11 @@ func TestMakeReceiveAdapter(t *testing.T) { one := int32(1) trueValue := true + want := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: "source-namespace", - Name: "apiserversource-source-name-1234", + Name: fmt.Sprintf("apiserversource-%s-1234", name), Labels: map[string]string{ "test-key1": "test-value1", "test-key2": "test-value2", @@ -73,7 +77,7 @@ func TestMakeReceiveAdapter(t *testing.T) { { APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "ApiServerSource", - Name: "source-name", + Name: name, UID: "1234", Controller: &trueValue, BlockOwnerDeletion: &trueValue, @@ -104,6 +108,10 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "receive-adapter", Image: "test-image", + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SINK_URI", @@ -126,6 +134,18 @@ func TestMakeReceiveAdapter(t *testing.T) { FieldPath: "metadata.namespace", }, }, + }, { + Name: "APISERVERIMPORTER", + Value: name, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: "", + }, { + Name: "K_LOGGING_CONFIG", + Value: "", }, }, }, diff --git a/pkg/utils/config.go b/pkg/utils/config.go new file mode 100644 index 00000000000..48fbb2adf3b --- /dev/null +++ b/pkg/utils/config.go @@ -0,0 +1,134 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package utils + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" +) + +var zapLoggerConfig = "zap-logger-config" + +// Base64ToMetricsOptions converts a json+base64 string of a +// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. +func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { + var opts metrics.ExporterOptions + if base64 == "" { + return nil, errors.New("base64 metrics string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &opts); err != nil { + return nil, err + } + + return &opts, nil +} + +// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 +// string. +func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { + if opts == nil { + return "" + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Opts, err := json.Marshal(jsonOpts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Opts)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} + +// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. +// Returns a non-nil logging.Config always. +func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { + if base64 == "" { + return nil, errors.New("base64 logging string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + var configMap map[string]string + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &configMap); err != nil { + return nil, err + } + + cfg, err := logging.NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } + } + return cfg, nil +} + +// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. +func LoggingConfigToBase64(cfg *logging.Config) string { + if cfg == nil || cfg.LoggingConfig == "" { + return "" + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Cfg, err := json.Marshal(jsonCfg) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Cfg)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} diff --git a/pkg/utils/config_test.go b/pkg/utils/config_test.go new file mode 100644 index 00000000000..8ca4c0c081b --- /dev/null +++ b/pkg/utils/config_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "testing" + + "go.uber.org/zap/zapcore" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + + "github.com/google/go-cmp/cmp" +) + +func TestMetricsOptions(t *testing.T) { + testCases := map[string]struct { + opts *metrics.ExporterOptions + want string + wantErr string + }{ + "nil": { + opts: nil, + want: "", + wantErr: "base64 metrics string is empty", + }, + "happy": { + opts: &metrics.ExporterOptions{ + Domain: "domain", + Component: "component", + PrometheusPort: 9090, + ConfigMap: map[string]string{ + "foo": "bar", + "boosh": "kakow", + }, + }, + want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := MetricsOptionsToBase64(tc.opts) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to options. + { + want := tc.opts + got, gotErr := Base64ToMetricsOptions(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} + +func TestLoggingConfig(t *testing.T) { + testCases := map[string]struct { + cfg *logging.Config + want string + wantErr string + }{ + "nil": { + cfg: nil, + want: "", + wantErr: "base64 logging string is empty", + }, + "happy": { + cfg: &logging.Config{ + LoggingConfig: "{}", + LoggingLevel: map[string]zapcore.Level{}, + }, + want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := LoggingConfigToBase64(tc.cfg) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to config. + if tc.cfg != nil { + want := tc.cfg + got, gotErr := Base64ToLoggingConfig(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} diff --git a/vendor/k8s.io/client-go/dynamic/fake/simple.go b/vendor/k8s.io/client-go/dynamic/fake/simple.go index dde45892f8e..13e2d8055e8 100644 --- a/vendor/k8s.io/client-go/dynamic/fake/simple.go +++ b/vendor/k8s.io/client-go/dynamic/fake/simple.go @@ -45,7 +45,7 @@ func NewSimpleDynamicClient(scheme *runtime.Scheme, objects ...runtime.Object) * } } - cs := &FakeDynamicClient{scheme: scheme} + cs := &FakeDynamicClient{} cs.AddReactor("*", "*", testing.ObjectReaction(o)) cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) { gvr := action.GetResource() diff --git a/vendor/k8s.io/code-generator/Godeps/OWNERS b/vendor/k8s.io/code-generator/Godeps/OWNERS new file mode 100644 index 00000000000..3d49f30605a --- /dev/null +++ b/vendor/k8s.io/code-generator/Godeps/OWNERS @@ -0,0 +1,2 @@ +approvers: +- dep-approvers diff --git a/vendor/k8s.io/code-generator/OWNERS b/vendor/k8s.io/code-generator/OWNERS new file mode 100644 index 00000000000..4155fc60ccd --- /dev/null +++ b/vendor/k8s.io/code-generator/OWNERS @@ -0,0 +1,11 @@ +approvers: +- lavalamp +- wojtek-t +- sttts +reviewers: +- lavalamp +- wojtek-t +- sttts +labels: +- sig/api-machinery +- area/code-generation diff --git a/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS b/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS new file mode 100644 index 00000000000..0c408a1aa94 --- /dev/null +++ b/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS @@ -0,0 +1,8 @@ +approvers: +- lavalamp +- wojtek-t +- caesarxuchao +reviewers: +- lavalamp +- wojtek-t +- caesarxuchao diff --git a/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS b/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS new file mode 100644 index 00000000000..05d4b2a6574 --- /dev/null +++ b/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS @@ -0,0 +1,4 @@ +approvers: +- smarterclayton +reviewers: +- smarterclayton diff --git a/vendor/knative.dev/pkg/OWNERS b/vendor/knative.dev/pkg/OWNERS new file mode 100644 index 00000000000..030df98f78d --- /dev/null +++ b/vendor/knative.dev/pkg/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- pkg-approvers diff --git a/vendor/knative.dev/pkg/OWNERS_ALIASES b/vendor/knative.dev/pkg/OWNERS_ALIASES index 51ae932f7fb..86d5793c851 100644 --- a/vendor/knative.dev/pkg/OWNERS_ALIASES +++ b/vendor/knative.dev/pkg/OWNERS_ALIASES @@ -26,13 +26,16 @@ aliases: - dprotaso controller-approvers: - - mattmoor + - dgerd - grantr + - mattmoor - tcnghia + - vagababov kmeta-approvers: - mattmoor - - jonjohnsonjr + - dgerd + - vagababov logging-approvers: - mdemirhan diff --git a/vendor/knative.dev/pkg/apis/OWNERS b/vendor/knative.dev/pkg/apis/OWNERS new file mode 100644 index 00000000000..a25420ebc0d --- /dev/null +++ b/vendor/knative.dev/pkg/apis/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-approvers diff --git a/vendor/knative.dev/pkg/apis/duck/OWNERS b/vendor/knative.dev/pkg/apis/duck/OWNERS new file mode 100644 index 00000000000..ad4d83c51e4 --- /dev/null +++ b/vendor/knative.dev/pkg/apis/duck/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-duck-approvers diff --git a/vendor/knative.dev/pkg/apis/istio/OWNERS b/vendor/knative.dev/pkg/apis/istio/OWNERS new file mode 100644 index 00000000000..c09668f13f1 --- /dev/null +++ b/vendor/knative.dev/pkg/apis/istio/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-istio-approvers diff --git a/vendor/knative.dev/pkg/cloudevents/OWNERS b/vendor/knative.dev/pkg/cloudevents/OWNERS new file mode 100644 index 00000000000..27343aba0a9 --- /dev/null +++ b/vendor/knative.dev/pkg/cloudevents/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- cloudevents-approvers diff --git a/vendor/knative.dev/pkg/configmap/OWNERS b/vendor/knative.dev/pkg/configmap/OWNERS new file mode 100644 index 00000000000..2480fc6d43f --- /dev/null +++ b/vendor/knative.dev/pkg/configmap/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- configmap-approvers diff --git a/vendor/knative.dev/pkg/controller/OWNERS b/vendor/knative.dev/pkg/controller/OWNERS new file mode 100644 index 00000000000..afa22257a26 --- /dev/null +++ b/vendor/knative.dev/pkg/controller/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- controller-approvers diff --git a/vendor/knative.dev/pkg/hack/OWNERS b/vendor/knative.dev/pkg/hack/OWNERS new file mode 100644 index 00000000000..c50adc84930 --- /dev/null +++ b/vendor/knative.dev/pkg/hack/OWNERS @@ -0,0 +1,10 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers + +reviewers: +- productivity-reviewers + +labels: +- area/test-and-release diff --git a/vendor/knative.dev/pkg/injection/OWNERS b/vendor/knative.dev/pkg/injection/OWNERS new file mode 100644 index 00000000000..dda47512a47 --- /dev/null +++ b/vendor/knative.dev/pkg/injection/OWNERS @@ -0,0 +1,5 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- mattmoor +- n3wscott diff --git a/vendor/knative.dev/pkg/kmeta/OWNERS b/vendor/knative.dev/pkg/kmeta/OWNERS new file mode 100644 index 00000000000..29b0d9f2562 --- /dev/null +++ b/vendor/knative.dev/pkg/kmeta/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- kmeta-approvers diff --git a/vendor/knative.dev/pkg/logging/OWNERS b/vendor/knative.dev/pkg/logging/OWNERS new file mode 100644 index 00000000000..fa4854ba0a5 --- /dev/null +++ b/vendor/knative.dev/pkg/logging/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- logging-approvers diff --git a/vendor/knative.dev/pkg/metrics/OWNERS b/vendor/knative.dev/pkg/metrics/OWNERS new file mode 100644 index 00000000000..6d3966df44e --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- metrics-approvers diff --git a/vendor/knative.dev/pkg/network/OWNERS b/vendor/knative.dev/pkg/network/OWNERS new file mode 100644 index 00000000000..5fa3f1016d9 --- /dev/null +++ b/vendor/knative.dev/pkg/network/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- tcnghia diff --git a/vendor/knative.dev/pkg/reconciler/OWNERS b/vendor/knative.dev/pkg/reconciler/OWNERS new file mode 100644 index 00000000000..afa22257a26 --- /dev/null +++ b/vendor/knative.dev/pkg/reconciler/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- controller-approvers diff --git a/vendor/knative.dev/pkg/resolver/OWNERS b/vendor/knative.dev/pkg/resolver/OWNERS new file mode 100644 index 00000000000..acf2ee2c1cf --- /dev/null +++ b/vendor/knative.dev/pkg/resolver/OWNERS @@ -0,0 +1,5 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- n3wscott +- vaikas-google diff --git a/vendor/knative.dev/pkg/test/OWNERS b/vendor/knative.dev/pkg/test/OWNERS new file mode 100644 index 00000000000..c50adc84930 --- /dev/null +++ b/vendor/knative.dev/pkg/test/OWNERS @@ -0,0 +1,10 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers + +reviewers: +- productivity-reviewers + +labels: +- area/test-and-release diff --git a/vendor/knative.dev/pkg/testutils/OWNERS b/vendor/knative.dev/pkg/testutils/OWNERS new file mode 100644 index 00000000000..46b7acf91e9 --- /dev/null +++ b/vendor/knative.dev/pkg/testutils/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers diff --git a/vendor/knative.dev/pkg/webhook/OWNERS b/vendor/knative.dev/pkg/webhook/OWNERS new file mode 100644 index 00000000000..b87878d94ae --- /dev/null +++ b/vendor/knative.dev/pkg/webhook/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- webhook-approvers