diff --git a/Gopkg.lock b/Gopkg.lock index e8fbbd82acd..a1068d2bd68 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1261,6 +1261,7 @@ "metrics", "metrics/metricskey", "metrics/metricstest", + "metrics/testing", "profiling", "reconciler/testing", "signals", @@ -1416,6 +1417,7 @@ "knative.dev/pkg/metrics", "knative.dev/pkg/metrics/metricskey", "knative.dev/pkg/metrics/metricstest", + "knative.dev/pkg/metrics/testing", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", "knative.dev/pkg/system", diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 94e57ed4b2f..ecf31b88dcf 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -18,15 +18,14 @@ package main import ( "flag" - "log" + "fmt" "strings" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // Uncomment the following line to load the gcp plugin + // (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -34,9 +33,16 @@ import ( "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" + "knative.dev/eventing/pkg/utils" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" ) +const ( + component = "apiserversource" +) + var ( masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") @@ -60,6 +66,17 @@ type envConfig struct { Kind StringList `required:"true"` Controller []bool `required:"true"` LabelSelector StringList `envconfig:"SELECTOR" required:"true"` + Name string `envconfig:"NAME" required:"true"` + // MetricsConfigBase64 is a base64 encoded json string of + // metrics.ExporterOptions. This is used to configure the metrics exporter + // options, the config is stored in a config map inside the controllers + // namespace and copied here. + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` } // TODO: the controller should take the list of GVR @@ -67,18 +84,41 @@ type envConfig struct { func main() { flag.Parse() - logCfg := zap.NewProductionConfig() - logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - dlogger, err := logCfg.Build() + var env envConfig + err := envconfig.Process("", &env) if err != nil { - log.Fatalf("Error building logger: %v", err) + panic(fmt.Sprintf("Error processing env var: %s", err)) + } + // TODO move this util to pkg + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64) + if err != nil { + fmt.Printf("[ERROR] failed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } + } + loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component) + logger := loggerSugared.Desugar() + defer flush(loggerSugared) + + // Convert base64 encoded json metrics.ExporterOptions to + // metrics.ExporterOptions. + metricsConfig, err := utils.Base64ToMetricsOptions( + env.MetricsConfigBase64) + if err != nil { + logger.Error("failed to process metrics options ", zap.Error(err)) } - logger := dlogger.Sugar() - var env envConfig - err = envconfig.Process("", &env) + if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil { + logger.Error("failed to create the metrics exporter ", zap.Error(err)) + } + + reporter, err := apiserver.NewStatsReporter() if err != nil { - logger.Fatalw("Error processing environment", zap.Error(err)) + logger.Error("error building statsreporter", zap.Error(err)) } // set up signals so we handle the first shutdown signal gracefully @@ -86,26 +126,24 @@ func main() { cfg, err := clientcmd.BuildConfigFromFlags(*masterURL, *kubeconfig) if err != nil { - logger.Fatalw("Error building kubeconfig", zap.Error(err)) + logger.Fatal("error building kubeconfig", zap.Error(err)) } - logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - client, err := dynamic.NewForConfig(cfg) if err != nil { - logger.Fatalw("Error building dynamic client", zap.Error(err)) + logger.Fatal("error building dynamic client", zap.Error(err)) } - if err = tracing.SetupStaticPublishing(logger, "apiserversource", tracing.OnePercentSampling); err != nil { - // If tracing doesn't work, we will log an error, but allow the importer to continue to - // start. + if err = tracing.SetupStaticPublishing(loggerSugared, "apiserversource", tracing.OnePercentSampling); err != nil { + // If tracing doesn't work, we will log an error, but allow the importer + // to continue to start. logger.Error("Error setting up trace publishing", zap.Error(err)) } eventsClient, err := kncloudevents.NewDefaultClient(env.SinkURI) if err != nil { - logger.Fatalw("Error building cloud event client", zap.Error(err)) + logger.Fatal("error building cloud event client", zap.Error(err)) } gvrcs := []apiserver.GVRC(nil) @@ -117,7 +155,7 @@ func main() { gv, err := schema.ParseGroupVersion(apiVersion) if err != nil { - logger.Fatalw("Error parsing APIVersion", zap.Error(err)) + logger.Fatal("error parsing APIVersion", zap.Error(err)) } // TODO: pass down the resource and the kind so we do not have to guess. gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) @@ -134,9 +172,14 @@ func main() { GVRCs: gvrcs, } - a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt) - logger.Info("starting kubernetes api adapter") + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name) + logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) } } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index 02641e717f9..b75ff61b8aa 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -109,3 +109,21 @@ - source_labels: [__meta_kubernetes_service_name] target_label: service +# apiserver-source +- job_name: apiserver-source + scrape_interval: 3s + scrape_timeout: 3s + kubernetes_sd_configs: + - role: pod + relabel_configs: + # Scrape only the the targets matching the following metadata + - source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name] + action: keep + regex: apiserver-source-controller;metrics + # Rename metadata labels to be reader friendly + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_service_name] + target_label: service diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 03b3dd35e6a..71805e4790f 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -67,9 +67,13 @@ type adapter struct { mode string delegate eventDelegate + reporter StatsReporter + name string } -func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter { +func NewAdaptor(source string, k8sClient dynamic.Interface, + ceClient cloudevents.Client, logger *zap.SugaredLogger, + opt Options, reporter StatsReporter, name string) Adapter { mode := opt.Mode switch mode { case ResourceMode, RefMode: @@ -88,6 +92,8 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents gvrcs: opt.GVRCs, namespace: opt.Namespace, mode: mode, + reporter: reporter, + name: name, } return a } @@ -102,21 +108,26 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { stop := make(chan struct{}) resyncPeriod := time.Duration(10 * time.Hour) - var d eventDelegate switch a.mode { case ResourceMode: d = &resource{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + name: a.name, } case RefMode: d = &ref{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + name: a.name, } default: diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 9cc2c3778ed..df576abdc8c 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -32,6 +32,12 @@ import ( rectesting "knative.dev/eventing/pkg/reconciler/testing" ) +type mockReporter struct{} + +func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { + return nil +} + func TestNewAdaptor(t *testing.T) { ce := kncetesting.NewTestClient() logger := zap.NewExample().Sugar() @@ -132,8 +138,8 @@ func TestNewAdaptor(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - - a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt) + r := &mockReporter{} + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r, "test-importer") got, ok := a.(*adapter) if !ok { @@ -171,8 +177,8 @@ func TestAdapter_StartRef(t *testing.T) { }, }}, } - - a := NewAdaptor(source, k8s, ce, logger, opt) + r := &mockReporter{} + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") err := errors.New("test never ran") stopCh := make(chan struct{}) @@ -206,8 +212,8 @@ func TestAdapter_StartResource(t *testing.T) { }, }}, } - - a := NewAdaptor(source, k8s, ce, logger, opt) + r := &mockReporter{} + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") err := errors.New("test never ran") stopCh := make(chan struct{}) @@ -292,11 +298,12 @@ func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClie ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r := &mockReporter{} return &resource{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } @@ -304,10 +311,11 @@ func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r := &mockReporter{} return &ref{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index c7c9ac67a22..2b6848afcca 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -31,11 +31,15 @@ import ( ) type ref struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger controlledGVRs []schema.GroupVersionResource + reporter StatsReporter + namespace string + name string } var _ cache.Store = (*ref)(nil) @@ -67,11 +71,7 @@ func (a *ref) Add(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -82,11 +82,7 @@ func (a *ref) Update(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -97,11 +93,7 @@ func (a *ref) Delete(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event) } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { @@ -112,6 +104,23 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { a.controlledGVRs = append(a.controlledGVRs, gvr) } +func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + name: a.name, + } + + rctx, _, err := a.ce.Send(ctx, *event) + if err != nil { + a.logger.Info("failed to send a ref based event ", zap.Error(err)) + } + rtctx := cloudevents.HTTPTransportContextFrom(rctx) + a.reporter.ReportEventCount(reportArgs, rtctx.StatusCode) + return err +} + // Stub cache.Store impl // Implements cache.Store diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 567524caf1f..0c2bf1f2e37 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -27,9 +27,13 @@ import ( ) type resource struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger + reporter StatsReporter + namespace string + name string } var _ cache.Store = (*resource)(nil) @@ -41,12 +45,7 @@ func (a *resource) Add(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - - return nil + return a.sendEvent(context.Background(), event, a.reporter) } func (a *resource) Update(obj interface{}) error { @@ -56,10 +55,7 @@ func (a *resource) Update(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } + return a.sendEvent(context.Background(), event, a.reporter) return nil } @@ -71,12 +67,24 @@ func (a *resource) Delete(obj interface{}) error { return err } - if _, _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err + return a.sendEvent(context.Background(), event, a.reporter) +} + +func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + name: a.name, } - return nil + rctx, _, err := a.ce.Send(ctx, *event) + if err != nil { + a.logger.Info("failed to send a resource based event ", zap.Error(err)) + } + rtctx := cloudevents.HTTPTransportContextFrom(rctx) + a.reporter.ReportEventCount(reportArgs, rtctx.StatusCode) + return err } func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go new file mode 100644 index 00000000000..00602ae7bb6 --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -0,0 +1,155 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package apiserver + +import ( + "context" + "strconv" + + . "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/eventing/pkg/utils" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" +) + +var ( + // eventCountM is a counter which records the number of events sent + // by the ApiServerSource. + eventCountM = stats.Int64( + "event_count", + "Number of events created", + stats.UnitDimensionless, + ) +) + +type ReportArgs struct { + ns string + eventType string + eventSource string + name string +} + +const ( + importerResourceGroupValue = "apiserversources.sources.eventing.knative.dev" +) + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, responseCode int) error +} + +var _ StatsReporter = (*reporter)(nil) + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { + namespaceTagKey tag.Key + eventTypeTagKey tag.Key + eventSourceTagKey tag.Key + importerNameTagKey tag.Key + importerResourceGroupTagKey tag.Key + responseCodeKey tag.Key + responseCodeClassKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports apiserversource +// metrics. +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + + eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) + if err != nil { + return nil, err + } + r.eventSourceTagKey = eventSourceTag + + eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) + if err != nil { + return nil, err + } + r.eventTypeTagKey = eventTypeTag + + importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) + if err != nil { + return nil, err + } + r.importerNameTagKey = importerNameTag + + importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) + if err != nil { + return nil, err + } + r.importerResourceGroupTagKey = importerResourceGroupTag + + responseCodeTag, err := tag.NewKey(LabelResponseCode) + if err != nil { + return nil, err + } + r.responseCodeKey = responseCodeTag + responseCodeClassTag, err := tag.NewKey(LabelResponseCodeClass) + if err != nil { + return nil, err + } + r.responseCodeClassKey = responseCodeClassTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceTagKey, r.eventTypeTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey, r.responseCodeKey, r.responseCodeClassKey}, + }, + ) + if err != nil { + return nil, err + } + + return r, nil +} + +// ReportEventCount captures the event count. +func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { + ctx, err := r.generateTag(args, responseCode) + if err != nil { + return err + } + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) { + return tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.eventSourceTagKey, args.eventSource), + tag.Insert(r.eventTypeTagKey, args.eventType), + tag.Insert(r.importerNameTagKey, args.name), + tag.Insert(r.importerResourceGroupTagKey, importerResourceGroupValue), + tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)), + tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode))) +} diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go new file mode 100644 index 00000000000..6f86870db80 --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -0,0 +1,113 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "net/http" + "testing" + + . "knative.dev/eventing/pkg/metrics/metricskey" + metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count") +} + +func TestStatsReporter(t *testing.T) { + args := &ReportArgs{ + ns: "testns", + eventType: "dev.knative.apiserver.resource.update", + eventSource: "unit-test", + name: "testimporter", + } + + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskey.LabelEventType: "dev.knative.apiserver.resource.update", + metricskey.LabelEventSource: "unit-test", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", + LabelResponseCode: "202", + LabelResponseCodeClass: "2xx", + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) +} + +func TestReporterFor5xxResponse(t *testing.T) { + r, err := NewStatsReporter() + defer unregister() + + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + + args := &ReportArgs{ + ns: "testns", + eventType: "eventtype", + eventSource: "eventsource", + name: "testimporter", + } + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskeyEventing.LabelFilterResult: "success", + metricskey.LabelEventType: "eventtype", + metricskey.LabelEventSource: "eventsource", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", + LabelResponseCode: "500", + LabelResponseCodeClass: "5xx", + } + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusInternalServerError) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusInternalServerError) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index f182c0d2ef5..d748f6c4300 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -33,7 +33,6 @@ import ( "github.com/google/go-cmp/cmp" "go.uber.org/zap" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" @@ -459,11 +458,11 @@ func makeTriggerFilterWithAttributesAndExtension(t, s, e string) *eventingv1alph func makeTrigger(filter *eventingv1alpha1.TriggerFilter) *eventingv1alpha1.Trigger { return &eventingv1alpha1.Trigger{ - TypeMeta: v1.TypeMeta{ + TypeMeta: metav1.TypeMeta{ APIVersion: "eventing.knative.dev/v1alpha1", Kind: "Trigger", }, - ObjectMeta: v1.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, Name: triggerName, UID: triggerUID, diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index d3c319f7eb0..1044381f1f4 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -43,6 +43,8 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/apiserversource/resources" "knative.dev/eventing/pkg/utils" + pkgLogging "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" ) const ( @@ -56,6 +58,8 @@ const ( // raImageEnvVar is the name of the environment variable that contains the receive adapter's // image. It must be defined. raImageEnvVar = "APISERVER_RA_IMAGE" + + component = "apiserversource" ) var ( @@ -87,6 +91,9 @@ type Reconciler struct { source string sinkReconciler *duck.SinkReconciler + loggingContext context.Context + loggingConfig *pkgLogging.Config + metricsConfig *metrics.ExporterOptions } // Reconcile compares the actual state with the desired, and attempts to @@ -186,11 +193,21 @@ func (r *Reconciler) getReceiveAdapterImage() string { } func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) { + loggingConfig, err := utils.LoggingConfigToBase64(r.loggingConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting logging config to base64", zap.Any("receiveAdapter", err)) + } + metricsConfig, err := utils.MetricsOptionsToBase64(r.metricsConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting metrics config to base64", zap.Any("receiveAdapter", err)) + } adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.getReceiveAdapterImage(), - Source: src, - Labels: resources.Labels(src.Name), - SinkURI: sinkURI, + Image: r.getReceiveAdapterImage(), + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + LoggingConfig: loggingConfig, + MetricsConfig: metricsConfig, } expected := resources.MakeReceiveAdapter(&adapterArgs) @@ -374,3 +391,30 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.ApiServ return cj, err } + +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) + if err != nil { + logging.FromContext(r.loggingContext).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) +} diff --git a/pkg/reconciler/apiserversource/controller.go b/pkg/reconciler/apiserversource/controller.go index df8dc477de9..da7b8ba81df 100644 --- a/pkg/reconciler/apiserversource/controller.go +++ b/pkg/reconciler/apiserversource/controller.go @@ -25,10 +25,12 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/metrics" eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" apiserversourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/apiserversource" deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" + "knative.dev/pkg/logging" ) const ( @@ -57,6 +59,7 @@ func NewController( apiserversourceLister: apiServerSourceInformer.Lister(), deploymentLister: deploymentInformer.Lister(), source: GetCfgHost(ctx), + loggingContext: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) @@ -76,5 +79,8 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) + return impl } diff --git a/pkg/reconciler/apiserversource/controller_test.go b/pkg/reconciler/apiserversource/controller_test.go index e1895f92518..1f18800a706 100644 --- a/pkg/reconciler/apiserversource/controller_test.go +++ b/pkg/reconciler/apiserversource/controller_test.go @@ -17,11 +17,13 @@ limitations under the License. package apiserversource import ( + "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "knative.dev/pkg/configmap" - logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" @@ -35,8 +37,26 @@ func TestNew(t *testing.T) { defer logtesting.ClearAll() ctx, _ := SetupFakeContext(t) ctx = withCfgHost(ctx, &rest.Config{Host: "unit_test"}) - - c := NewController(ctx, configmap.NewFixedWatcher()) + os.Setenv("METRICS_DOMAIN", "knative.dev/eventing") + c := NewController(ctx, configmap.NewFixedWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + })) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 1b899711cdf..258c1055a1d 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -30,10 +30,12 @@ import ( // ReceiveAdapterArgs are the arguments needed to create a ApiServer Receive Adapter. // Every field is required. type ReceiveAdapterArgs struct { - Image string - Source *v1alpha1.ApiServerSource - Labels map[string]string - SinkURI string + Image string + Source *v1alpha1.ApiServerSource + Labels map[string]string + SinkURI string + MetricsConfig string + LoggingConfig string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -67,7 +69,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, - Env: makeEnv(args.SinkURI, &args.Source.Spec), + Env: makeEnv(args.SinkURI, args.LoggingConfig, args.MetricsConfig, &args.Source.Spec, args.Source.ObjectMeta.Name), + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, }, }, }, @@ -76,7 +82,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { } } -func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { +func makeEnv(sinkURI, loggingConfig, metricsConfig string, spec *v1alpha1.ApiServerSourceSpec, name string) []corev1.EnvVar { apiversions := "" kinds := "" controlled := "" @@ -131,5 +137,17 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar FieldPath: "metadata.namespace", }, }, + }, { + Name: "NAME", + Value: name, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: metricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: loggingConfig, }} } diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 94d1ba96183..093b96bf259 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -24,12 +25,14 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" + _ "knative.dev/pkg/metrics/testing" ) func TestMakeReceiveAdapter(t *testing.T) { + name := "source-name" src := &v1alpha1.ApiServerSource{ ObjectMeta: metav1.ObjectMeta{ - Name: "source-name", + Name: name, Namespace: "source-namespace", UID: "1234", }, @@ -89,10 +92,11 @@ func TestMakeReceiveAdapter(t *testing.T) { one := int32(1) trueValue := true + want := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: "source-namespace", - Name: "apiserversource-source-name-1234", + Name: fmt.Sprintf("apiserversource-%s-1234", name), Labels: map[string]string{ "test-key1": "test-value1", "test-key2": "test-value2", @@ -101,7 +105,7 @@ func TestMakeReceiveAdapter(t *testing.T) { { APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "ApiServerSource", - Name: "source-name", + Name: name, UID: "1234", Controller: &trueValue, BlockOwnerDeletion: &trueValue, @@ -132,6 +136,10 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "receive-adapter", Image: "test-image", + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SINK_URI", @@ -157,6 +165,18 @@ func TestMakeReceiveAdapter(t *testing.T) { FieldPath: "metadata.namespace", }, }, + }, { + Name: "NAME", + Value: name, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: "", + }, { + Name: "K_LOGGING_CONFIG", + Value: "", }, }, }, diff --git a/pkg/utils/config.go b/pkg/utils/config.go new file mode 100644 index 00000000000..e682b797f52 --- /dev/null +++ b/pkg/utils/config.go @@ -0,0 +1,130 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package utils + +// TODO to move this to pkg +import ( + "encoding/json" + "errors" + "strconv" + + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" +) + +var zapLoggerConfig = "zap-logger-config" + +// Base64ToMetricsOptions converts a json+base64 string of a +// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. +func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { + var opts metrics.ExporterOptions + if base64 == "" { + return nil, errors.New("base64 metrics string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + if err := json.Unmarshal(bytes, &opts); err != nil { + return nil, err + } + + return &opts, nil +} + +// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 +// string. +func MetricsOptionsToBase64(opts *metrics.ExporterOptions) (string, error) { + if opts == nil { + return "", nil + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return "", err + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Opts, err := json.Marshal(jsonOpts) + if err != nil { + return "", err + } + + // Turn the base64 encoded []byte back into a string. + base64, err := strconv.Unquote(string(base64Opts)) + if err != nil { + return "", err + } + return base64, nil +} + +// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. +// Returns a non-nil logging.Config always. +func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { + if base64 == "" { + return nil, errors.New("base64 logging string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + var configMap map[string]string + if err := json.Unmarshal(bytes, &configMap); err != nil { + return nil, err + } + + cfg, err := logging.NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } + } + return cfg, nil +} + +// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. +func LoggingConfigToBase64(cfg *logging.Config) (string, error) { + if cfg == nil || cfg.LoggingConfig == "" { + return "", nil + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return "", err + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Cfg, err := json.Marshal(jsonCfg) + if err != nil { + return "", err + } + + // Turn the base64 encoded []byte back into a string. + base64, err := strconv.Unquote(string(base64Cfg)) + if err != nil { + return "", err + } + return base64, nil +} diff --git a/pkg/utils/config_test.go b/pkg/utils/config_test.go new file mode 100644 index 00000000000..cf2c8ab293b --- /dev/null +++ b/pkg/utils/config_test.go @@ -0,0 +1,144 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "testing" + + "go.uber.org/zap/zapcore" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + + "github.com/google/go-cmp/cmp" +) + +func TestMetricsOptions(t *testing.T) { + testCases := map[string]struct { + opts *metrics.ExporterOptions + want string + wantErr string + }{ + "nil": { + opts: nil, + want: "", + wantErr: "base64 metrics string is empty", + }, + "happy": { + opts: &metrics.ExporterOptions{ + Domain: "domain", + Component: "component", + PrometheusPort: 9090, + ConfigMap: map[string]string{ + "foo": "bar", + "boosh": "kakow", + }, + }, + want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64, err := MetricsOptionsToBase64(tc.opts) + if err != nil { + t.Errorf("error while converting metrics config to base64: %v", err) + } + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to options. + { + want := tc.opts + got, gotErr := Base64ToMetricsOptions(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} + +func TestLoggingConfig(t *testing.T) { + testCases := map[string]struct { + cfg *logging.Config + want string + wantErr string + }{ + "nil": { + cfg: nil, + want: "", + wantErr: "base64 logging string is empty", + }, + "happy": { + cfg: &logging.Config{ + LoggingConfig: "{}", + LoggingLevel: map[string]zapcore.Level{}, + }, + want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64, err := LoggingConfigToBase64(tc.cfg) + if err != nil { + t.Errorf("error while converting logging config to base64: %v", err) + } + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to config. + if tc.cfg != nil { + want := tc.cfg + got, gotErr := Base64ToLoggingConfig(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +}