From 2b202a48bbff5f6a09ed40d2c64678298b085db4 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Thu, 29 Aug 2019 18:07:54 +0200 Subject: [PATCH 1/9] Added metrics endpoint --- cmd/apiserver_receive_adapter/main.go | 64 +++++++-- pkg/adapter/apiserver/adapter.go | 25 ++-- pkg/adapter/apiserver/ref.go | 8 ++ pkg/adapter/apiserver/resource.go | 14 +- pkg/adapter/apiserver/stats_reporter.go | 122 ++++++++++++++++ .../apiserversource/apiserversource.go | 44 +++++- pkg/reconciler/apiserversource/controller.go | 6 + .../apiserversource/resources/config.go | 134 ++++++++++++++++++ .../resources/receive_adapter.go | 24 +++- 9 files changed, 406 insertions(+), 35 deletions(-) create mode 100644 pkg/adapter/apiserver/stats_reporter.go create mode 100644 pkg/reconciler/apiserversource/resources/config.go diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 04b083391c7..cf09b79cfc7 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -18,14 +18,15 @@ package main import ( "flag" - "log" + "fmt" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "knative.dev/eventing/pkg/reconciler/apiserversource/resources" + "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - "go.uber.org/zap/zapcore" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" @@ -33,9 +34,15 @@ import ( "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "knative.dev/pkg/signals" ) +const ( + component = "ApiServerSource::ReceiveAdapter" +) + var ( masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.") kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.") @@ -48,6 +55,15 @@ type envConfig struct { ApiVersion []string `split_words:"true" required:"true"` Kind []string `required:"true"` Controller []bool `required:"true"` + // MetricsConfigBase64 is a base64 encoded json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, the config is + // stored in a config map inside the controllers namespace and copied here. + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` } // TODO: the controller should take the list of GVR @@ -55,20 +71,33 @@ type envConfig struct { func main() { flag.Parse() - logCfg := zap.NewProductionConfig() - logCfg.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - dlogger, err := logCfg.Build() + var env envConfig + err := envconfig.Process("", &env) if err != nil { - log.Fatalf("Error building logger: %v", err) + panic(fmt.Sprintf("Error processing env var: %s", err)) } - logger := dlogger.Sugar() - var env envConfig - err = envconfig.Process("", &env) + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig, err := resources.Base64ToLoggingConfig(env.LoggingConfigBase64) if err != nil { - logger.Fatalw("Error processing environment", zap.Error(err)) + panic(err) } + // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig, err := resources.Base64ToMetricsOptions(env.MetricsConfigBase64) + if err != nil { + panic(err) + } + + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) + + if err := metrics.UpdateExporter(*metricsConfig, logger); err != nil { + logger.Fatalf("Failed to create the metrics exporter: %v", err) + } + + reporter, err := apiserver.NewStatsReporter() + // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -77,9 +106,7 @@ func main() { logger.Fatalw("Error building kubeconfig", zap.Error(err)) } - logger = logger.With(zap.String("controller/apiserver", "adapter")) logger.Info("Starting the controller") - client, err := dynamic.NewForConfig(cfg) if err != nil { logger.Fatalw("Error building dynamic client", zap.Error(err)) @@ -120,9 +147,18 @@ func main() { GVRCs: gvrcs, } - a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt) - logger.Info("starting kubernetes api adapter") + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt, reporter) + logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) } } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} + +// func mainMetrics(logger *zap.SugaredLogger, opts *metrics.ExporterOptions) { + +// } diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index d91985a09a7..4b2034f38f0 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -66,9 +66,10 @@ type adapter struct { mode string delegate eventDelegate + reporter StatsReporter } -func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options) Adapter { +func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options, reporter StatsReporter) Adapter { mode := opt.Mode switch mode { case ResourceMode, RefMode: @@ -87,6 +88,7 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents gvrcs: opt.GVRCs, namespace: opt.Namespace, mode: mode, + reporter: reporter, } return a } @@ -101,21 +103,28 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { stop := make(chan struct{}) resyncPeriod := time.Duration(10 * time.Hour) - + reportArgs := &ReportArgs{ + ns: a.namespace, + } + a.reporter.ReportEventCount(reportArgs, nil) var d eventDelegate switch a.mode { case ResourceMode: d = &resource{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, } case RefMode: d = &ref{ - ce: a.ce, - source: a.source, - logger: a.logger, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, } default: diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 9fcbcede367..f33381eb17f 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -18,6 +18,7 @@ package apiserver import ( "context" + "fmt" "reflect" "k8s.io/apimachinery/pkg/api/meta" @@ -36,6 +37,8 @@ type ref struct { logger *zap.SugaredLogger controlledGVRs []schema.GroupVersionResource + reporter StatsReporter + namespace string } var _ cache.Store = (*ref)(nil) @@ -105,6 +108,11 @@ func (a *ref) Delete(obj interface{}) error { } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { + fmt.Printf("Are we here?\n", gvr) + reportArgs := &ReportArgs{ + ns: a.namespace, + } + a.reporter.ReportEventCount(reportArgs, nil) if a.controlledGVRs == nil { a.controlledGVRs = []schema.GroupVersionResource{gvr} return diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 043327df91a..c367e1e18e6 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -18,6 +18,7 @@ package apiserver import ( "context" + "fmt" cloudevents "github.com/cloudevents/sdk-go" "go.uber.org/zap" @@ -27,9 +28,11 @@ import ( ) type resource struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger + ce cloudevents.Client + source string + logger *zap.SugaredLogger + reporter StatsReporter + namespace string } var _ cache.Store = (*resource)(nil) @@ -81,6 +84,11 @@ func (a *resource) Delete(obj interface{}) error { func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { // not supported for resource. + fmt.Printf("Are we here?\n", gvr) + reportArgs := &ReportArgs{ + ns: a.namespace, + } + a.reporter.ReportEventCount(reportArgs, nil) a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) } diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go new file mode 100644 index 00000000000..d066f5a01e8 --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -0,0 +1,122 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package apiserver + +import ( + "context" + + "go.opencensus.io/stats" + "go.opencensus.io/stats/view" + "go.opencensus.io/tag" + utils "knative.dev/eventing/pkg/broker" + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics" +) + +var ( + // eventCountM is a counter which records the number of events received + // by a Trigger. + eventCountM = stats.Int64( + "event_count", + "Number of events created", + stats.UnitDimensionless, + ) +) + +type ReportArgs struct { + ns string + eventType string + eventSource string +} + +// StatsReporter defines the interface for sending filter metrics. +type StatsReporter interface { + ReportEventCount(args *ReportArgs, err error) error +} + +var _ StatsReporter = (*reporter)(nil) + +// reporter holds cached metric objects to report filter metrics. +type reporter struct { + namespaceTagKey tag.Key + resultKey tag.Key + filterResultKey tag.Key +} + +// NewStatsReporter creates a reporter that collects and reports filter metrics. +func NewStatsReporter() (StatsReporter, error) { + var r = &reporter{} + + // Create the tag keys that will be used to add tags to our measurements. + nsTag, err := tag.NewKey(metricskey.NamespaceName) + if err != nil { + return nil, err + } + r.namespaceTagKey = nsTag + + filterResultTag, err := tag.NewKey(metricskey.FilterResult) + if err != nil { + return nil, err + } + r.filterResultKey = filterResultTag + resultTag, err := tag.NewKey(metricskey.Result) + if err != nil { + return nil, err + } + r.resultKey = resultTag + + // Create view to see our measurements. + err = view.Register( + &view.View{ + Description: eventCountM.Description(), + Measure: eventCountM, + // TODO count or sum aggregation? + Aggregation: view.Count(), + TagKeys: []tag.Key{r.namespaceTagKey, r.resultKey}, + }, + ) + if err != nil { + return nil, err + } + + return r, nil +} + +// ReportEventCount captures the event count. +func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) + if err != nil { + return err + } + metrics.Record(ctx, eventCountM.M(1)) + return nil +} + +func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) { + // Note that eventType and eventSource can be empty strings, so they need a special treatment. + return tag.New( + context.Background(), + tag.Insert(r.namespaceTagKey, args.ns), + t) +} + +func valueOrAny(v string) string { + if v != "" { + return v + } + return metricskey.Any +} diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index d3c319f7eb0..da77b30dd76 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -43,6 +43,8 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/eventing/pkg/reconciler/apiserversource/resources" "knative.dev/eventing/pkg/utils" + pkgLogging "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" ) const ( @@ -56,6 +58,8 @@ const ( // raImageEnvVar is the name of the environment variable that contains the receive adapter's // image. It must be defined. raImageEnvVar = "APISERVER_RA_IMAGE" + + component = "apiserver-source-controller" ) var ( @@ -87,6 +91,9 @@ type Reconciler struct { source string sinkReconciler *duck.SinkReconciler + context context.Context + loggingConfig *pkgLogging.Config + metricsConfig *metrics.ExporterOptions } // Reconcile compares the actual state with the desired, and attempts to @@ -187,10 +194,12 @@ func (r *Reconciler) getReceiveAdapterImage() string { func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) { adapterArgs := resources.ReceiveAdapterArgs{ - Image: r.getReceiveAdapterImage(), - Source: src, - Labels: resources.Labels(src.Name), - SinkURI: sinkURI, + Image: r.getReceiveAdapterImage(), + Source: src, + Labels: resources.Labels(src.Name), + SinkURI: sinkURI, + LoggingConfig: resources.LoggingConfigToBase64(r.loggingConfig), + MetricsConfig: resources.MetricsOptionsToBase64(r.metricsConfig), } expected := resources.MakeReceiveAdapter(&adapterArgs) @@ -374,3 +383,30 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.ApiServ return cj, err } + +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := pkgLogging.NewConfigFromConfigMap(cfg) + if err != nil { + logging.FromContext(r.context).Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + logging.FromContext(r.context).Info("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + logging.FromContext(r.context).Info("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) +} diff --git a/pkg/reconciler/apiserversource/controller.go b/pkg/reconciler/apiserversource/controller.go index df8dc477de9..9cb6a7165eb 100644 --- a/pkg/reconciler/apiserversource/controller.go +++ b/pkg/reconciler/apiserversource/controller.go @@ -25,10 +25,12 @@ import ( "knative.dev/eventing/pkg/reconciler" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" + "knative.dev/pkg/metrics" eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" apiserversourceinformer "knative.dev/eventing/pkg/client/injection/informers/sources/v1alpha1/apiserversource" deploymentinformer "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment" + "knative.dev/pkg/logging" ) const ( @@ -57,6 +59,7 @@ func NewController( apiserversourceLister: apiServerSourceInformer.Lister(), deploymentLister: deploymentInformer.Lister(), source: GetCfgHost(ctx), + context: ctx, } impl := controller.NewImpl(r, r.Logger, ReconcilerName) @@ -76,5 +79,8 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) + cmw.Watch(logging.ConfigMapName(), r.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), r.UpdateFromMetricsConfigMap) + return impl } diff --git a/pkg/reconciler/apiserversource/resources/config.go b/pkg/reconciler/apiserversource/resources/config.go new file mode 100644 index 00000000000..9e411ac830f --- /dev/null +++ b/pkg/reconciler/apiserversource/resources/config.go @@ -0,0 +1,134 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package resources + +import ( + "encoding/json" + "errors" + "fmt" + "strconv" + + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" +) + +var zapLoggerConfig = "zap-logger-config" + +// Base64ToMetricsOptions converts a json+base64 string of a +// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. +func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { + var opts metrics.ExporterOptions + if base64 == "" { + return nil, errors.New("base64 metrics string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &opts); err != nil { + return nil, err + } + + return &opts, nil +} + +// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 +// string. +func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { + if opts == nil { + return "" + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Opts, err := json.Marshal(jsonOpts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Opts)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} + +// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. +// Returns a non-nil logging.Config always. +func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { + if base64 == "" { + return nil, errors.New("base64 logging string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + var configMap map[string]string + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &configMap); err != nil { + return nil, err + } + + cfg, err := logging.NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } + } + return cfg, nil +} + +// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. +func LoggingConfigToBase64(cfg *logging.Config) string { + if cfg == nil || cfg.LoggingConfig == "" { + return "" + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Cfg, err := json.Marshal(jsonCfg) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Cfg)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index a4b961bf2ee..564b916dee6 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -30,10 +30,12 @@ import ( // ReceiveAdapterArgs are the arguments needed to create a ApiServer Receive Adapter. // Every field is required. type ReceiveAdapterArgs struct { - Image string - Source *v1alpha1.ApiServerSource - Labels map[string]string - SinkURI string + Image string + Source *v1alpha1.ApiServerSource + Labels map[string]string + SinkURI string + MetricsConfig string + LoggingConfig string } // MakeReceiveAdapter generates (but does not insert into K8s) the Receive Adapter Deployment for @@ -67,7 +69,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, - Env: makeEnv(args.SinkURI, &args.Source.Spec), + Env: makeEnv(args.SinkURI, args.LoggingConfig, args.MetricsConfig, &args.Source.Spec), + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, }, }, }, @@ -76,7 +82,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { } } -func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { +func makeEnv(sinkURI, loggingConfig, metricsConfig string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { apiversions := "" kinds := "" controlled := "" @@ -115,5 +121,11 @@ func makeEnv(sinkURI string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar FieldPath: "metadata.namespace", }, }, + }, { + Name: "K_METRICS_CONFIG", + Value: metricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: loggingConfig, }} } From 1d4183706d4778d4fa41fcdefa0207ccbea2aed4 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Thu, 29 Aug 2019 23:26:35 +0200 Subject: [PATCH 2/9] Added event type and event source --- cmd/apiserver_receive_adapter/main.go | 41 ++++--- .../100-prometheus-scrape-kn-eventing.yaml | 18 +++ pkg/adapter/apiserver/adapter.go | 4 - pkg/adapter/apiserver/adapter_test.go | 52 +++++--- pkg/adapter/apiserver/metrics.go | 25 ++++ pkg/adapter/apiserver/ref.go | 46 ++++---- pkg/adapter/apiserver/ref_test.go | 20 ++-- pkg/adapter/apiserver/resource.go | 33 +++--- pkg/adapter/apiserver/resource_test.go | 16 +-- pkg/adapter/apiserver/stats_reporter.go | 21 +++- pkg/adapter/apiserver/stats_reporter_test.go | 111 ++++++++++++++++++ .../apiserversource/apiserversource.go | 2 +- .../apiserversource/controller_test.go | 26 +++- .../resources/receive_adapter.go | 3 + .../resources/receive_adapter_test.go | 14 +++ 15 files changed, 330 insertions(+), 102 deletions(-) create mode 100644 pkg/adapter/apiserver/metrics.go create mode 100644 pkg/adapter/apiserver/stats_reporter_test.go diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index cf09b79cfc7..ad72fef2164 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -20,7 +20,8 @@ import ( "flag" "fmt" - // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // Uncomment the following line to load the gcp plugin + // (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "knative.dev/eventing/pkg/reconciler/apiserversource/resources" @@ -55,9 +56,10 @@ type envConfig struct { ApiVersion []string `split_words:"true" required:"true"` Kind []string `required:"true"` Controller []bool `required:"true"` - // MetricsConfigBase64 is a base64 encoded json string of metrics.ExporterOptions. - // This is used to configure the metrics exporter options, the config is - // stored in a config map inside the controllers namespace and copied here. + // MetricsConfigBase64 is a base64 encoded json string of + // metrics.ExporterOptions. This is used to configure the metrics exporter + // options, the config is stored in a config map inside the controllers + // namespace and copied here. MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` // LoggingConfigBase64 is a base64 encoded json string of logging.Config. @@ -78,13 +80,16 @@ func main() { } // Convert base64 encoded json logging.Config to logging.Config. - loggingConfig, err := resources.Base64ToLoggingConfig(env.LoggingConfigBase64) + loggingConfig, err := resources.Base64ToLoggingConfig( + env.LoggingConfigBase64) if err != nil { panic(err) } - // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. - metricsConfig, err := resources.Base64ToMetricsOptions(env.MetricsConfigBase64) + // Convert base64 encoded json metrics.ExporterOptions to + // metrics.ExporterOptions. + metricsConfig, err := resources.Base64ToMetricsOptions( + env.MetricsConfigBase64) if err != nil { panic(err) } @@ -97,6 +102,9 @@ func main() { } reporter, err := apiserver.NewStatsReporter() + if err != nil { + logger.Fatalw("Error building statsreporter", zap.Error(err)) + } // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() @@ -112,9 +120,10 @@ func main() { logger.Fatalw("Error building dynamic client", zap.Error(err)) } - if err = tracing.SetupStaticPublishing(logger, "apiserversource", tracing.OnePercentSampling); err != nil { - // If tracing doesn't work, we will log an error, but allow the importer to continue to - // start. + if err = tracing.SetupStaticPublishing(logger, "apiserversource", + tracing.OnePercentSampling); err != nil { + // If tracing doesn't work, we will log an error, but allow the importer + // to continue to start. logger.Error("Error setting up trace publishing", zap.Error(err)) } @@ -134,7 +143,10 @@ func main() { logger.Fatalw("Error parsing APIVersion", zap.Error(err)) } // TODO: pass down the resource and the kind so we do not have to guess. - gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{Kind: kind, Group: gv.Group, Version: gv.Version}) + gvr, _ := meta.UnsafeGuessKindToResource(schema.GroupVersionKind{ + Kind: kind, + Group: gv.Group, + Version: gv.Version}) gvrcs = append(gvrcs, apiserver.GVRC{ GVR: gvr, Controller: controlled, @@ -147,7 +159,8 @@ func main() { GVRCs: gvrcs, } - a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt, reporter) + a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt, + reporter) logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) @@ -158,7 +171,3 @@ func flush(logger *zap.SugaredLogger) { _ = logger.Sync() metrics.FlushExporter() } - -// func mainMetrics(logger *zap.SugaredLogger, opts *metrics.ExporterOptions) { - -// } diff --git a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml index 02641e717f9..b75ff61b8aa 100644 --- a/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml +++ b/config/monitoring/metrics/prometheus/100-prometheus-scrape-kn-eventing.yaml @@ -109,3 +109,21 @@ - source_labels: [__meta_kubernetes_service_name] target_label: service +# apiserver-source +- job_name: apiserver-source + scrape_interval: 3s + scrape_timeout: 3s + kubernetes_sd_configs: + - role: pod + relabel_configs: + # Scrape only the the targets matching the following metadata + - source_labels: [ __meta_kubernetes_pod_label_eventing_knative_dev_source, __meta_kubernetes_pod_container_port_name] + action: keep + regex: apiserver-source-controller;metrics + # Rename metadata labels to be reader friendly + - source_labels: [__meta_kubernetes_namespace] + target_label: namespace + - source_labels: [__meta_kubernetes_pod_name] + target_label: pod + - source_labels: [__meta_kubernetes_service_name] + target_label: service diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 4b2034f38f0..b35c65defc1 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -103,10 +103,6 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { stop := make(chan struct{}) resyncPeriod := time.Duration(10 * time.Hour) - reportArgs := &ReportArgs{ - ns: a.namespace, - } - a.reporter.ReportEventCount(reportArgs, nil) var d eventDelegate switch a.mode { case ResourceMode: diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index df3f4bfa27b..4341200357f 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -108,8 +108,11 @@ func TestNewAdaptor(t *testing.T) { } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - - a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt) + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r) got, ok := a.(*adapter) if !ok { @@ -147,10 +150,14 @@ func TestAdapter_StartRef(t *testing.T) { }, }}, } + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } - a := NewAdaptor(source, k8s, ce, logger, opt) + a := NewAdaptor(source, k8s, ce, logger, opt, r) - err := errors.New("test never ran") + err = errors.New("test never ran") stopCh := make(chan struct{}) done := make(chan struct{}) go func() { @@ -182,10 +189,13 @@ func TestAdapter_StartResource(t *testing.T) { }, }}, } + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + a := NewAdaptor(source, k8s, ce, logger, opt, r) - a := NewAdaptor(source, k8s, ce, logger, opt) - - err := errors.New("test never ran") + err = errors.New("test never ran") stopCh := make(chan struct{}) done := make(chan struct{}) go func() { @@ -264,26 +274,34 @@ func validateNotSent(t *testing.T, ce *kncetesting.TestCloudEventsClient, want s } } -func makeResourceAndTestingClient() (*resource, *kncetesting.TestCloudEventsClient) { +func makeResourceAndTestingClient(t *testing.T) (*resource, *kncetesting.TestCloudEventsClient) { ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } return &resource{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } -func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { +func makeRefAndTestingClient(t *testing.T) (*ref, *kncetesting.TestCloudEventsClient) { ce := kncetesting.NewTestClient() source := "unit-test" logger := zap.NewExample().Sugar() - + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } return &ref{ - ce: ce, - source: source, - logger: logger, + ce: ce, + source: source, + logger: logger, + reporter: r, }, ce } diff --git a/pkg/adapter/apiserver/metrics.go b/pkg/adapter/apiserver/metrics.go new file mode 100644 index 00000000000..ce91a166007 --- /dev/null +++ b/pkg/adapter/apiserver/metrics.go @@ -0,0 +1,25 @@ +/* + * Copyright 2019 The Knative Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package apiserver + +// Type gets the eventtype. +func Result(err error) string { + if err != nil { + return "error" + } + return "success" +} diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index f33381eb17f..220c1184694 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -18,7 +18,6 @@ package apiserver import ( "context" - "fmt" "reflect" "k8s.io/apimachinery/pkg/api/meta" @@ -32,9 +31,10 @@ import ( ) type ref struct { - ce cloudevents.Client - source string - logger *zap.SugaredLogger + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger controlledGVRs []schema.GroupVersionResource reporter StatsReporter @@ -70,11 +70,7 @@ func (a *ref) Add(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event, a.reporter) } // Implements cache.Store @@ -85,11 +81,7 @@ func (a *ref) Update(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event, a.reporter) } // Implements cache.Store @@ -100,17 +92,14 @@ func (a *ref) Delete(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - return nil + return a.sendEvent(context.Background(), event, a.reporter) } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { - fmt.Printf("Are we here?\n", gvr) reportArgs := &ReportArgs{ - ns: a.namespace, + ns: a.namespace, + eventSource: a.source, + eventType: a.eventType, } a.reporter.ReportEventCount(reportArgs, nil) if a.controlledGVRs == nil { @@ -120,6 +109,21 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { a.controlledGVRs = append(a.controlledGVRs, gvr) } +func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + } + a.reporter.ReportEventCount(reportArgs, nil) + + if _, err := a.ce.Send(ctx, *event); err != nil { + a.logger.Info("event delivery failed", zap.Error(err)) + return err + } + return nil +} + // Stub cache.Store impl // Implements cache.Store diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go index 60a95b10aeb..7c4cc44296e 100644 --- a/pkg/adapter/apiserver/ref_test.go +++ b/pkg/adapter/apiserver/ref_test.go @@ -8,43 +8,43 @@ import ( ) func TestRefAddEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) } func TestRefUpdateEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) } func TestRefDeleteEvent(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) } func TestRefAddEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) } func TestRefUpdateEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) } func TestRefDeleteEventNil(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) } func TestRefAddEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -55,7 +55,7 @@ func TestRefAddEventAsController(t *testing.T) { } func TestRefUpdateEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -66,7 +66,7 @@ func TestRefUpdateEventAsController(t *testing.T) { } func TestRefDeleteEventAsController(t *testing.T) { - d, ce := makeRefAndTestingClient() + d, ce := makeRefAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{ Group: "", Version: "v1", @@ -78,7 +78,7 @@ func TestRefDeleteEventAsController(t *testing.T) { // HACKHACKHACK For test coverage. func TestRefStub(t *testing.T) { - d, _ := makeRefAndTestingClient() + d, _ := makeRefAndTestingClient(t) d.List() d.ListKeys() diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index c367e1e18e6..ac5e31c4794 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -18,7 +18,6 @@ package apiserver import ( "context" - "fmt" cloudevents "github.com/cloudevents/sdk-go" "go.uber.org/zap" @@ -30,6 +29,7 @@ import ( type resource struct { ce cloudevents.Client source string + eventType string logger *zap.SugaredLogger reporter StatsReporter namespace string @@ -44,12 +44,7 @@ func (a *resource) Add(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } - - return nil + return a.sendEvent(context.Background(), event, a.reporter) } func (a *resource) Update(obj interface{}) error { @@ -59,10 +54,7 @@ func (a *resource) Update(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { - a.logger.Info("event delivery failed", zap.Error(err)) - return err - } + return a.sendEvent(context.Background(), event, a.reporter) return nil } @@ -74,21 +66,26 @@ func (a *resource) Delete(obj interface{}) error { return err } - if _, err := a.ce.Send(context.Background(), *event); err != nil { + return a.sendEvent(context.Background(), event, a.reporter) +} + +func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { + reportArgs := &ReportArgs{ + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + } + a.reporter.ReportEventCount(reportArgs, nil) + + if _, err := a.ce.Send(ctx, *event); err != nil { a.logger.Info("event delivery failed", zap.Error(err)) return err } - return nil } func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { // not supported for resource. - fmt.Printf("Are we here?\n", gvr) - reportArgs := &ReportArgs{ - ns: a.namespace, - } - a.reporter.ReportEventCount(reportArgs, nil) a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) } diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go index 0a7d75151fd..fedc1219059 100644 --- a/pkg/adapter/apiserver/resource_test.go +++ b/pkg/adapter/apiserver/resource_test.go @@ -8,49 +8,49 @@ import ( ) func TestResourceAddEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) } func TestResourceUpdateEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) } func TestResourceDeleteEvent(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) } func TestResourceAddEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) } func TestResourceUpdateEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) } func TestResourceDeleteEventNil(t *testing.T) { - d, ce := makeResourceAndTestingClient() + d, ce := makeResourceAndTestingClient(t) d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) } func TestResourceCoverageHacks(t *testing.T) { - d, _ := makeResourceAndTestingClient() + d, _ := makeResourceAndTestingClient(t) d.addControllerWatch(schema.GroupVersionResource{}) // for coverage. } // HACKHACKHACK For test coverage. func TestResourceStub(t *testing.T) { - d, _ := makeResourceAndTestingClient() + d, _ := makeResourceAndTestingClient(t) d.List() d.ListKeys() diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go index d066f5a01e8..a24ad83d03a 100644 --- a/pkg/adapter/apiserver/stats_reporter.go +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -22,7 +22,6 @@ import ( "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - utils "knative.dev/eventing/pkg/broker" "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" ) @@ -53,6 +52,8 @@ var _ StatsReporter = (*reporter)(nil) // reporter holds cached metric objects to report filter metrics. type reporter struct { namespaceTagKey tag.Key + eventTypeKey tag.Key + eventSourceKey tag.Key resultKey tag.Key filterResultKey tag.Key } @@ -67,7 +68,16 @@ func NewStatsReporter() (StatsReporter, error) { return nil, err } r.namespaceTagKey = nsTag - + eventTypeTag, err := tag.NewKey(metricskey.EventType) + if err != nil { + return nil, err + } + r.eventTypeKey = eventTypeTag + eventSourceTag, err := tag.NewKey(metricskey.EventSource) + if err != nil { + return nil, err + } + r.eventSourceKey = eventSourceTag filterResultTag, err := tag.NewKey(metricskey.FilterResult) if err != nil { return nil, err @@ -86,7 +96,8 @@ func NewStatsReporter() (StatsReporter, error) { Measure: eventCountM, // TODO count or sum aggregation? Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.resultKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceKey, + r.eventTypeKey}, }, ) if err != nil { @@ -98,7 +109,7 @@ func NewStatsReporter() (StatsReporter, error) { // ReportEventCount captures the event count. func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { - ctx, err := r.generateTag(args, tag.Insert(r.resultKey, utils.Result(err))) + ctx, err := r.generateTag(args, tag.Insert(r.resultKey, Result(err))) if err != nil { return err } @@ -111,6 +122,8 @@ func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), + tag.Insert(r.eventSourceKey, valueOrAny(args.eventSource)), + tag.Insert(r.eventTypeKey, valueOrAny(args.eventType)), t) } diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go new file mode 100644 index 00000000000..e8984836c25 --- /dev/null +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package apiserver + +import ( + "testing" + + "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricstest" +) + +// unregister, ehm, unregisters the metrics that were registered, by +// virtue of StatsReporter creation. +// Since golang executes test iterations within the same process, the stats reporter +// returns an error if the metric is already registered and the test panics. +func unregister() { + metricstest.Unregister("event_count") +} + +func TestStatsReporter(t *testing.T) { + args := &ReportArgs{ + ns: "testns", + eventType: "dev.knative.apiserver.ref.delete", + eventSource: "unit-test", + } + + r, err := NewStatsReporter() + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + // Without this `go test ... -count=X`, where X > 1, fails, since + // we get an error about view already being registered. + defer unregister() + + wantTags := map[string]string{ + metricskey.NamespaceName: "testns", + metricskey.EventType: "dev.knative.apiserver.ref.delete", + metricskey.EventSource: "unit-test", + } + + wantTags1 := map[string]string(wantTags) + wantTags1[metricskey.Result] = "success" + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags1, 2) + +} + +func TestReporterEmptySourceAndType(t *testing.T) { + r, err := NewStatsReporter() + defer unregister() + + if err != nil { + t.Fatalf("Failed to create a new reporter: %v", err) + } + + args := &ReportArgs{ + ns: "testns", + eventType: "", + eventSource: "", + } + + wantTags := map[string]string{ + metricskey.NamespaceName: "testns", + metricskey.Result: "success", + metricskey.EventSource: metricskey.Any, + metricskey.EventType: metricskey.Any, + } + + // test ReportEventCount + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + expectSuccess(t, func() error { + return r.ReportEventCount(args, nil) + }) + metricstest.CheckCountData(t, "event_count", wantTags, 4) +} + +func expectSuccess(t *testing.T, f func() error) { + t.Helper() + if err := f(); err != nil { + t.Errorf("Reporter expected success but got error: %v", err) + } +} diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index da77b30dd76..9aeb69eb115 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -59,7 +59,7 @@ const ( // image. It must be defined. raImageEnvVar = "APISERVER_RA_IMAGE" - component = "apiserver-source-controller" + component = "apiserversource" ) var ( diff --git a/pkg/reconciler/apiserversource/controller_test.go b/pkg/reconciler/apiserversource/controller_test.go index e1895f92518..1f18800a706 100644 --- a/pkg/reconciler/apiserversource/controller_test.go +++ b/pkg/reconciler/apiserversource/controller_test.go @@ -17,11 +17,13 @@ limitations under the License. package apiserversource import ( + "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "knative.dev/pkg/configmap" - logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" @@ -35,8 +37,26 @@ func TestNew(t *testing.T) { defer logtesting.ClearAll() ctx, _ := SetupFakeContext(t) ctx = withCfgHost(ctx, &rest.Config{Host: "unit_test"}) - - c := NewController(ctx, configmap.NewFixedWatcher()) + os.Setenv("METRICS_DOMAIN", "knative.dev/eventing") + c := NewController(ctx, configmap.NewFixedWatcher(&corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-observability", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "_example": "test-config", + }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-logging", + Namespace: "knative-eventing", + }, + Data: map[string]string{ + "zap-logger-config": "test-config", + "loglevel.controller": "info", + "loglevel.webhook": "info", + }, + })) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 564b916dee6..c1b122bf88e 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -121,6 +121,9 @@ func makeEnv(sinkURI, loggingConfig, metricsConfig string, spec *v1alpha1.ApiSer FieldPath: "metadata.namespace", }, }, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", }, { Name: "K_METRICS_CONFIG", Value: metricsConfig, diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index 9a99c94cfee..a636aac0dc5 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/apis/sources/v1alpha1" + _ "knative.dev/pkg/metrics/testing" ) func TestMakeReceiveAdapter(t *testing.T) { @@ -104,6 +105,10 @@ func TestMakeReceiveAdapter(t *testing.T) { { Name: "receive-adapter", Image: "test-image", + Ports: []corev1.ContainerPort{{ + Name: "metrics", + ContainerPort: 9090, + }}, Env: []corev1.EnvVar{ { Name: "SINK_URI", @@ -126,6 +131,15 @@ func TestMakeReceiveAdapter(t *testing.T) { FieldPath: "metadata.namespace", }, }, + }, { + Name: "METRICS_DOMAIN", + Value: "knative.dev/eventing", + }, { + Name: "K_METRICS_CONFIG", + Value: "", + }, { + Name: "K_LOGGING_CONFIG", + Value: "", }, }, }, From b9cbf287a0a86b4278d724bfceddf542977be54b Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Fri, 30 Aug 2019 17:49:39 +0200 Subject: [PATCH 3/9] Added config test --- .../apiserversource/resources/config_test.go | 138 ++++++++++++++++++ 1 file changed, 138 insertions(+) create mode 100644 pkg/reconciler/apiserversource/resources/config_test.go diff --git a/pkg/reconciler/apiserversource/resources/config_test.go b/pkg/reconciler/apiserversource/resources/config_test.go new file mode 100644 index 00000000000..7bff49f3ee1 --- /dev/null +++ b/pkg/reconciler/apiserversource/resources/config_test.go @@ -0,0 +1,138 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "testing" + + "go.uber.org/zap/zapcore" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + + "github.com/google/go-cmp/cmp" +) + +func TestMetricsOptions(t *testing.T) { + testCases := map[string]struct { + opts *metrics.ExporterOptions + want string + wantErr string + }{ + "nil": { + opts: nil, + want: "", + wantErr: "base64 metrics string is empty", + }, + "happy": { + opts: &metrics.ExporterOptions{ + Domain: "domain", + Component: "component", + PrometheusPort: 9090, + ConfigMap: map[string]string{ + "foo": "bar", + "boosh": "kakow", + }, + }, + want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := MetricsOptionsToBase64(tc.opts) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to options. + { + want := tc.opts + got, gotErr := Base64ToMetricsOptions(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} + +func TestLoggingConfig(t *testing.T) { + testCases := map[string]struct { + cfg *logging.Config + want string + wantErr string + }{ + "nil": { + cfg: nil, + want: "", + wantErr: "base64 logging string is empty", + }, + "happy": { + cfg: &logging.Config{ + LoggingConfig: "{}", + LoggingLevel: map[string]zapcore.Level{}, + }, + want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := LoggingConfigToBase64(tc.cfg) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to config. + if tc.cfg != nil { + want := tc.cfg + got, gotErr := Base64ToLoggingConfig(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} From 1a32cece46c4211141f52067479caf97f75bcfa0 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sat, 31 Aug 2019 00:47:56 +0200 Subject: [PATCH 4/9] Changes based on PR review --- cmd/apiserver_receive_adapter/main.go | 19 +++++++----- pkg/adapter/apiserver/metrics.go | 1 + pkg/adapter/apiserver/ref.go | 18 +++++------- pkg/adapter/apiserver/resource.go | 4 ++- pkg/adapter/apiserver/stats_reporter.go | 26 ++++------------ pkg/adapter/apiserver/stats_reporter_test.go | 31 ++++++++------------ 6 files changed, 41 insertions(+), 58 deletions(-) diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index ad72fef2164..591bb54a7a7 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -24,8 +24,6 @@ import ( // (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" - "knative.dev/eventing/pkg/reconciler/apiserversource/resources" - "github.com/kelseyhightower/envconfig" "go.uber.org/zap" "k8s.io/apimachinery/pkg/api/meta" @@ -34,6 +32,7 @@ import ( "k8s.io/client-go/tools/clientcmd" "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/pkg/reconciler/apiserversource/resources" "knative.dev/eventing/pkg/tracing" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" @@ -41,7 +40,7 @@ import ( ) const ( - component = "ApiServerSource::ReceiveAdapter" + component = "apiserversource" ) var ( @@ -83,20 +82,24 @@ func main() { loggingConfig, err := resources.Base64ToLoggingConfig( env.LoggingConfigBase64) if err != nil { - panic(err) + fmt.Printf("[ERROR] filed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } } + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) // Convert base64 encoded json metrics.ExporterOptions to // metrics.ExporterOptions. metricsConfig, err := resources.Base64ToMetricsOptions( env.MetricsConfigBase64) if err != nil { - panic(err) + logger.Errorf("failed to process metrics options: %s", err.Error()) } - logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) - defer flush(logger) - if err := metrics.UpdateExporter(*metricsConfig, logger); err != nil { logger.Fatalf("Failed to create the metrics exporter: %v", err) } diff --git a/pkg/adapter/apiserver/metrics.go b/pkg/adapter/apiserver/metrics.go index ce91a166007..2ae9dc2ee03 100644 --- a/pkg/adapter/apiserver/metrics.go +++ b/pkg/adapter/apiserver/metrics.go @@ -16,6 +16,7 @@ package apiserver +// TODO: use HTTP status codes // Type gets the eventtype. func Result(err error) string { if err != nil { diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 220c1184694..b2e590bc783 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -70,7 +70,7 @@ func (a *ref) Add(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -81,7 +81,7 @@ func (a *ref) Update(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) } // Implements cache.Store @@ -92,16 +92,10 @@ func (a *ref) Delete(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) } func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { - reportArgs := &ReportArgs{ - ns: a.namespace, - eventSource: a.source, - eventType: a.eventType, - } - a.reporter.ReportEventCount(reportArgs, nil) if a.controlledGVRs == nil { a.controlledGVRs = []schema.GroupVersionResource{gvr} return @@ -109,7 +103,7 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { a.controlledGVRs = append(a.controlledGVRs, gvr) } -func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { +func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error { reportArgs := &ReportArgs{ ns: a.namespace, eventSource: event.Source(), @@ -117,8 +111,10 @@ func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event, reporter } a.reporter.ReportEventCount(reportArgs, nil) - if _, err := a.ce.Send(ctx, *event); err != nil { + _, err := a.ce.Send(ctx, *event) + if err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + a.reporter.ReportEventCount(reportArgs, err) return err } return nil diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index ac5e31c4794..65a3a6fe79b 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -77,8 +77,10 @@ func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, repo } a.reporter.ReportEventCount(reportArgs, nil) - if _, err := a.ce.Send(ctx, *event); err != nil { + _, err := a.ce.Send(ctx, *event) + if err != nil { a.logger.Info("event delivery failed", zap.Error(err)) + a.reporter.ReportEventCount(reportArgs, err) return err } return nil diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go index a24ad83d03a..71dca312d9b 100644 --- a/pkg/adapter/apiserver/stats_reporter.go +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -27,8 +27,8 @@ import ( ) var ( - // eventCountM is a counter which records the number of events received - // by a Trigger. + // eventCountM is a counter which records the number of events sent + // by an Importer. eventCountM = stats.Int64( "event_count", "Number of events created", @@ -55,10 +55,10 @@ type reporter struct { eventTypeKey tag.Key eventSourceKey tag.Key resultKey tag.Key - filterResultKey tag.Key } -// NewStatsReporter creates a reporter that collects and reports filter metrics. +// NewStatsReporter creates a reporter that collects and reports apiserversource +// metrics. func NewStatsReporter() (StatsReporter, error) { var r = &reporter{} @@ -78,11 +78,6 @@ func NewStatsReporter() (StatsReporter, error) { return nil, err } r.eventSourceKey = eventSourceTag - filterResultTag, err := tag.NewKey(metricskey.FilterResult) - if err != nil { - return nil, err - } - r.filterResultKey = filterResultTag resultTag, err := tag.NewKey(metricskey.Result) if err != nil { return nil, err @@ -94,7 +89,6 @@ func NewStatsReporter() (StatsReporter, error) { &view.View{ Description: eventCountM.Description(), Measure: eventCountM, - // TODO count or sum aggregation? Aggregation: view.Count(), TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceKey, r.eventTypeKey}, @@ -118,18 +112,10 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { } func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context, error) { - // Note that eventType and eventSource can be empty strings, so they need a special treatment. return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.eventSourceKey, valueOrAny(args.eventSource)), - tag.Insert(r.eventTypeKey, valueOrAny(args.eventType)), + tag.Insert(r.eventSourceKey, args.eventSource), + tag.Insert(r.eventTypeKey, args.eventType), t) } - -func valueOrAny(v string) string { - if v != "" { - return v - } - return metricskey.Any -} diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go index e8984836c25..2026857ed30 100644 --- a/pkg/adapter/apiserver/stats_reporter_test.go +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "fmt" "testing" "knative.dev/eventing/pkg/metrics/metricskey" @@ -34,7 +35,7 @@ func unregister() { func TestStatsReporter(t *testing.T) { args := &ReportArgs{ ns: "testns", - eventType: "dev.knative.apiserver.ref.delete", + eventType: "dev.knative.apiserver.resource.update", eventSource: "unit-test", } @@ -48,7 +49,7 @@ func TestStatsReporter(t *testing.T) { wantTags := map[string]string{ metricskey.NamespaceName: "testns", - metricskey.EventType: "dev.knative.apiserver.ref.delete", + metricskey.EventType: "dev.knative.apiserver.resource.update", metricskey.EventSource: "unit-test", } @@ -62,11 +63,11 @@ func TestStatsReporter(t *testing.T) { expectSuccess(t, func() error { return r.ReportEventCount(args, nil) }) - metricstest.CheckCountData(t, "event_count", wantTags1, 2) + metricstest.CheckCountData(t, "event_count", wantTags, 2) } -func TestReporterEmptySourceAndType(t *testing.T) { +func TestReporterForErrorTag(t *testing.T) { r, err := NewStatsReporter() defer unregister() @@ -76,31 +77,25 @@ func TestReporterEmptySourceAndType(t *testing.T) { args := &ReportArgs{ ns: "testns", - eventType: "", - eventSource: "", + eventType: "eventtype", + eventSource: "eventsource", } wantTags := map[string]string{ metricskey.NamespaceName: "testns", metricskey.Result: "success", - metricskey.EventSource: metricskey.Any, - metricskey.EventType: metricskey.Any, + metricskey.EventType: "eventtype", + metricskey.EventSource: "eventsource", } - + e := fmt.Errorf("test error") // test ReportEventCount expectSuccess(t, func() error { - return r.ReportEventCount(args, nil) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, nil) + return r.ReportEventCount(args, e) }) expectSuccess(t, func() error { - return r.ReportEventCount(args, nil) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, nil) + return r.ReportEventCount(args, e) }) - metricstest.CheckCountData(t, "event_count", wantTags, 4) + metricstest.CheckCountData(t, "event_count", wantTags, 2) } func expectSuccess(t *testing.T, f func() error) { From b19e8ecb2a7645762b5a79a0ac3062ac115cdd97 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sat, 31 Aug 2019 01:36:43 +0200 Subject: [PATCH 5/9] Updated knative.dev/pkg --- Gopkg.lock | 2 ++ pkg/adapter/apiserver/stats_reporter_test.go | 20 +++++++++---------- .../k8s.io/client-go/dynamic/fake/simple.go | 2 +- vendor/k8s.io/code-generator/Godeps/OWNERS | 2 ++ vendor/k8s.io/code-generator/OWNERS | 11 ++++++++++ .../code-generator/cmd/client-gen/OWNERS | 8 ++++++++ .../code-generator/cmd/go-to-protobuf/OWNERS | 4 ++++ vendor/knative.dev/pkg/OWNERS | 4 ++++ vendor/knative.dev/pkg/OWNERS_ALIASES | 7 +++++-- vendor/knative.dev/pkg/apis/OWNERS | 4 ++++ vendor/knative.dev/pkg/apis/duck/OWNERS | 4 ++++ vendor/knative.dev/pkg/apis/istio/OWNERS | 4 ++++ vendor/knative.dev/pkg/cloudevents/OWNERS | 4 ++++ vendor/knative.dev/pkg/configmap/OWNERS | 4 ++++ vendor/knative.dev/pkg/controller/OWNERS | 4 ++++ vendor/knative.dev/pkg/hack/OWNERS | 10 ++++++++++ vendor/knative.dev/pkg/injection/OWNERS | 5 +++++ vendor/knative.dev/pkg/kmeta/OWNERS | 4 ++++ vendor/knative.dev/pkg/logging/OWNERS | 4 ++++ vendor/knative.dev/pkg/metrics/OWNERS | 4 ++++ vendor/knative.dev/pkg/network/OWNERS | 4 ++++ vendor/knative.dev/pkg/reconciler/OWNERS | 4 ++++ vendor/knative.dev/pkg/resolver/OWNERS | 5 +++++ vendor/knative.dev/pkg/test/OWNERS | 10 ++++++++++ vendor/knative.dev/pkg/testutils/OWNERS | 4 ++++ vendor/knative.dev/pkg/webhook/OWNERS | 4 ++++ 26 files changed, 128 insertions(+), 14 deletions(-) create mode 100644 vendor/k8s.io/code-generator/Godeps/OWNERS create mode 100644 vendor/k8s.io/code-generator/OWNERS create mode 100644 vendor/k8s.io/code-generator/cmd/client-gen/OWNERS create mode 100644 vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS create mode 100644 vendor/knative.dev/pkg/OWNERS create mode 100644 vendor/knative.dev/pkg/apis/OWNERS create mode 100644 vendor/knative.dev/pkg/apis/duck/OWNERS create mode 100644 vendor/knative.dev/pkg/apis/istio/OWNERS create mode 100644 vendor/knative.dev/pkg/cloudevents/OWNERS create mode 100644 vendor/knative.dev/pkg/configmap/OWNERS create mode 100644 vendor/knative.dev/pkg/controller/OWNERS create mode 100644 vendor/knative.dev/pkg/hack/OWNERS create mode 100644 vendor/knative.dev/pkg/injection/OWNERS create mode 100644 vendor/knative.dev/pkg/kmeta/OWNERS create mode 100644 vendor/knative.dev/pkg/logging/OWNERS create mode 100644 vendor/knative.dev/pkg/metrics/OWNERS create mode 100644 vendor/knative.dev/pkg/network/OWNERS create mode 100644 vendor/knative.dev/pkg/reconciler/OWNERS create mode 100644 vendor/knative.dev/pkg/resolver/OWNERS create mode 100644 vendor/knative.dev/pkg/test/OWNERS create mode 100644 vendor/knative.dev/pkg/testutils/OWNERS create mode 100644 vendor/knative.dev/pkg/webhook/OWNERS diff --git a/Gopkg.lock b/Gopkg.lock index b7a41ecb942..49487686875 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1235,6 +1235,7 @@ "metrics", "metrics/metricskey", "metrics/metricstest", + "metrics/testing", "profiling", "reconciler/testing", "signals", @@ -1388,6 +1389,7 @@ "knative.dev/pkg/metrics", "knative.dev/pkg/metrics/metricskey", "knative.dev/pkg/metrics/metricstest", + "knative.dev/pkg/metrics/testing", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", "knative.dev/pkg/system", diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go index 2026857ed30..7df7bfb476f 100644 --- a/pkg/adapter/apiserver/stats_reporter_test.go +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -20,7 +20,8 @@ import ( "fmt" "testing" - "knative.dev/eventing/pkg/metrics/metricskey" + metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" + "knative.dev/pkg/metrics/metricskey" "knative.dev/pkg/metrics/metricstest" ) @@ -48,14 +49,11 @@ func TestStatsReporter(t *testing.T) { defer unregister() wantTags := map[string]string{ - metricskey.NamespaceName: "testns", - metricskey.EventType: "dev.knative.apiserver.resource.update", - metricskey.EventSource: "unit-test", + metricskey.LabelNamespaceName: "testns", + metricskey.LabelEventType: "dev.knative.apiserver.resource.update", + metricskey.LabelEventSource: "unit-test", } - wantTags1 := map[string]string(wantTags) - wantTags1[metricskey.Result] = "success" - // test ReportEventCount expectSuccess(t, func() error { return r.ReportEventCount(args, nil) @@ -82,10 +80,10 @@ func TestReporterForErrorTag(t *testing.T) { } wantTags := map[string]string{ - metricskey.NamespaceName: "testns", - metricskey.Result: "success", - metricskey.EventType: "eventtype", - metricskey.EventSource: "eventsource", + metricskey.LabelNamespaceName: "testns", + metricskeyEventing.Result: "success", + metricskey.LabelEventType: "eventtype", + metricskey.LabelEventSource: "eventsource", } e := fmt.Errorf("test error") // test ReportEventCount diff --git a/vendor/k8s.io/client-go/dynamic/fake/simple.go b/vendor/k8s.io/client-go/dynamic/fake/simple.go index dde45892f8e..13e2d8055e8 100644 --- a/vendor/k8s.io/client-go/dynamic/fake/simple.go +++ b/vendor/k8s.io/client-go/dynamic/fake/simple.go @@ -45,7 +45,7 @@ func NewSimpleDynamicClient(scheme *runtime.Scheme, objects ...runtime.Object) * } } - cs := &FakeDynamicClient{scheme: scheme} + cs := &FakeDynamicClient{} cs.AddReactor("*", "*", testing.ObjectReaction(o)) cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) { gvr := action.GetResource() diff --git a/vendor/k8s.io/code-generator/Godeps/OWNERS b/vendor/k8s.io/code-generator/Godeps/OWNERS new file mode 100644 index 00000000000..3d49f30605a --- /dev/null +++ b/vendor/k8s.io/code-generator/Godeps/OWNERS @@ -0,0 +1,2 @@ +approvers: +- dep-approvers diff --git a/vendor/k8s.io/code-generator/OWNERS b/vendor/k8s.io/code-generator/OWNERS new file mode 100644 index 00000000000..4155fc60ccd --- /dev/null +++ b/vendor/k8s.io/code-generator/OWNERS @@ -0,0 +1,11 @@ +approvers: +- lavalamp +- wojtek-t +- sttts +reviewers: +- lavalamp +- wojtek-t +- sttts +labels: +- sig/api-machinery +- area/code-generation diff --git a/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS b/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS new file mode 100644 index 00000000000..0c408a1aa94 --- /dev/null +++ b/vendor/k8s.io/code-generator/cmd/client-gen/OWNERS @@ -0,0 +1,8 @@ +approvers: +- lavalamp +- wojtek-t +- caesarxuchao +reviewers: +- lavalamp +- wojtek-t +- caesarxuchao diff --git a/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS b/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS new file mode 100644 index 00000000000..05d4b2a6574 --- /dev/null +++ b/vendor/k8s.io/code-generator/cmd/go-to-protobuf/OWNERS @@ -0,0 +1,4 @@ +approvers: +- smarterclayton +reviewers: +- smarterclayton diff --git a/vendor/knative.dev/pkg/OWNERS b/vendor/knative.dev/pkg/OWNERS new file mode 100644 index 00000000000..030df98f78d --- /dev/null +++ b/vendor/knative.dev/pkg/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- pkg-approvers diff --git a/vendor/knative.dev/pkg/OWNERS_ALIASES b/vendor/knative.dev/pkg/OWNERS_ALIASES index 51ae932f7fb..86d5793c851 100644 --- a/vendor/knative.dev/pkg/OWNERS_ALIASES +++ b/vendor/knative.dev/pkg/OWNERS_ALIASES @@ -26,13 +26,16 @@ aliases: - dprotaso controller-approvers: - - mattmoor + - dgerd - grantr + - mattmoor - tcnghia + - vagababov kmeta-approvers: - mattmoor - - jonjohnsonjr + - dgerd + - vagababov logging-approvers: - mdemirhan diff --git a/vendor/knative.dev/pkg/apis/OWNERS b/vendor/knative.dev/pkg/apis/OWNERS new file mode 100644 index 00000000000..a25420ebc0d --- /dev/null +++ b/vendor/knative.dev/pkg/apis/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-approvers diff --git a/vendor/knative.dev/pkg/apis/duck/OWNERS b/vendor/knative.dev/pkg/apis/duck/OWNERS new file mode 100644 index 00000000000..ad4d83c51e4 --- /dev/null +++ b/vendor/knative.dev/pkg/apis/duck/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-duck-approvers diff --git a/vendor/knative.dev/pkg/apis/istio/OWNERS b/vendor/knative.dev/pkg/apis/istio/OWNERS new file mode 100644 index 00000000000..c09668f13f1 --- /dev/null +++ b/vendor/knative.dev/pkg/apis/istio/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- apis-istio-approvers diff --git a/vendor/knative.dev/pkg/cloudevents/OWNERS b/vendor/knative.dev/pkg/cloudevents/OWNERS new file mode 100644 index 00000000000..27343aba0a9 --- /dev/null +++ b/vendor/knative.dev/pkg/cloudevents/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- cloudevents-approvers diff --git a/vendor/knative.dev/pkg/configmap/OWNERS b/vendor/knative.dev/pkg/configmap/OWNERS new file mode 100644 index 00000000000..2480fc6d43f --- /dev/null +++ b/vendor/knative.dev/pkg/configmap/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- configmap-approvers diff --git a/vendor/knative.dev/pkg/controller/OWNERS b/vendor/knative.dev/pkg/controller/OWNERS new file mode 100644 index 00000000000..afa22257a26 --- /dev/null +++ b/vendor/knative.dev/pkg/controller/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- controller-approvers diff --git a/vendor/knative.dev/pkg/hack/OWNERS b/vendor/knative.dev/pkg/hack/OWNERS new file mode 100644 index 00000000000..c50adc84930 --- /dev/null +++ b/vendor/knative.dev/pkg/hack/OWNERS @@ -0,0 +1,10 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers + +reviewers: +- productivity-reviewers + +labels: +- area/test-and-release diff --git a/vendor/knative.dev/pkg/injection/OWNERS b/vendor/knative.dev/pkg/injection/OWNERS new file mode 100644 index 00000000000..dda47512a47 --- /dev/null +++ b/vendor/knative.dev/pkg/injection/OWNERS @@ -0,0 +1,5 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- mattmoor +- n3wscott diff --git a/vendor/knative.dev/pkg/kmeta/OWNERS b/vendor/knative.dev/pkg/kmeta/OWNERS new file mode 100644 index 00000000000..29b0d9f2562 --- /dev/null +++ b/vendor/knative.dev/pkg/kmeta/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- kmeta-approvers diff --git a/vendor/knative.dev/pkg/logging/OWNERS b/vendor/knative.dev/pkg/logging/OWNERS new file mode 100644 index 00000000000..fa4854ba0a5 --- /dev/null +++ b/vendor/knative.dev/pkg/logging/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- logging-approvers diff --git a/vendor/knative.dev/pkg/metrics/OWNERS b/vendor/knative.dev/pkg/metrics/OWNERS new file mode 100644 index 00000000000..6d3966df44e --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- metrics-approvers diff --git a/vendor/knative.dev/pkg/network/OWNERS b/vendor/knative.dev/pkg/network/OWNERS new file mode 100644 index 00000000000..5fa3f1016d9 --- /dev/null +++ b/vendor/knative.dev/pkg/network/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- tcnghia diff --git a/vendor/knative.dev/pkg/reconciler/OWNERS b/vendor/knative.dev/pkg/reconciler/OWNERS new file mode 100644 index 00000000000..afa22257a26 --- /dev/null +++ b/vendor/knative.dev/pkg/reconciler/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- controller-approvers diff --git a/vendor/knative.dev/pkg/resolver/OWNERS b/vendor/knative.dev/pkg/resolver/OWNERS new file mode 100644 index 00000000000..acf2ee2c1cf --- /dev/null +++ b/vendor/knative.dev/pkg/resolver/OWNERS @@ -0,0 +1,5 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- n3wscott +- vaikas-google diff --git a/vendor/knative.dev/pkg/test/OWNERS b/vendor/knative.dev/pkg/test/OWNERS new file mode 100644 index 00000000000..c50adc84930 --- /dev/null +++ b/vendor/knative.dev/pkg/test/OWNERS @@ -0,0 +1,10 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers + +reviewers: +- productivity-reviewers + +labels: +- area/test-and-release diff --git a/vendor/knative.dev/pkg/testutils/OWNERS b/vendor/knative.dev/pkg/testutils/OWNERS new file mode 100644 index 00000000000..46b7acf91e9 --- /dev/null +++ b/vendor/knative.dev/pkg/testutils/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- productivity-approvers diff --git a/vendor/knative.dev/pkg/webhook/OWNERS b/vendor/knative.dev/pkg/webhook/OWNERS new file mode 100644 index 00000000000..b87878d94ae --- /dev/null +++ b/vendor/knative.dev/pkg/webhook/OWNERS @@ -0,0 +1,4 @@ +# The OWNERS file is used by prow to automatically merge approved PRs. + +approvers: +- webhook-approvers From 0f3b53d8a5d6b5b1e5ff4a3451dcda9ddaf1f967 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sun, 1 Sep 2019 20:56:16 +0200 Subject: [PATCH 6/9] More refactorings based on PR review --- cmd/apiserver_receive_adapter/main.go | 15 ++-- pkg/adapter/apiserver/adapter.go | 50 ++++++++------ pkg/adapter/apiserver/adapter_test.go | 6 +- pkg/adapter/apiserver/events/events.go | 6 +- pkg/adapter/apiserver/ref.go | 14 ++-- pkg/adapter/apiserver/resource.go | 32 +++++---- pkg/adapter/apiserver/stats_reporter.go | 69 +++++++++++++------ pkg/adapter/apiserver/stats_reporter_test.go | 32 +++++---- .../resources/receive_adapter.go | 15 ++-- .../resources/receive_adapter_test.go | 12 +++- 10 files changed, 158 insertions(+), 93 deletions(-) diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 591bb54a7a7..ea41a27a69a 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -49,12 +49,13 @@ var ( ) type envConfig struct { - Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` - Mode string `envconfig:"MODE"` - SinkURI string `split_words:"true" required:"true"` - ApiVersion []string `split_words:"true" required:"true"` - Kind []string `required:"true"` - Controller []bool `required:"true"` + Namespace string `envconfig:"SYSTEM_NAMESPACE" default:"default"` + Mode string `envconfig:"MODE"` + SinkURI string `split_words:"true" required:"true"` + ApiVersion []string `split_words:"true" required:"true"` + Kind []string `required:"true"` + Controller []bool `required:"true"` + ApiServerImporter string `envconfig:"APISERVERIMPORTER" required:"true"` // MetricsConfigBase64 is a base64 encoded json string of // metrics.ExporterOptions. This is used to configure the metrics exporter // options, the config is stored in a config map inside the controllers @@ -163,7 +164,7 @@ func main() { } a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, logger, opt, - reporter) + reporter, env.ApiServerImporter) logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { logger.Warn("start returned an error,", zap.Error(err)) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index b35c65defc1..1206d4cc2ae 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -64,12 +64,15 @@ type adapter struct { namespace string logger *zap.SugaredLogger - mode string - delegate eventDelegate - reporter StatsReporter + mode string + delegate eventDelegate + reporter StatsReporter + apiServerImporter string } -func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, opt Options, reporter StatsReporter) Adapter { +func NewAdaptor(source string, k8sClient dynamic.Interface, + ceClient cloudevents.Client, logger *zap.SugaredLogger, + opt Options, reporter StatsReporter, apiServerImporter string) Adapter { mode := opt.Mode switch mode { case ResourceMode, RefMode: @@ -81,14 +84,15 @@ func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents } a := &adapter{ - k8s: k8sClient, - ce: ceClient, - source: source, - logger: logger, - gvrcs: opt.GVRCs, - namespace: opt.Namespace, - mode: mode, - reporter: reporter, + k8s: k8sClient, + ce: ceClient, + source: source, + logger: logger, + gvrcs: opt.GVRCs, + namespace: opt.Namespace, + mode: mode, + reporter: reporter, + apiServerImporter: apiServerImporter, } return a } @@ -107,20 +111,22 @@ func (a *adapter) Start(stopCh <-chan struct{}) error { switch a.mode { case ResourceMode: d = &resource{ - ce: a.ce, - source: a.source, - logger: a.logger, - reporter: a.reporter, - namespace: a.namespace, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + apiServerImporter: a.apiServerImporter, } case RefMode: d = &ref{ - ce: a.ce, - source: a.source, - logger: a.logger, - reporter: a.reporter, - namespace: a.namespace, + ce: a.ce, + source: a.source, + logger: a.logger, + reporter: a.reporter, + namespace: a.namespace, + apiServerImporter: a.apiServerImporter, } default: diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 4341200357f..2c7cbc80afc 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -112,7 +112,7 @@ func TestNewAdaptor(t *testing.T) { if err != nil { t.Fatalf("Failed to create a new reporter: %v", err) } - a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r) + a := NewAdaptor(tc.source, k8s, ce, logger, tc.opt, r, "test-importer") got, ok := a.(*adapter) if !ok { @@ -155,7 +155,7 @@ func TestAdapter_StartRef(t *testing.T) { t.Fatalf("Failed to create a new reporter: %v", err) } - a := NewAdaptor(source, k8s, ce, logger, opt, r) + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") err = errors.New("test never ran") stopCh := make(chan struct{}) @@ -193,7 +193,7 @@ func TestAdapter_StartResource(t *testing.T) { if err != nil { t.Fatalf("Failed to create a new reporter: %v", err) } - a := NewAdaptor(source, k8s, ce, logger, opt, r) + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-adapter") err = errors.New("test never ran") stopCh := make(chan struct{}) diff --git a/pkg/adapter/apiserver/events/events.go b/pkg/adapter/apiserver/events/events.go index d5f5ff8b5ad..5812423bf3c 100644 --- a/pkg/adapter/apiserver/events/events.go +++ b/pkg/adapter/apiserver/events/events.go @@ -34,7 +34,8 @@ func MakeAddEvent(source string, obj interface{}) (*cloudevents.Event, error) { } object := obj.(*unstructured.Unstructured) - return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType, object, object) + return makeEvent(source, sourcesv1alpha1.ApiServerSourceAddEventType, + object, object) } func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) { @@ -43,7 +44,8 @@ func MakeUpdateEvent(source string, obj interface{}) (*cloudevents.Event, error) } object := obj.(*unstructured.Unstructured) - return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType, object, object) + return makeEvent(source, sourcesv1alpha1.ApiServerSourceUpdateEventType, + object, object) } func MakeDeleteEvent(source string, obj interface{}) (*cloudevents.Event, error) { diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index b2e590bc783..154a2b48f31 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -36,9 +36,10 @@ type ref struct { eventType string logger *zap.SugaredLogger - controlledGVRs []schema.GroupVersionResource - reporter StatsReporter - namespace string + controlledGVRs []schema.GroupVersionResource + reporter StatsReporter + namespace string + apiServerImporter string } var _ cache.Store = (*ref)(nil) @@ -105,9 +106,10 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error { reportArgs := &ReportArgs{ - ns: a.namespace, - eventSource: event.Source(), - eventType: event.Type(), + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + apiServerImporter: a.apiServerImporter, } a.reporter.ReportEventCount(reportArgs, nil) diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 65a3a6fe79b..2e8fef9cbdc 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -27,12 +27,13 @@ import ( ) type resource struct { - ce cloudevents.Client - source string - eventType string - logger *zap.SugaredLogger - reporter StatsReporter - namespace string + ce cloudevents.Client + source string + eventType string + logger *zap.SugaredLogger + reporter StatsReporter + namespace string + apiServerImporter string } var _ cache.Store = (*resource)(nil) @@ -69,11 +70,13 @@ func (a *resource) Delete(obj interface{}) error { return a.sendEvent(context.Background(), event, a.reporter) } -func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { +func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, + reporter StatsReporter) error { reportArgs := &ReportArgs{ - ns: a.namespace, - eventSource: event.Source(), - eventType: event.Type(), + ns: a.namespace, + eventSource: event.Source(), + eventType: event.Type(), + apiServerImporter: a.apiServerImporter, } a.reporter.ReportEventCount(reportArgs, nil) @@ -88,7 +91,8 @@ func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, repo func (a *resource) addControllerWatch(gvr schema.GroupVersionResource) { // not supported for resource. - a.logger.Warn("ignored controller watch request on gvr.", zap.String("gvr", gvr.String())) + a.logger.Warn("ignored controller watch request on gvr.", + zap.String("gvr", gvr.String())) } // Stub cache.Store impl @@ -104,12 +108,14 @@ func (a *resource) ListKeys() []string { } // Implements cache.Store -func (a *resource) Get(obj interface{}) (item interface{}, exists bool, err error) { +func (a *resource) Get(obj interface{}) (item interface{}, exists bool, + err error) { return nil, false, nil } // Implements cache.Store -func (a *resource) GetByKey(key string) (item interface{}, exists bool, err error) { +func (a *resource) GetByKey(key string) (item interface{}, exists bool, + err error) { return nil, false, nil } diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go index 71dca312d9b..25f75c04ee1 100644 --- a/pkg/adapter/apiserver/stats_reporter.go +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -19,10 +19,12 @@ package apiserver import ( "context" + "knative.dev/pkg/metrics/metricskey" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "knative.dev/eventing/pkg/metrics/metricskey" + metricsKeyEventing "knative.dev/eventing/pkg/metrics/metricskey" "knative.dev/pkg/metrics" ) @@ -34,27 +36,33 @@ var ( "Number of events created", stats.UnitDimensionless, ) + _ StatsReporter = (*reporter)(nil) ) type ReportArgs struct { - ns string - eventType string - eventSource string + ns string + eventType string + eventSource string + apiServerImporter string } +const ( + importerResourceGroupValue = "apiserversources.sources.eventing.knative.dev" +) + // StatsReporter defines the interface for sending filter metrics. type StatsReporter interface { ReportEventCount(args *ReportArgs, err error) error } -var _ StatsReporter = (*reporter)(nil) - // reporter holds cached metric objects to report filter metrics. type reporter struct { - namespaceTagKey tag.Key - eventTypeKey tag.Key - eventSourceKey tag.Key - resultKey tag.Key + namespaceTagKey tag.Key + eventTypeTagKey tag.Key + eventSourceTagKey tag.Key + importerNameTagKey tag.Key + importerResourceGroupTagKey tag.Key + resultKey tag.Key } // NewStatsReporter creates a reporter that collects and reports apiserversource @@ -63,22 +71,37 @@ func NewStatsReporter() (StatsReporter, error) { var r = &reporter{} // Create the tag keys that will be used to add tags to our measurements. - nsTag, err := tag.NewKey(metricskey.NamespaceName) + nsTag, err := tag.NewKey(metricskey.LabelNamespaceName) if err != nil { return nil, err } r.namespaceTagKey = nsTag - eventTypeTag, err := tag.NewKey(metricskey.EventType) + + eventTypeTag, err := tag.NewKey(metricskey.LabelEventType) + if err != nil { + return nil, err + } + r.eventTypeTagKey = eventTypeTag + + eventSourceTag, err := tag.NewKey(metricskey.LabelEventSource) if err != nil { return nil, err } - r.eventTypeKey = eventTypeTag - eventSourceTag, err := tag.NewKey(metricskey.EventSource) + r.eventSourceTagKey = eventSourceTag + + importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) if err != nil { return nil, err } - r.eventSourceKey = eventSourceTag - resultTag, err := tag.NewKey(metricskey.Result) + r.importerNameTagKey = importerNameTag + + importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) + if err != nil { + return nil, err + } + r.importerResourceGroupTagKey = importerResourceGroupTag + + resultTag, err := tag.NewKey(metricsKeyEventing.Result) if err != nil { return nil, err } @@ -90,8 +113,8 @@ func NewStatsReporter() (StatsReporter, error) { Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceKey, - r.eventTypeKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceTagKey, + r.eventTypeTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey}, }, ) if err != nil { @@ -107,6 +130,11 @@ func (r *reporter) ReportEventCount(args *ReportArgs, err error) error { if err != nil { return err } + ctx, err = r.generateTag(args, tag.Insert(r.importerResourceGroupTagKey, + importerResourceGroupValue)) + if err != nil { + return err + } metrics.Record(ctx, eventCountM.M(1)) return nil } @@ -115,7 +143,8 @@ func (r *reporter) generateTag(args *ReportArgs, t tag.Mutator) (context.Context return tag.New( context.Background(), tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.eventSourceKey, args.eventSource), - tag.Insert(r.eventTypeKey, args.eventType), + tag.Insert(r.eventSourceTagKey, args.eventSource), + tag.Insert(r.eventTypeTagKey, args.eventType), + tag.Insert(r.importerNameTagKey, args.apiServerImporter), t) } diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go index 7df7bfb476f..364e823313e 100644 --- a/pkg/adapter/apiserver/stats_reporter_test.go +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -35,9 +35,10 @@ func unregister() { func TestStatsReporter(t *testing.T) { args := &ReportArgs{ - ns: "testns", - eventType: "dev.knative.apiserver.resource.update", - eventSource: "unit-test", + ns: "testns", + eventType: "dev.knative.apiserver.resource.update", + eventSource: "unit-test", + apiServerImporter: "testimporter", } r, err := NewStatsReporter() @@ -49,9 +50,11 @@ func TestStatsReporter(t *testing.T) { defer unregister() wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskey.LabelEventType: "dev.knative.apiserver.resource.update", - metricskey.LabelEventSource: "unit-test", + metricskey.LabelNamespaceName: "testns", + metricskey.LabelEventType: "dev.knative.apiserver.resource.update", + metricskey.LabelEventSource: "unit-test", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", } // test ReportEventCount @@ -74,16 +77,19 @@ func TestReporterForErrorTag(t *testing.T) { } args := &ReportArgs{ - ns: "testns", - eventType: "eventtype", - eventSource: "eventsource", + ns: "testns", + eventType: "eventtype", + eventSource: "eventsource", + apiServerImporter: "testimporter", } wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskeyEventing.Result: "success", - metricskey.LabelEventType: "eventtype", - metricskey.LabelEventSource: "eventsource", + metricskey.LabelNamespaceName: "testns", + metricskeyEventing.Result: "success", + metricskey.LabelEventType: "eventtype", + metricskey.LabelEventSource: "eventsource", + metricskey.LabelImporterName: "testimporter", + metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", } e := fmt.Errorf("test error") // test ReportEventCount diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index c1b122bf88e..48a82737b04 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -45,8 +45,9 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, - Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("apiserversource-%s", args.Source.Name)), - Labels: args.Labels, + Name: utils.GenerateFixedName(args.Source, + fmt.Sprintf("apiserversource-%s", args.Source.Name)), + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{ *kmeta.NewControllerRef(args.Source), }, @@ -69,7 +70,9 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { { Name: "receive-adapter", Image: args.Image, - Env: makeEnv(args.SinkURI, args.LoggingConfig, args.MetricsConfig, &args.Source.Spec), + Env: makeEnv(args.SinkURI, args.LoggingConfig, + args.MetricsConfig, &args.Source.Spec, + args.Source.ObjectMeta.Name), Ports: []corev1.ContainerPort{{ Name: "metrics", ContainerPort: 9090, @@ -82,7 +85,8 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { } } -func makeEnv(sinkURI, loggingConfig, metricsConfig string, spec *v1alpha1.ApiServerSourceSpec) []corev1.EnvVar { +func makeEnv(sinkURI, loggingConfig, metricsConfig string, + spec *v1alpha1.ApiServerSourceSpec, name string) []corev1.EnvVar { apiversions := "" kinds := "" controlled := "" @@ -121,6 +125,9 @@ func makeEnv(sinkURI, loggingConfig, metricsConfig string, spec *v1alpha1.ApiSer FieldPath: "metadata.namespace", }, }, + }, { + Name: "APISERVERIMPORTER", + Value: name, }, { Name: "METRICS_DOMAIN", Value: "knative.dev/eventing", diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go index a636aac0dc5..bf23ac31d29 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter_test.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package resources import ( + "fmt" "testing" "github.com/google/go-cmp/cmp" @@ -28,9 +29,10 @@ import ( ) func TestMakeReceiveAdapter(t *testing.T) { + name := "source-name" src := &v1alpha1.ApiServerSource{ ObjectMeta: metav1.ObjectMeta{ - Name: "source-name", + Name: name, Namespace: "source-namespace", UID: "1234", }, @@ -62,10 +64,11 @@ func TestMakeReceiveAdapter(t *testing.T) { one := int32(1) trueValue := true + want := &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Namespace: "source-namespace", - Name: "apiserversource-source-name-1234", + Name: fmt.Sprintf("apiserversource-%s-1234", name), Labels: map[string]string{ "test-key1": "test-value1", "test-key2": "test-value2", @@ -74,7 +77,7 @@ func TestMakeReceiveAdapter(t *testing.T) { { APIVersion: "sources.eventing.knative.dev/v1alpha1", Kind: "ApiServerSource", - Name: "source-name", + Name: name, UID: "1234", Controller: &trueValue, BlockOwnerDeletion: &trueValue, @@ -131,6 +134,9 @@ func TestMakeReceiveAdapter(t *testing.T) { FieldPath: "metadata.namespace", }, }, + }, { + Name: "APISERVERIMPORTER", + Value: name, }, { Name: "METRICS_DOMAIN", Value: "knative.dev/eventing", From e262e44aeef1a01700f7975a11b1ec6378e3e2f5 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sun, 1 Sep 2019 22:07:07 +0200 Subject: [PATCH 7/9] Fixed result --- pkg/adapter/apiserver/stats_reporter.go | 2 +- pkg/adapter/apiserver/stats_reporter_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/adapter/apiserver/stats_reporter.go b/pkg/adapter/apiserver/stats_reporter.go index 25f75c04ee1..3193975fa8f 100644 --- a/pkg/adapter/apiserver/stats_reporter.go +++ b/pkg/adapter/apiserver/stats_reporter.go @@ -101,7 +101,7 @@ func NewStatsReporter() (StatsReporter, error) { } r.importerResourceGroupTagKey = importerResourceGroupTag - resultTag, err := tag.NewKey(metricsKeyEventing.Result) + resultTag, err := tag.NewKey(metricsKeyEventing.LabelResult) if err != nil { return nil, err } diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go index 364e823313e..140eee65c74 100644 --- a/pkg/adapter/apiserver/stats_reporter_test.go +++ b/pkg/adapter/apiserver/stats_reporter_test.go @@ -85,7 +85,7 @@ func TestReporterForErrorTag(t *testing.T) { wantTags := map[string]string{ metricskey.LabelNamespaceName: "testns", - metricskeyEventing.Result: "success", + metricskeyEventing.LabelFilterResult: "success", metricskey.LabelEventType: "eventtype", metricskey.LabelEventSource: "eventsource", metricskey.LabelImporterName: "testimporter", From 6782e96fc07db4bd347999990a771d7cb7b8bcbf Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sun, 1 Sep 2019 22:41:26 +0200 Subject: [PATCH 8/9] Moved config to utils --- cmd/apiserver_receive_adapter/main.go | 6 +++--- pkg/reconciler/apiserversource/apiserversource.go | 4 ++-- .../apiserversource/resources => utils}/config.go | 2 +- .../apiserversource/resources => utils}/config_test.go | 2 +- 4 files changed, 7 insertions(+), 7 deletions(-) rename pkg/{reconciler/apiserversource/resources => utils}/config.go (99%) rename pkg/{reconciler/apiserversource/resources => utils}/config_test.go (99%) diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index ea41a27a69a..9f931a15202 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -32,8 +32,8 @@ import ( "k8s.io/client-go/tools/clientcmd" "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/pkg/reconciler/apiserversource/resources" "knative.dev/eventing/pkg/tracing" + "knative.dev/eventing/pkg/utils" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" @@ -80,7 +80,7 @@ func main() { } // Convert base64 encoded json logging.Config to logging.Config. - loggingConfig, err := resources.Base64ToLoggingConfig( + loggingConfig, err := utils.Base64ToLoggingConfig( env.LoggingConfigBase64) if err != nil { fmt.Printf("[ERROR] filed to process logging config: %s", err.Error()) @@ -95,7 +95,7 @@ func main() { // Convert base64 encoded json metrics.ExporterOptions to // metrics.ExporterOptions. - metricsConfig, err := resources.Base64ToMetricsOptions( + metricsConfig, err := utils.Base64ToMetricsOptions( env.MetricsConfigBase64) if err != nil { logger.Errorf("failed to process metrics options: %s", err.Error()) diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index 9aeb69eb115..2c788c8df31 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -198,8 +198,8 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Api Source: src, Labels: resources.Labels(src.Name), SinkURI: sinkURI, - LoggingConfig: resources.LoggingConfigToBase64(r.loggingConfig), - MetricsConfig: resources.MetricsOptionsToBase64(r.metricsConfig), + LoggingConfig: utils.LoggingConfigToBase64(r.loggingConfig), + MetricsConfig: utils.MetricsOptionsToBase64(r.metricsConfig), } expected := resources.MakeReceiveAdapter(&adapterArgs) diff --git a/pkg/reconciler/apiserversource/resources/config.go b/pkg/utils/config.go similarity index 99% rename from pkg/reconciler/apiserversource/resources/config.go rename to pkg/utils/config.go index 9e411ac830f..48fbb2adf3b 100644 --- a/pkg/reconciler/apiserversource/resources/config.go +++ b/pkg/utils/config.go @@ -13,7 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ -package resources +package utils import ( "encoding/json" diff --git a/pkg/reconciler/apiserversource/resources/config_test.go b/pkg/utils/config_test.go similarity index 99% rename from pkg/reconciler/apiserversource/resources/config_test.go rename to pkg/utils/config_test.go index 7bff49f3ee1..8ca4c0c081b 100644 --- a/pkg/reconciler/apiserversource/resources/config_test.go +++ b/pkg/utils/config_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package resources +package utils import ( "testing" From 7cb9e28c6331ff75af22c000e939d1257e4383a0 Mon Sep 17 00:00:00 2001 From: Sayan Hazra Date: Sun, 1 Sep 2019 22:52:02 +0200 Subject: [PATCH 9/9] Fix name --- pkg/adapter/apiserver/adapter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 2c7cbc80afc..f4bc1404eec 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -193,7 +193,7 @@ func TestAdapter_StartResource(t *testing.T) { if err != nil { t.Fatalf("Failed to create a new reporter: %v", err) } - a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-adapter") + a := NewAdaptor(source, k8s, ce, logger, opt, r, "test-importer") err = errors.New("test never ran") stopCh := make(chan struct{})