From b5683bb56b17f9aaaa4e3bf0354db99ef19b21b8 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Tue, 10 Sep 2019 13:31:13 +0200 Subject: [PATCH 1/9] Added dataplane metrics cronjobsource --- cmd/cronjob_receive_adapter/main.go | 72 +++++++- .../100-prometheus-scrape-kn-eventing.yaml | 19 +++ pkg/adapter/cronjobevents/adapter.go | 15 +- pkg/adapter/cronjobevents/adapter_test.go | 16 +- pkg/adapter/cronjobevents/stats_reporter.go | 156 ++++++++++++++++++ .../cronjobevents/stats_reporter_test.go | 111 +++++++++++++ pkg/reconciler/cronjobsource/controller.go | 6 + .../cronjobsource/controller_test.go | 34 +++- pkg/reconciler/cronjobsource/cronjobsource.go | 54 +++++- .../cronjobsource/resources/labels.go | 7 + .../resources/receive_adapter.go | 27 ++- .../resources/receive_adapter_test.go | 16 ++ 12 files changed, 510 insertions(+), 23 deletions(-) create mode 100644 pkg/adapter/cronjobevents/stats_reporter.go create mode 100644 pkg/adapter/cronjobevents/stats_reporter_test.go diff --git a/cmd/cronjob_receive_adapter/main.go b/cmd/cronjob_receive_adapter/main.go index 189f492d997..a3054f9aa83 100644 --- a/cmd/cronjob_receive_adapter/main.go +++ b/cmd/cronjob_receive_adapter/main.go @@ -18,14 +18,17 @@ package main import ( "flag" + "fmt" + "knative.dev/eventing/pkg/utils" + "knative.dev/pkg/metrics" "log" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/net/context" "knative.dev/eventing/pkg/adapter/cronjobevents" "knative.dev/eventing/pkg/tracing" + "knative.dev/pkg/logging" "knative.dev/pkg/signals" ) @@ -44,29 +47,76 @@ type envConfig struct { // Environment variable containing the namespace of the cron job. Namespace string `envconfig:"NAMESPACE" required:"true"` + + // MetricsConfigBase64 is a base64 encoded json string of + // metrics.ExporterOptions. This is used to configure the metrics exporter + // options, the config is stored in a config map inside the controllers + // namespace and copied here. + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` } +const ( + component = "cronjobsource" +) + func main() { flag.Parse() ctx := context.Background() - logCfg := zap.NewProductionConfig() - logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - dlogger, err := logCfg.Build() + var env envConfig + err := envconfig.Process("", &env) + if err != nil { + panic(fmt.Sprintf("Error processing env var: %s", err)) + } + // TODO move this util to pkg + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64) + if err != nil { + fmt.Printf("[ERROR] failed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } + } + loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component) + logger := loggerSugared.Desugar() + defer flush(loggerSugared) + + // Convert base64 encoded json metrics.ExporterOptions to + // metrics.ExporterOptions. + metricsConfig, err := utils.Base64ToMetricsOptions( + env.MetricsConfigBase64) if err != nil { - log.Fatalf("Error building logger: %v", err) + logger.Error("failed to process metrics options", zap.Error(err)) + } + + if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil { + logger.Error("failed to create the metrics exporter", zap.Error(err)) } - logger := dlogger.Sugar() - var env envConfig if err := envconfig.Process("", &env); err != nil { log.Fatal("Failed to process env var", zap.Error(err)) } +<<<<<<< HEAD if err = tracing.SetupStaticPublishing(logger, "cronjobsource", tracing.OnePercentSampling); err != nil { // If tracing doesn't work, we will log an error, but allow the source to continue to +======= + reporter, err := cronjobevents.NewStatsReporter() + if err != nil { + logger.Fatal("Error building statsreporter", zap.Error(err)) + } + if err = tracing.SetupStaticPublishing(loggerSugared, "cronjobsource", tracing.OnePercentSampling); err != nil { + // If tracing doesn't work, we will log an error, but allow the importer to continue to +>>>>>>> Added dataplane metrics cronjobsource // start. - logger.Errorw("Error setting up trace publishing", err) + logger.Error("Error setting up trace publishing", zap.Error(err)) } adapter := &cronjobevents.Adapter{ @@ -75,6 +125,7 @@ func main() { SinkURI: env.Sink, Name: env.Name, Namespace: env.Namespace, + Reporter: reporter, } logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter)) @@ -85,3 +136,8 @@ func main() { logger.Fatal("Failed to start adapter", zap.Error(err)) } } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index b75ff61b8aa..47570672a29 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -127,3 +127,22 @@ target_label: pod - source_labels: [__meta_kubernetes_service_name] target_label: service + +# cronjob-source +- job_name: cronjob-source + scrape_interval: 3s + scrape_timeout: 3s + kubernetes_sd_configs: + - role: pod + relabel_configs: + # Scrape only the the targets matching the following metadata + - source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name] + action: keep + regex: cronjob-source-controller;metrics + # Rename metadata labels to be reader friendly + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_service_name] + target_label: service \ No newline at end of file diff --git a/pkg/adapter/cronjobevents/adapter.go b/pkg/adapter/cronjobevents/adapter.go index 60c047a9146..593064b3583 100644 --- a/pkg/adapter/cronjobevents/adapter.go +++ b/pkg/adapter/cronjobevents/adapter.go @@ -49,6 +49,8 @@ type Adapter struct { // client sends cloudevents. client cloudevents.Client + + Reporter StatsReporter } // Initialize cloudevent client @@ -92,10 +94,19 @@ func (a *Adapter) cronTick() { event.SetType(sourcesv1alpha1.CronJobEventType) event.SetSource(sourcesv1alpha1.CronJobEventSource(a.Namespace, a.Name)) event.SetData(message(a.Data)) + reportArgs := &ReportArgs{ + ns: a.Namespace, + eventSource: event.Source(), + eventType: event.Type(), + name: a.Name, + } - if _, _, err := a.client.Send(context.TODO(), event); err != nil { - logger.Error("failed to send cloudevent", err) + rctx, _, err := a.client.Send(context.TODO(), event) + rtctx := cloudevents.HTTPTransportContextFrom(rctx) + if err != nil { + logger.Error("failed to send cloudevent", zap.Error(err)) } + a.Reporter.ReportEventCount(reportArgs, rtctx.StatusCode) } type Message struct { diff --git a/pkg/adapter/cronjobevents/adapter_test.go b/pkg/adapter/cronjobevents/adapter_test.go index ae9ca48dd42..1a9972420a5 100644 --- a/pkg/adapter/cronjobevents/adapter_test.go +++ b/pkg/adapter/cronjobevents/adapter_test.go @@ -28,6 +28,12 @@ import ( "github.com/google/go-cmp/cmp" ) +type mockReporter struct{} + +func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { + return nil +} + func TestStart_ServeHTTP(t *testing.T) { testCases := map[string]struct { schedule string @@ -55,10 +61,12 @@ func TestStart_ServeHTTP(t *testing.T) { sinkServer := httptest.NewServer(h) defer sinkServer.Close() + r := &mockReporter{} a := &Adapter{ Schedule: tc.schedule, Data: "data", SinkURI: sinkServer.URL, + Reporter: r, } if err := a.initClient(); err != nil { @@ -89,8 +97,10 @@ func TestStart_ServeHTTP(t *testing.T) { func TestStartBadCron(t *testing.T) { schedule := "bad" + r := &mockReporter{} a := &Adapter{ Schedule: schedule, + Reporter: r, } stop := make(chan struct{}) @@ -125,9 +135,11 @@ func TestPostMessage_ServeHTTP(t *testing.T) { sinkServer := httptest.NewServer(h) defer sinkServer.Close() + r := &mockReporter{} a := &Adapter{ - Data: "data", - SinkURI: sinkServer.URL, + Data: "data", + SinkURI: sinkServer.URL, + Reporter: r, } if err := a.initClient(); err != nil { diff --git a/pkg/adapter/cronjobevents/stats_reporter.go b/pkg/adapter/cronjobevents/stats_reporter.go new file mode 100644 index 00000000000..e744b6472cd --- /dev/null +++ b/pkg/adapter/cronjobevents/stats_reporter.go @@ -0,0 +1,156 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package cronjobevents + +import ( + "context" + "strconv" + + . "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/eventing/pkg/utils" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + "knative.dev/pkg/metrics" + "knative.dev/pkg/metrics/metricskey" +) + +var ( + // eventCountM is a counter which records the number of events sent + // by the CronJobSource. + eventCountM = stats.Int64( + "event_count", + "Number of events created", + stats.UnitDimensionless, + ) +) + +type ReportArgs struct { + ns string + eventType string + eventSource string + name string +} + +const ( + importerResourceGroupValue = "cronjobsources.sources.eventing.knative.dev" +) + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, responseCode int) error +} + +var _ StatsReporter = (*reporter)(nil) + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { + namespaceTagKey tag.Key + eventTypeTagKey tag.Key + eventSourceTagKey tag.Key + importerNameTagKey tag.Key + importerResourceGroupTagKey tag.Key + responseCodeKey tag.Key + responseCodeClassKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports cronjobsource +// metrics. +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + + eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) + if err != nil { + return nil, err + } + r.eventTypeTagKey = eventTypeTag + + eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) + if err != nil { + return nil, err + } + r.eventSourceTagKey = eventSourceTag + + importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) + if err != nil { + return nil, err + } + r.importerNameTagKey = importerNameTag + + importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) + if err != nil { + return nil, err + } + r.importerResourceGroupTagKey = importerResourceGroupTag + + responseCodeTag, err := tag.NewKey(LabelResponseCode) + if err != nil { + return nil, err + } + r.responseCodeKey = responseCodeTag + + responseCodeClassTag, err := tag.NewKey(LabelResponseCodeClass) + if err != nil { + return nil, err + } + r.responseCodeClassKey = responseCodeClassTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.eventTypeTagKey, r.eventSourceTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey, r.responseCodeKey, r.responseCodeClassKey}, + }, + ) + if err != nil { + return nil, err + } + + return r, nil +} + +// ReportEventCount captures the event count. +func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { + ctx, err := r.generateTag(args, responseCode) + if err != nil { + return err + } + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) { + return tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.eventTypeTagKey, args.eventType), + tag.Insert(r.eventSourceTagKey, args.eventSource), + tag.Insert(r.importerNameTagKey, args.name), + tag.Insert(r.importerResourceGroupTagKey, importerResourceGroupValue), + tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)), + tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode))) +} diff --git a/pkg/adapter/cronjobevents/stats_reporter_test.go b/pkg/adapter/cronjobevents/stats_reporter_test.go new file mode 100644 index 00000000000..f225338e2d6 --- /dev/null +++ b/pkg/adapter/cronjobevents/stats_reporter_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2019 The Knative Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjobevents + +import ( + "net/http" + "testing" + + . "knative.dev/eventing/pkg/metrics/metricskey" + metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count") +} + +func TestStatsReporter(t *testing.T) { + args := &ReportArgs{ + ns: "testns", + eventType: "test-eventtype", + eventSource: "unit-test", + name: "testimporter", + } + + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskey.LabelEventType: "test-eventtype", + metricskey.LabelEventSource: "unit-test", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "cronjobsources.sources.eventing.knative.dev", + LabelResponseCode: "202", + LabelResponseCodeClass: "2xx", + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusAccepted) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) + +} + +func TestReporterFor5xxResponse(t *testing.T) { + r, err := NewStatsReporter() + defer unregister() + + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + + args := &ReportArgs{ + ns: "testns", + eventType: "eventtype", + eventSource: "eventsource", + name: "testimporter", + } + + wantTags := map[string]string{ + metricskey.LabelNamespaceName: "testns", + metricskeyEventing.LabelFilterResult: "success", + metricskey.LabelEventType: "eventtype", + metricskey.LabelEventSource: "eventsource", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "cronjobsources.sources.eventing.knative.dev", + LabelResponseCode: "500", + LabelResponseCodeClass: "5xx", + } + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusInternalServerError) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, http.StatusInternalServerError) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 2) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index 5109b8fcce1..bafee1b7dec 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -30,6 +30,9 @@ import ( deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" ) const ( @@ -69,6 +72,7 @@ func NewController( deploymentLister: deploymentInformer.Lister(), eventTypeLister: eventTypeInformer.Lister(), env: *env, + context: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) r.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey) @@ -85,6 +89,8 @@ func NewController( FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("CronJobSource")), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) return impl } diff --git a/pkg/reconciler/cronjobsource/controller_test.go b/pkg/reconciler/cronjobsource/controller_test.go index 4bb70ce8d0b..a799ad95e68 100644 --- a/pkg/reconciler/cronjobsource/controller_test.go +++ b/pkg/reconciler/cronjobsource/controller_test.go @@ -20,10 +20,11 @@ import ( "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" - // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype/fake" _ "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/cronjobsource/fake" @@ -51,6 +52,15 @@ func TestNew(t *testing.T) { t.Fatalf("Failed to unset env var: %v", err) } }() + + if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil { + t.Fatalf("Failed to set env var: %v", err) + } + defer func() { + if err := os.Unsetenv("METRICS_DOMAIN"); err != nil { + t.Fatalf("Failed to unset env var: %v", err) + } + }() } else { defer func() { r := recover() @@ -61,7 +71,27 @@ func TestNew(t *testing.T) { } ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewFixedWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 0f33366c51c..46e8a2bee55 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "knative.dev/pkg/metrics" "reflect" "time" @@ -41,7 +42,9 @@ import ( "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/cronjobsource/resources" + "knative.dev/eventing/pkg/utils" "knative.dev/pkg/controller" + pkgLogging "knative.dev/pkg/logging" ) const ( @@ -51,6 +54,7 @@ const ( cronJobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" cronJobSourceDeploymentCreated = "CronJobSurceDeploymentCreated" cronJobSourceDeploymentUpdated = "CronJobSourceDeploymentUpdated" + component = "cronjobsource" ) type Reconciler struct { @@ -63,7 +67,10 @@ type Reconciler struct { deploymentLister appsv1listers.DeploymentLister eventTypeLister eventinglisters.EventTypeLister + context context.Context sinkReconciler *duck.SinkReconciler + loggingConfig *pkgLogging.Config + metricsConfig *metrics.ExporterOptions } // Check that our Reconciler implements controller.Reconciler @@ -201,11 +208,23 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro return nil, err } + loggingConfig, err := utils.LoggingConfigToBase64(r.loggingConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting logging config to base64", zap.Any("receiveAdapter", err)) + } + + metricsConfig, err := utils.MetricsOptionsToBase64(r.metricsConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting metrics config to base64", zap.Any("receiveAdapter", err)) + } + adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.env.Image, - Source: src, - Labels: resources.Labels(src.Name), - SinkURI: sinkURI, + Image: r.env.Image, + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + LoggingConfig: loggingConfig, + MetricsConfig: metricsConfig, } expected := resources.MakeReceiveAdapter(&adapterArgs) @@ -320,3 +339,30 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob return cj, err } + +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) + if err != nil { + logging.FromContext(r.context).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + logging.FromContext(r.context).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + logging.FromContext(r.context).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) +} diff --git a/pkg/reconciler/cronjobsource/resources/labels.go b/pkg/reconciler/cronjobsource/resources/labels.go index 6ff6e1c7a76..b2bc3eb7101 100644 --- a/pkg/reconciler/cronjobsource/resources/labels.go +++ b/pkg/reconciler/cronjobsource/resources/labels.go @@ -16,9 +16,16 @@ limitations under the License. package resources +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "cronjob-source-controller" +) + // Labels are the labels attached to all resources based on a CronJobSource. func Labels(name string) map[string]string { return map[string]string{ "sources.eventing.knative.dev/cronJobSource": name, + "eventing.knative.dev/source": controllerAgentName, } } diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter.go b/pkg/reconciler/cronjobsource/resources/receive_adapter.go index 510d4aadf73..a134de04711 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter.go @@ -36,15 +36,18 @@ var ( // ReceiveAdapterArgs are the arguments needed to create a Cron Job Source Receive Adapter. Every // field is required. type ReceiveAdapterArgs struct { - Image string - Source *v1alpha1.CronJobSource - Labels map[string]string - SinkURI string + Image string + Source *v1alpha1.CronJobSource + Labels map[string]string + SinkURI string + MetricsConfig string + LoggingConfig string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for // Cron Job Sources. func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { + name := args.Source.ObjectMeta.Name RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU) if err != nil { RequestResourceCPU = resource.MustParse("250m") @@ -76,7 +79,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, - Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", args.Source.Name)), + Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", name)), Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), @@ -97,6 +100,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, + Ports: []corev1.ContainerPort{ + { + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SCHEDULE", @@ -117,6 +125,15 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "NAMESPACE", Value: args.Source.Namespace, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: args.MetricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: args.LoggingConfig, }, }, Resources: res, diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go index b054f09ad17..5d0bb0e1626 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go @@ -93,6 +93,10 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "receive-adapter", Image: "test-image", + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SCHEDULE", @@ -114,6 +118,18 @@ func TestMakeReceiveAdapter(t *testing.T) { Name: "NAMESPACE", Value: "source-namespace", }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, + { + Name: "K_METRICS_CONFIG", + Value: "", + }, + { + Name: "K_LOGGING_CONFIG", + Value: "", + }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{ From 17a068e5eab8e354e264414777c2c2b96a63deca Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Tue, 10 Sep 2019 14:05:13 +0200 Subject: [PATCH 2/9] Update config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml Co-Authored-By: mattmoor-sockpuppet --- .../metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index 47570672a29..24758c0af36 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -145,4 +145,4 @@ - source_labels: [__meta_kubernetes_pod_name] target_label: pod - source_labels: [__meta_kubernetes_service_name] - target_label: service \ No newline at end of file + target_label: service From f51e19b54c891fa5fa034ccdf948e90cb8c64f33 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Fri, 13 Sep 2019 22:09:30 +0200 Subject: [PATCH 3/9] Rebased and refactored --- cmd/cronjob_receive_adapter/main.go | 37 ++--- pkg/adapter/cronjobevents/adapter.go | 18 +- pkg/adapter/cronjobevents/adapter_test.go | 3 +- pkg/adapter/cronjobevents/stats_reporter.go | 156 ------------------ .../cronjobevents/stats_reporter_test.go | 111 ------------- pkg/reconciler/cronjobsource/controller.go | 1 - pkg/reconciler/cronjobsource/cronjobsource.go | 8 +- 7 files changed, 32 insertions(+), 302 deletions(-) delete mode 100644 pkg/adapter/cronjobevents/stats_reporter.go delete mode 100644 pkg/adapter/cronjobevents/stats_reporter_test.go diff --git a/cmd/cronjob_receive_adapter/main.go b/cmd/cronjob_receive_adapter/main.go index a3054f9aa83..43be39c8b31 100644 --- a/cmd/cronjob_receive_adapter/main.go +++ b/cmd/cronjob_receive_adapter/main.go @@ -19,8 +19,6 @@ package main import ( "flag" "fmt" - "knative.dev/eventing/pkg/utils" - "knative.dev/pkg/metrics" "log" "github.com/kelseyhightower/envconfig" @@ -29,7 +27,9 @@ import ( "knative.dev/eventing/pkg/adapter/cronjobevents" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" + "knative.dev/pkg/source" ) type envConfig struct { @@ -48,16 +48,16 @@ type envConfig struct { // Environment variable containing the namespace of the cron job. Namespace string `envconfig:"NAMESPACE" required:"true"` - // MetricsConfigBase64 is a base64 encoded json string of - // metrics.ExporterOptions. This is used to configure the metrics exporter - // options, the config is stored in a config map inside the controllers + // MetricsConfigJson is a json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, + // the config is stored in a config map inside the controllers // namespace and copied here. - MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"` - // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // LoggingConfigJson is a json string of logging.Config. // This is used to configure the logging config, the config is stored in // a config map inside the controllers namespace and copied here. - LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` + LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"` } const ( @@ -73,9 +73,8 @@ func main() { if err != nil { panic(fmt.Sprintf("Error processing env var: %s", err)) } - // TODO move this util to pkg - // Convert base64 encoded json logging.Config to logging.Config. - loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64) + // Convert json logging.Config to logging.Config. + loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson) if err != nil { fmt.Printf("[ERROR] failed to process logging config: %s", err.Error()) // Use default logging config. @@ -88,10 +87,8 @@ func main() { logger := loggerSugared.Desugar() defer flush(loggerSugared) - // Convert base64 encoded json metrics.ExporterOptions to - // metrics.ExporterOptions. - metricsConfig, err := utils.Base64ToMetricsOptions( - env.MetricsConfigBase64) + // Convert json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson) if err != nil { logger.Error("failed to process metrics options", zap.Error(err)) } @@ -103,18 +100,12 @@ func main() { if err := envconfig.Process("", &env); err != nil { log.Fatal("Failed to process env var", zap.Error(err)) } -<<<<<<< HEAD - - if err = tracing.SetupStaticPublishing(logger, "cronjobsource", tracing.OnePercentSampling); err != nil { - // If tracing doesn't work, we will log an error, but allow the source to continue to -======= - reporter, err := cronjobevents.NewStatsReporter() + reporter, err := source.NewStatsReporter() if err != nil { - logger.Fatal("Error building statsreporter", zap.Error(err)) + logger.Error("error building statsreporter", zap.Error(err)) } if err = tracing.SetupStaticPublishing(loggerSugared, "cronjobsource", tracing.OnePercentSampling); err != nil { // If tracing doesn't work, we will log an error, but allow the importer to continue to ->>>>>>> Added dataplane metrics cronjobsource // start. logger.Error("Error setting up trace publishing", zap.Error(err)) } diff --git a/pkg/adapter/cronjobevents/adapter.go b/pkg/adapter/cronjobevents/adapter.go index 593064b3583..6a99c2ffa8a 100644 --- a/pkg/adapter/cronjobevents/adapter.go +++ b/pkg/adapter/cronjobevents/adapter.go @@ -26,6 +26,7 @@ import ( sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/pkg/logging" + "knative.dev/pkg/source" ) // TODO: this should be a k8s cron. @@ -50,9 +51,13 @@ type Adapter struct { // client sends cloudevents. client cloudevents.Client - Reporter StatsReporter + Reporter source.StatsReporter } +const ( + resourceGroup = "cronjobsources.sources.eventing.knative.dev" +) + // Initialize cloudevent client func (a *Adapter) initClient() error { if a.client == nil { @@ -94,11 +99,12 @@ func (a *Adapter) cronTick() { event.SetType(sourcesv1alpha1.CronJobEventType) event.SetSource(sourcesv1alpha1.CronJobEventSource(a.Namespace, a.Name)) event.SetData(message(a.Data)) - reportArgs := &ReportArgs{ - ns: a.Namespace, - eventSource: event.Source(), - eventType: event.Type(), - name: a.Name, + reportArgs := &source.ReportArgs{ + Namespace: a.Namespace, + EventSource: event.Source(), + EventType: event.Type(), + Name: a.Name, + ResourceGroup: resourceGroup, } rctx, _, err := a.client.Send(context.TODO(), event) diff --git a/pkg/adapter/cronjobevents/adapter_test.go b/pkg/adapter/cronjobevents/adapter_test.go index 1a9972420a5..3616d776584 100644 --- a/pkg/adapter/cronjobevents/adapter_test.go +++ b/pkg/adapter/cronjobevents/adapter_test.go @@ -26,11 +26,12 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "knative.dev/pkg/source" ) type mockReporter struct{} -func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { +func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error { return nil } diff --git a/pkg/adapter/cronjobevents/stats_reporter.go b/pkg/adapter/cronjobevents/stats_reporter.go deleted file mode 100644 index e744b6472cd..00000000000 --- a/pkg/adapter/cronjobevents/stats_reporter.go +++ /dev/null @@ -1,156 +0,0 @@ -/* - * Copyright 2019 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package cronjobevents - -import ( - "context" - "strconv" - - . "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/eventing/pkg/utils" - - "go.opencensus.io/stats" - "go.opencensus.io/stats/view" - "go.opencensus.io/tag" - "knative.dev/pkg/metrics" - "knative.dev/pkg/metrics/metricskey" -) - -var ( - // eventCountM is a counter which records the number of events sent - // by the CronJobSource. - eventCountM = stats.Int64( - "event_count", - "Number of events created", - stats.UnitDimensionless, - ) -) - -type ReportArgs struct { - ns string - eventType string - eventSource string - name string -} - -const ( - importerResourceGroupValue = "cronjobsources.sources.eventing.knative.dev" -) - -// StatsReporter defines the interface for sending filter metrics. -type StatsReporter interface { - ReportEventCount(args *ReportArgs, responseCode int) error -} - -var _ StatsReporter = (*reporter)(nil) - -// reporter holds cached metric objects to report filter metrics. -type reporter struct { - namespaceTagKey tag.Key - eventTypeTagKey tag.Key - eventSourceTagKey tag.Key - importerNameTagKey tag.Key - importerResourceGroupTagKey tag.Key - responseCodeKey tag.Key - responseCodeClassKey tag.Key -} - -// NewStatsReporter creates a reporter that collects and reports cronjobsource -// metrics. -func NewStatsReporter() (StatsReporter, error) { - var r = &reporter{} - - // Create the tag keys that will be used to add tags to our measurements. - nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) - if err != nil { - return nil, err - } - r.namespaceTagKey = nsTag - - eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) - if err != nil { - return nil, err - } - r.eventTypeTagKey = eventTypeTag - - eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) - if err != nil { - return nil, err - } - r.eventSourceTagKey = eventSourceTag - - importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) - if err != nil { - return nil, err - } - r.importerNameTagKey = importerNameTag - - importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) - if err != nil { - return nil, err - } - r.importerResourceGroupTagKey = importerResourceGroupTag - - responseCodeTag, err := tag.NewKey(LabelResponseCode) - if err != nil { - return nil, err - } - r.responseCodeKey = responseCodeTag - - responseCodeClassTag, err := tag.NewKey(LabelResponseCodeClass) - if err != nil { - return nil, err - } - r.responseCodeClassKey = responseCodeClassTag - - // Create view to see our measurements. - err = view.Register( - &view.View{ - Description: eventCountM.Description(), - Measure: eventCountM, - Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.eventTypeTagKey, r.eventSourceTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey, r.responseCodeKey, r.responseCodeClassKey}, - }, - ) - if err != nil { - return nil, err - } - - return r, nil -} - -// ReportEventCount captures the event count. -func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { - ctx, err := r.generateTag(args, responseCode) - if err != nil { - return err - } - metrics.Record(ctx, eventCountM.M(1)) - return nil -} - -func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) { - return tag.New( - context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.eventTypeTagKey, args.eventType), - tag.Insert(r.eventSourceTagKey, args.eventSource), - tag.Insert(r.importerNameTagKey, args.name), - tag.Insert(r.importerResourceGroupTagKey, importerResourceGroupValue), - tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode))) -} diff --git a/pkg/adapter/cronjobevents/stats_reporter_test.go b/pkg/adapter/cronjobevents/stats_reporter_test.go deleted file mode 100644 index f225338e2d6..00000000000 --- a/pkg/adapter/cronjobevents/stats_reporter_test.go +++ /dev/null @@ -1,111 +0,0 @@ -/* -Copyright 2019 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package cronjobevents - -import ( - "net/http" - "testing" - - . "knative.dev/eventing/pkg/metrics/metricskey" - metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" -) - -// unregister, ehm, unregisters the metrics that were registered, by -// virtue of StatsReporter creation. -// Since golang executes test iterations within the same process, the stats reporter -// returns an error if the metric is already registered and the test panics. -func unregister() { - metricstest.Unregister("event_count") -} - -func TestStatsReporter(t *testing.T) { - args := &ReportArgs{ - ns: "testns", - eventType: "test-eventtype", - eventSource: "unit-test", - name: "testimporter", - } - - r, err := NewStatsReporter() - if err != nil { - t.Fatalf("Failed to create a new reporter: %v", err) - } - // Without this `go test ... -count=X`, where X > 1, fails, since - // we get an error about view already being registered. - defer unregister() - - wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskey.LabelEventType: "test-eventtype", - metricskey.LabelEventSource: "unit-test", - metricskey.LabelImporterName: "testimporter", - metricskey.LabelImporterResourceGroup: "cronjobsources.sources.eventing.knative.dev", - LabelResponseCode: "202", - LabelResponseCodeClass: "2xx", - } - - // test ReportEventCount - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - metricstest.CheckCountData(t, "event_count", wantTags, 2) - -} - -func TestReporterFor5xxResponse(t *testing.T) { - r, err := NewStatsReporter() - defer unregister() - - if err != nil { - t.Fatalf("Failed to create a new reporter: %v", err) - } - - args := &ReportArgs{ - ns: "testns", - eventType: "eventtype", - eventSource: "eventsource", - name: "testimporter", - } - - wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskeyEventing.LabelFilterResult: "success", - metricskey.LabelEventType: "eventtype", - metricskey.LabelEventSource: "eventsource", - metricskey.LabelImporterName: "testimporter", - metricskey.LabelImporterResourceGroup: "cronjobsources.sources.eventing.knative.dev", - LabelResponseCode: "500", - LabelResponseCodeClass: "5xx", - } - // test ReportEventCount - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusInternalServerError) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusInternalServerError) - }) - metricstest.CheckCountData(t, "event_count", wantTags, 2) -} - -func expectSuccess(t *testing.T, f func() error) { - t.Helper() - if err := f(); err != nil { - t.Errorf("Reporter expected success but got error: %v", err) - } -} diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index bafee1b7dec..4533cc09b84 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -30,7 +30,6 @@ import ( deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" - deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" ) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 46e8a2bee55..95c2e911a97 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -20,10 +20,11 @@ import ( "context" "errors" "fmt" - "knative.dev/pkg/metrics" "reflect" "time" + "knative.dev/pkg/metrics" + "github.com/robfig/cron" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -42,7 +43,6 @@ import ( "knative.dev/eventing/pkg/logging" "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/cronjobsource/resources" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/controller" pkgLogging "knative.dev/pkg/logging" ) @@ -208,12 +208,12 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro return nil, err } - loggingConfig, err := utils.LoggingConfigToBase64(r.loggingConfig) + loggingConfig, err := pkgLogging.LoggingConfigToJson(r.loggingConfig) if err != nil { logging.FromContext(ctx).Error("error while converting logging config to base64", zap.Any("receiveAdapter", err)) } - metricsConfig, err := utils.MetricsOptionsToBase64(r.metricsConfig) + metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig) if err != nil { logging.FromContext(ctx).Error("error while converting metrics config to base64", zap.Any("receiveAdapter", err)) } From 1f31661c760aac9305d75702a969a71b05d76a39 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Fri, 13 Sep 2019 22:20:20 +0200 Subject: [PATCH 4/9] Changed as per PR comments --- cmd/cronjob_receive_adapter/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/cronjob_receive_adapter/main.go b/cmd/cronjob_receive_adapter/main.go index 43be39c8b31..dc6183b9af7 100644 --- a/cmd/cronjob_receive_adapter/main.go +++ b/cmd/cronjob_receive_adapter/main.go @@ -76,7 +76,7 @@ func main() { // Convert json logging.Config to logging.Config. loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson) if err != nil { - fmt.Printf("[ERROR] failed to process logging config: %s", err.Error()) + fmt.Printf("[ERROR] failed to process logging config: %s", err) // Use default logging config. if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { // If this fails, there is no recovering. @@ -90,7 +90,7 @@ func main() { // Convert json metrics.ExporterOptions to metrics.ExporterOptions. metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson) if err != nil { - logger.Error("failed to process metrics options", zap.Error(err)) + logger.Fatal("failed to process metrics options", zap.Error(err)) } if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil { From 638cc31027893bdebf6513be9a1a9265df5b7426 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Fri, 13 Sep 2019 22:29:45 +0200 Subject: [PATCH 5/9] Added tests to validate metric --- pkg/adapter/cronjobevents/adapter_test.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/pkg/adapter/cronjobevents/adapter_test.go b/pkg/adapter/cronjobevents/adapter_test.go index 3616d776584..65a74d4e3c3 100644 --- a/pkg/adapter/cronjobevents/adapter_test.go +++ b/pkg/adapter/cronjobevents/adapter_test.go @@ -29,9 +29,12 @@ import ( "knative.dev/pkg/source" ) -type mockReporter struct{} +type mockReporter struct { + eventCount int +} func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error { + r.eventCount++ return nil } @@ -86,6 +89,7 @@ func TestStart_ServeHTTP(t *testing.T) { }() a.cronTick() // force a tick. + validateMetric(t, a.Reporter, 1) if tc.reqBody != string(h.body) { t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body) @@ -110,6 +114,8 @@ func TestStartBadCron(t *testing.T) { t.Errorf("failed to fail, %v", err) } + + validateMetric(t, a.Reporter, 0) } func TestPostMessage_ServeHTTP(t *testing.T) { @@ -152,6 +158,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) { if tc.reqBody != string(h.body) { t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body) } + validateMetric(t, a.Reporter, 1) }) } } @@ -219,3 +226,11 @@ func sinkAccepted(writer http.ResponseWriter, req *http.Request) { func sinkRejected(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusRequestTimeout) } + +func validateMetric(t *testing.T, reporter source.StatsReporter, want int) { + if mockReporter, ok := reporter.(*mockReporter); !ok { + t.Errorf("reporter is not a mockReporter") + } else if mockReporter.eventCount != want { + t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount) + } +} From a7e652468b693808c7d662a3431779d12374f9b8 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Fri, 13 Sep 2019 22:33:14 +0200 Subject: [PATCH 6/9] Renamed var --- pkg/reconciler/cronjobsource/controller.go | 2 +- pkg/reconciler/cronjobsource/cronjobsource.go | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index 4533cc09b84..9dbf0415b40 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -71,7 +71,7 @@ func NewController( deploymentLister: deploymentInformer.Lister(), eventTypeLister: eventTypeInformer.Lister(), env: *env, - context: ctx, + loggingContext: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) r.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 95c2e911a97..f86f0a6005c 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -67,7 +67,7 @@ type Reconciler struct { deploymentLister appsv1listers.DeploymentLister eventTypeLister eventinglisters.EventTypeLister - context context.Context + loggingContext context.Context sinkReconciler *duck.SinkReconciler loggingConfig *pkgLogging.Config metricsConfig *metrics.ExporterOptions @@ -347,11 +347,11 @@ func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) if err != nil { - logging.FromContext(r.context).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + logging.FromContext(r.loggingContext).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) return } r.loggingConfig = logcfg - logging.FromContext(r.context).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) } func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { @@ -364,5 +364,5 @@ func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { Component: component, ConfigMap: cfg.Data, } - logging.FromContext(r.context).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) + logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) } From d3ed9ffaa983a3549c37a05a17ce4e2280b6a55a Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sat, 14 Sep 2019 11:10:32 +0200 Subject: [PATCH 7/9] Update pkg/reconciler/cronjobsource/cronjobsource.go Co-Authored-By: Adam Harwayne --- pkg/reconciler/cronjobsource/cronjobsource.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index f86f0a6005c..c24df5eab5c 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -210,7 +210,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro loggingConfig, err := pkgLogging.LoggingConfigToJson(r.loggingConfig) if err != nil { - logging.FromContext(ctx).Error("error while converting logging config to base64", zap.Any("receiveAdapter", err)) + logging.FromContext(ctx).Error("error while converting logging config to JSON", zap.Any("receiveAdapter", err)) } metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig) From 0bf166217365c53c00760963425523458db87b10 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sat, 14 Sep 2019 11:11:23 +0200 Subject: [PATCH 8/9] Update pkg/reconciler/cronjobsource/cronjobsource.go Co-Authored-By: Adam Harwayne --- pkg/reconciler/cronjobsource/cronjobsource.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index c24df5eab5c..2ee03ae26c6 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -215,7 +215,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig) if err != nil { - logging.FromContext(ctx).Error("error while converting metrics config to base64", zap.Any("receiveAdapter", err)) + logging.FromContext(ctx).Error("error while converting metrics config to JSON", zap.Any("receiveAdapter", err)) } adapterArgs := resources.ReceiveAdapterArgs{ From 93c87372a4edec956cf13c4afc7889035cee8479 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sat, 14 Sep 2019 14:53:47 +0200 Subject: [PATCH 9/9] Added TODO --- pkg/reconciler/cronjobsource/cronjobsource.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 2ee03ae26c6..62d7f39175b 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -340,6 +340,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob return cj, err } +// TODO determine how to push the updated logging config to existing data plane Pods. func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { if cfg != nil { delete(cfg.Data, "_example") @@ -354,6 +355,7 @@ func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) } +// TODO determine how to push the updated metrics config to existing data plane Pods. func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { if cfg != nil { delete(cfg.Data, "_example")