diff --git a/cmd/cronjob_receive_adapter/main.go b/cmd/cronjob_receive_adapter/main.go index 189f492d997..dc6183b9af7 100644 --- a/cmd/cronjob_receive_adapter/main.go +++ b/cmd/cronjob_receive_adapter/main.go @@ -18,15 +18,18 @@ package main import ( "flag" + "fmt" "log" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "golang.org/x/net/context" "knative.dev/eventing/pkg/adapter/cronjobevents" "knative.dev/eventing/pkg/tracing" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" + "knative.dev/pkg/source" ) type envConfig struct { @@ -44,29 +47,67 @@ type envConfig struct { // Environment variable containing the namespace of the cron job. Namespace string `envconfig:"NAMESPACE" required:"true"` + + // MetricsConfigJson is a json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, + // the config is stored in a config map inside the controllers + // namespace and copied here. + MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigJson is a json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"` } +const ( + component = "cronjobsource" +) + func main() { flag.Parse() ctx := context.Background() - logCfg := zap.NewProductionConfig() - logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - dlogger, err := logCfg.Build() + var env envConfig + err := envconfig.Process("", &env) if err != nil { - log.Fatalf("Error building logger: %v", err) + panic(fmt.Sprintf("Error processing env var: %s", err)) + } + // Convert json logging.Config to logging.Config. + loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson) + if err != nil { + fmt.Printf("[ERROR] failed to process logging config: %s", err) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } + } + loggerSugared, _ := logging.NewLoggerFromConfig(loggingConfig, component) + logger := loggerSugared.Desugar() + defer flush(loggerSugared) + + // Convert json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson) + if err != nil { + logger.Fatal("failed to process metrics options", zap.Error(err)) + } + + if err := metrics.UpdateExporter(*metricsConfig, loggerSugared); err != nil { + logger.Error("failed to create the metrics exporter", zap.Error(err)) } - logger := dlogger.Sugar() - var env envConfig if err := envconfig.Process("", &env); err != nil { log.Fatal("Failed to process env var", zap.Error(err)) } - - if err = tracing.SetupStaticPublishing(logger, "cronjobsource", tracing.OnePercentSampling); err != nil { - // If tracing doesn't work, we will log an error, but allow the source to continue to + reporter, err := source.NewStatsReporter() + if err != nil { + logger.Error("error building statsreporter", zap.Error(err)) + } + if err = tracing.SetupStaticPublishing(loggerSugared, "cronjobsource", tracing.OnePercentSampling); err != nil { + // If tracing doesn't work, we will log an error, but allow the importer to continue to // start. - logger.Errorw("Error setting up trace publishing", err) + logger.Error("Error setting up trace publishing", zap.Error(err)) } adapter := &cronjobevents.Adapter{ @@ -75,6 +116,7 @@ func main() { SinkURI: env.Sink, Name: env.Name, Namespace: env.Namespace, + Reporter: reporter, } logger.Info("Starting Receive Adapter", zap.Any("adapter", adapter)) @@ -85,3 +127,8 @@ func main() { logger.Fatal("Failed to start adapter", zap.Error(err)) } } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index b75ff61b8aa..24758c0af36 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -127,3 +127,22 @@ target_label: pod - source_labels: [__meta_kubernetes_service_name] target_label: service + +# cronjob-source +- job_name: cronjob-source + scrape_interval: 3s + scrape_timeout: 3s + kubernetes_sd_configs: + - role: pod + relabel_configs: + # Scrape only the the targets matching the following metadata + - source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name] + action: keep + regex: cronjob-source-controller;metrics + # Rename metadata labels to be reader friendly + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_service_name] + target_label: service diff --git a/pkg/adapter/cronjobevents/adapter.go b/pkg/adapter/cronjobevents/adapter.go index 60c047a9146..6a99c2ffa8a 100644 --- a/pkg/adapter/cronjobevents/adapter.go +++ b/pkg/adapter/cronjobevents/adapter.go @@ -26,6 +26,7 @@ import ( sourcesv1alpha1 "knative.dev/eventing/pkg/apis/sources/v1alpha1" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/pkg/logging" + "knative.dev/pkg/source" ) // TODO: this should be a k8s cron. @@ -49,8 +50,14 @@ type Adapter struct { // client sends cloudevents. client cloudevents.Client + + Reporter source.StatsReporter } +const ( + resourceGroup = "cronjobsources.sources.eventing.knative.dev" +) + // Initialize cloudevent client func (a *Adapter) initClient() error { if a.client == nil { @@ -92,10 +99,20 @@ func (a *Adapter) cronTick() { event.SetType(sourcesv1alpha1.CronJobEventType) event.SetSource(sourcesv1alpha1.CronJobEventSource(a.Namespace, a.Name)) event.SetData(message(a.Data)) + reportArgs := &source.ReportArgs{ + Namespace: a.Namespace, + EventSource: event.Source(), + EventType: event.Type(), + Name: a.Name, + ResourceGroup: resourceGroup, + } - if _, _, err := a.client.Send(context.TODO(), event); err != nil { - logger.Error("failed to send cloudevent", err) + rctx, _, err := a.client.Send(context.TODO(), event) + rtctx := cloudevents.HTTPTransportContextFrom(rctx) + if err != nil { + logger.Error("failed to send cloudevent", zap.Error(err)) } + a.Reporter.ReportEventCount(reportArgs, rtctx.StatusCode) } type Message struct { diff --git a/pkg/adapter/cronjobevents/adapter_test.go b/pkg/adapter/cronjobevents/adapter_test.go index ae9ca48dd42..65a74d4e3c3 100644 --- a/pkg/adapter/cronjobevents/adapter_test.go +++ b/pkg/adapter/cronjobevents/adapter_test.go @@ -26,8 +26,18 @@ import ( "testing" "github.com/google/go-cmp/cmp" + "knative.dev/pkg/source" ) +type mockReporter struct { + eventCount int +} + +func (r *mockReporter) ReportEventCount(args *source.ReportArgs, responseCode int) error { + r.eventCount++ + return nil +} + func TestStart_ServeHTTP(t *testing.T) { testCases := map[string]struct { schedule string @@ -55,10 +65,12 @@ func TestStart_ServeHTTP(t *testing.T) { sinkServer := httptest.NewServer(h) defer sinkServer.Close() + r := &mockReporter{} a := &Adapter{ Schedule: tc.schedule, Data: "data", SinkURI: sinkServer.URL, + Reporter: r, } if err := a.initClient(); err != nil { @@ -77,6 +89,7 @@ func TestStart_ServeHTTP(t *testing.T) { }() a.cronTick() // force a tick. + validateMetric(t, a.Reporter, 1) if tc.reqBody != string(h.body) { t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body) @@ -89,8 +102,10 @@ func TestStart_ServeHTTP(t *testing.T) { func TestStartBadCron(t *testing.T) { schedule := "bad" + r := &mockReporter{} a := &Adapter{ Schedule: schedule, + Reporter: r, } stop := make(chan struct{}) @@ -99,6 +114,8 @@ func TestStartBadCron(t *testing.T) { t.Errorf("failed to fail, %v", err) } + + validateMetric(t, a.Reporter, 0) } func TestPostMessage_ServeHTTP(t *testing.T) { @@ -125,9 +142,11 @@ func TestPostMessage_ServeHTTP(t *testing.T) { sinkServer := httptest.NewServer(h) defer sinkServer.Close() + r := &mockReporter{} a := &Adapter{ - Data: "data", - SinkURI: sinkServer.URL, + Data: "data", + SinkURI: sinkServer.URL, + Reporter: r, } if err := a.initClient(); err != nil { @@ -139,6 +158,7 @@ func TestPostMessage_ServeHTTP(t *testing.T) { if tc.reqBody != string(h.body) { t.Errorf("expected request body %q, but got %q", tc.reqBody, h.body) } + validateMetric(t, a.Reporter, 1) }) } } @@ -206,3 +226,11 @@ func sinkAccepted(writer http.ResponseWriter, req *http.Request) { func sinkRejected(writer http.ResponseWriter, _ *http.Request) { writer.WriteHeader(http.StatusRequestTimeout) } + +func validateMetric(t *testing.T, reporter source.StatsReporter, want int) { + if mockReporter, ok := reporter.(*mockReporter); !ok { + t.Errorf("reporter is not a mockReporter") + } else if mockReporter.eventCount != want { + t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount) + } +} diff --git a/pkg/reconciler/cronjobsource/controller.go b/pkg/reconciler/cronjobsource/controller.go index 5109b8fcce1..9dbf0415b40 100644 --- a/pkg/reconciler/cronjobsource/controller.go +++ b/pkg/reconciler/cronjobsource/controller.go @@ -30,6 +30,8 @@ import ( deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" ) const ( @@ -69,6 +71,7 @@ func NewController( deploymentLister: deploymentInformer.Lister(), eventTypeLister: eventTypeInformer.Lister(), env: *env, + loggingContext: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) r.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey) @@ -85,6 +88,8 @@ func NewController( FilterFunc: controller.Filter(v1alpha1.SchemeGroupVersion.WithKind("CronJobSource")), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) return impl } diff --git a/pkg/reconciler/cronjobsource/controller_test.go b/pkg/reconciler/cronjobsource/controller_test.go index 4bb70ce8d0b..a799ad95e68 100644 --- a/pkg/reconciler/cronjobsource/controller_test.go +++ b/pkg/reconciler/cronjobsource/controller_test.go @@ -20,10 +20,11 @@ import ( "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" - // Fake injection informers _ "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype/fake" _ "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/cronjobsource/fake" @@ -51,6 +52,15 @@ func TestNew(t *testing.T) { t.Fatalf("Failed to unset env var: %v", err) } }() + + if err := os.Setenv("METRICS_DOMAIN", "knative.dev/eventing"); err != nil { + t.Fatalf("Failed to set env var: %v", err) + } + defer func() { + if err := os.Unsetenv("METRICS_DOMAIN"); err != nil { + t.Fatalf("Failed to unset env var: %v", err) + } + }() } else { defer func() { r := recover() @@ -61,7 +71,27 @@ func TestNew(t *testing.T) { } ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewFixedWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/cronjobsource/cronjobsource.go b/pkg/reconciler/cronjobsource/cronjobsource.go index 0f33366c51c..62d7f39175b 100644 --- a/pkg/reconciler/cronjobsource/cronjobsource.go +++ b/pkg/reconciler/cronjobsource/cronjobsource.go @@ -23,6 +23,8 @@ import ( "reflect" "time" + "knative.dev/pkg/metrics" + "github.com/robfig/cron" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" @@ -42,6 +44,7 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/cronjobsource/resources" "knative.dev/pkg/controller" + pkgLogging "knative.dev/pkg/logging" ) const ( @@ -51,6 +54,7 @@ const ( cronJobUpdateStatusFailed = "CronJobSourceUpdateStatusFailed" cronJobSourceDeploymentCreated = "CronJobSurceDeploymentCreated" cronJobSourceDeploymentUpdated = "CronJobSourceDeploymentUpdated" + component = "cronjobsource" ) type Reconciler struct { @@ -63,7 +67,10 @@ type Reconciler struct { deploymentLister appsv1listers.DeploymentLister eventTypeLister eventinglisters.EventTypeLister + loggingContext context.Context sinkReconciler *duck.SinkReconciler + loggingConfig *pkgLogging.Config + metricsConfig *metrics.ExporterOptions } // Check that our Reconciler implements controller.Reconciler @@ -201,11 +208,23 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Cro return nil, err } + loggingConfig, err := pkgLogging.LoggingConfigToJson(r.loggingConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting logging config to JSON", zap.Any("receiveAdapter", err)) + } + + metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig) + if err != nil { + logging.FromContext(ctx).Error("error while converting metrics config to JSON", zap.Any("receiveAdapter", err)) + } + adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.env.Image, - Source: src, - Labels: resources.Labels(src.Name), - SinkURI: sinkURI, + Image: r.env.Image, + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + LoggingConfig: loggingConfig, + MetricsConfig: metricsConfig, } expected := resources.MakeReceiveAdapter(&adapterArgs) @@ -320,3 +339,32 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.CronJob return cj, err } + +// TODO determine how to push the updated logging config to existing data plane Pods. +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) + if err != nil { + logging.FromContext(r.loggingContext).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + logging.FromContext(r.loggingContext).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) +} + +// TODO determine how to push the updated metrics config to existing data plane Pods. +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + logging.FromContext(r.loggingContext).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) +} diff --git a/pkg/reconciler/cronjobsource/resources/labels.go b/pkg/reconciler/cronjobsource/resources/labels.go index 6ff6e1c7a76..b2bc3eb7101 100644 --- a/pkg/reconciler/cronjobsource/resources/labels.go +++ b/pkg/reconciler/cronjobsource/resources/labels.go @@ -16,9 +16,16 @@ limitations under the License. package resources +const ( + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "cronjob-source-controller" +) + // Labels are the labels attached to all resources based on a CronJobSource. func Labels(name string) map[string]string { return map[string]string{ "sources.eventing.knative.dev/cronJobSource": name, + "eventing.knative.dev/source": controllerAgentName, } } diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter.go b/pkg/reconciler/cronjobsource/resources/receive_adapter.go index 510d4aadf73..a134de04711 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter.go @@ -36,15 +36,18 @@ var ( // ReceiveAdapterArgs are the arguments needed to create a Cron Job Source Receive Adapter. Every // field is required. type ReceiveAdapterArgs struct { - Image string - Source *v1alpha1.CronJobSource - Labels map[string]string - SinkURI string + Image string + Source *v1alpha1.CronJobSource + Labels map[string]string + SinkURI string + MetricsConfig string + LoggingConfig string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for // Cron Job Sources. func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { + name := args.Source.ObjectMeta.Name RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU) if err != nil { RequestResourceCPU = resource.MustParse("250m") @@ -76,7 +79,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, - Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", args.Source.Name)), + Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("cronjobsource-%s", name)), Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), @@ -97,6 +100,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, + Ports: []corev1.ContainerPort{ + { + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SCHEDULE", @@ -117,6 +125,15 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "NAMESPACE", Value: args.Source.Namespace, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: args.MetricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: args.LoggingConfig, }, }, Resources: res, diff --git a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go index b054f09ad17..5d0bb0e1626 100644 --- a/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go +++ b/pkg/reconciler/cronjobsource/resources/receive_adapter_test.go @@ -93,6 +93,10 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "receive-adapter", Image: "test-image", + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SCHEDULE", @@ -114,6 +118,18 @@ func TestMakeReceiveAdapter(t *testing.T) { Name: "NAMESPACE", Value: "source-namespace", }, + { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, + { + Name: "K_METRICS_CONFIG", + Value: "", + }, + { + Name: "K_LOGGING_CONFIG", + Value: "", + }, }, Resources: corev1.ResourceRequirements{ Limits: corev1.ResourceList{