From 7cfc9966d17d6353799dfff0d0705aa56de80ba5 Mon Sep 17 00:00:00 2001 From: nachocano Date: Tue, 10 Sep 2019 14:04:36 -0700 Subject: [PATCH 1/4] using pkg source reporter --- Gopkg.lock | 4 +- Gopkg.toml | 2 +- pkg/adapter/apiserver/adapter.go | 7 +- pkg/adapter/apiserver/adapter_test.go | 3 +- pkg/adapter/apiserver/ref.go | 14 ++- pkg/adapter/apiserver/resource.go | 22 ++-- pkg/adapter/apiserver/stats_reporter_test.go | 113 ------------------ vendor/knative.dev/pkg/Gopkg.lock | 4 +- .../knative.dev/pkg/controller/controller.go | 15 ++- vendor/knative.dev/pkg/logging/config.go | 40 +++++++ vendor/knative.dev/pkg/metrics/config.go | 30 +++++ .../pkg/metrics/metricskey/constants.go | 6 + .../pkg/metrics/source_stats_reporter.go | 75 +++++------- vendor/knative.dev/pkg/metrics/utils.go | 26 ++++ 14 files changed, 180 insertions(+), 181 deletions(-) delete mode 100644 pkg/adapter/apiserver/stats_reporter_test.go rename pkg/adapter/apiserver/stats_reporter.go => vendor/knative.dev/pkg/metrics/source_stats_reporter.go (61%) create mode 100644 vendor/knative.dev/pkg/metrics/utils.go diff --git a/Gopkg.lock b/Gopkg.lock index 57a3045d819..621b3c77fb9 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1201,7 +1201,7 @@ [[projects]] branch = "master" - digest = "1:0dca180a6449f2cd457f8efea003336352c5a412212fce072cad9661d5a71b61" + digest = "1:d984ab764aacf483c1edfc0a8e52fdde9e6c78617ac305c2238b5943152c842a" name = "knative.dev/pkg" packages = [ "apis", @@ -1284,7 +1284,7 @@ "webhook", ] pruneopts = "T" - revision = "528ad1c1dd627059b95aef17ccf27f9ab0f46c10" + revision = "b2eb686f97c5241658502c2a4b5d375af076aeb4" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 7ee93b4179e..c9ff95ad6a7 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -118,4 +118,4 @@ required = [ [[constraint]] name = "github.com/tsenart/vegeta" - version = "12.7.0" \ No newline at end of file + version = "12.7.0" diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 71805e4790f..948d7afe61a 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -30,6 +30,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go" "go.uber.org/zap" + "knative.dev/pkg/metrics" ) type Adapter interface { @@ -41,6 +42,8 @@ const ( RefMode = "Ref" // ResourceMode produces payloads of ResourceEvent ResourceMode = "Resource" + + resourceGroup = "apiserversources.sources.eventing.knative.dev" ) // Options hold the options for the Adapter. @@ -67,13 +70,13 @@ type adapter struct { mode string delegate eventDelegate - reporter StatsReporter + reporter metrics.StatsReporter name string } func NewAdaptor(source string, k8sClient dynamic.Interface, ceClient cloudevents.Client, logger *zap.SugaredLogger, - opt Options, reporter StatsReporter, name string) Adapter { + opt Options, reporter metrics.StatsReporter, name string) Adapter { mode := opt.Mode switch mode { case ResourceMode, RefMode: diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index df576abdc8c..5ce772f9fb1 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,6 +17,7 @@ limitations under the License. package apiserver import ( + "knative.dev/pkg/metrics" "testing" "github.com/google/go-cmp/cmp" @@ -34,7 +35,7 @@ import ( type mockReporter struct{} -func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error { +func (r *mockReporter) ReportEventCount(args *metrics.ReportArgs, responseCode int) error { return nil } diff --git a/pkg/adapter/apiserver/ref.go b/pkg/adapter/apiserver/ref.go index 2b6848afcca..a3e7b7a1a79 100644 --- a/pkg/adapter/apiserver/ref.go +++ b/pkg/adapter/apiserver/ref.go @@ -18,6 +18,7 @@ package apiserver import ( "context" + "knative.dev/pkg/metrics" "reflect" "k8s.io/apimachinery/pkg/api/meta" @@ -37,7 +38,7 @@ type ref struct { logger *zap.SugaredLogger controlledGVRs []schema.GroupVersionResource - reporter StatsReporter + reporter metrics.StatsReporter namespace string name string } @@ -105,11 +106,12 @@ func (a *ref) addControllerWatch(gvr schema.GroupVersionResource) { } func (a *ref) sendEvent(ctx context.Context, event *cloudevents.Event) error { - reportArgs := &ReportArgs{ - ns: a.namespace, - eventSource: event.Source(), - eventType: event.Type(), - name: a.name, + reportArgs := &metrics.ReportArgs{ + Namespace: a.namespace, + EventSource: event.Source(), + EventType: event.Type(), + Name: a.name, + ResourceGroup: resourceGroup, } rctx, _, err := a.ce.Send(ctx, *event) diff --git a/pkg/adapter/apiserver/resource.go b/pkg/adapter/apiserver/resource.go index 0c2bf1f2e37..8f129c5980b 100644 --- a/pkg/adapter/apiserver/resource.go +++ b/pkg/adapter/apiserver/resource.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/adapter/apiserver/events" + "knative.dev/pkg/metrics" ) type resource struct { @@ -31,7 +32,7 @@ type resource struct { source string eventType string logger *zap.SugaredLogger - reporter StatsReporter + reporter metrics.StatsReporter namespace string name string } @@ -45,7 +46,7 @@ func (a *resource) Add(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) } func (a *resource) Update(obj interface{}) error { @@ -55,7 +56,7 @@ func (a *resource) Update(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) return nil } @@ -67,15 +68,16 @@ func (a *resource) Delete(obj interface{}) error { return err } - return a.sendEvent(context.Background(), event, a.reporter) + return a.sendEvent(context.Background(), event) } -func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event, reporter StatsReporter) error { - reportArgs := &ReportArgs{ - ns: a.namespace, - eventSource: event.Source(), - eventType: event.Type(), - name: a.name, +func (a *resource) sendEvent(ctx context.Context, event *cloudevents.Event) error { + reportArgs := &metrics.ReportArgs{ + Namespace: a.namespace, + EventSource: event.Source(), + EventType: event.Type(), + Name: a.name, + ResourceGroup: resourceGroup, } rctx, _, err := a.ce.Send(ctx, *event) diff --git a/pkg/adapter/apiserver/stats_reporter_test.go b/pkg/adapter/apiserver/stats_reporter_test.go deleted file mode 100644 index 6f86870db80..00000000000 --- a/pkg/adapter/apiserver/stats_reporter_test.go +++ /dev/null @@ -1,113 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package apiserver - -import ( - "net/http" - "testing" - - . "knative.dev/eventing/pkg/metrics/metricskey" - metricskeyEventing "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricskey" - "knative.dev/pkg/metrics/metricstest" -) - -// unregister, ehm, unregisters the metrics that were registered, by -// virtue of StatsReporter creation. -// Since golang executes test iterations within the same process, the stats reporter -// returns an error if the metric is already registered and the test panics. -func unregister() { - metricstest.Unregister("event_count") -} - -func TestStatsReporter(t *testing.T) { - args := &ReportArgs{ - ns: "testns", - eventType: "dev.knative.apiserver.resource.update", - eventSource: "unit-test", - name: "testimporter", - } - - r, err := NewStatsReporter() - if err != nil { - t.Fatalf("Failed to create a new reporter: %v", err) - } - // Without this `go test ... -count=X`, where X > 1, fails, since - // we get an error about view already being registered. - defer unregister() - - wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskey.LabelEventType: "dev.knative.apiserver.resource.update", - metricskey.LabelEventSource: "unit-test", - metricskey.LabelImporterName: "testimporter", - metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", - LabelResponseCode: "202", - LabelResponseCodeClass: "2xx", - } - - // test ReportEventCount - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusAccepted) - }) - metricstest.CheckCountData(t, "event_count", wantTags, 2) -} - -func TestReporterFor5xxResponse(t *testing.T) { - r, err := NewStatsReporter() - defer unregister() - - if err != nil { - t.Fatalf("Failed to create a new reporter: %v", err) - } - - args := &ReportArgs{ - ns: "testns", - eventType: "eventtype", - eventSource: "eventsource", - name: "testimporter", - } - - wantTags := map[string]string{ - metricskey.LabelNamespaceName: "testns", - metricskeyEventing.LabelFilterResult: "success", - metricskey.LabelEventType: "eventtype", - metricskey.LabelEventSource: "eventsource", - metricskey.LabelImporterName: "testimporter", - metricskey.LabelImporterResourceGroup: "apiserversources.sources.eventing.knative.dev", - LabelResponseCode: "500", - LabelResponseCodeClass: "5xx", - } - // test ReportEventCount - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusInternalServerError) - }) - expectSuccess(t, func() error { - return r.ReportEventCount(args, http.StatusInternalServerError) - }) - metricstest.CheckCountData(t, "event_count", wantTags, 2) -} - -func expectSuccess(t *testing.T, f func() error) { - t.Helper() - if err := f(); err != nil { - t.Errorf("Reporter expected success but got error: %v", err) - } -} diff --git a/vendor/knative.dev/pkg/Gopkg.lock b/vendor/knative.dev/pkg/Gopkg.lock index 3af18f831c7..1274587e2cc 100644 --- a/vendor/knative.dev/pkg/Gopkg.lock +++ b/vendor/knative.dev/pkg/Gopkg.lock @@ -1245,14 +1245,14 @@ [[projects]] branch = "master" - digest = "1:230d4d8a9aa64db75413202a9d1cde3b80bcc417b858093f0fd27c2017bb4955" + digest = "1:e873112f19e9ce823bc01ca715de355bd5cf8e4d6045ed5f34b3eb0d6ec2c655" name = "knative.dev/test-infra" packages = [ "scripts", "tools/dep-collector", ] pruneopts = "UT" - revision = "90a447afb0f4562ff89ac2975334aefcf864d21d" + revision = "5449c8bad49d3528b5cb0b58a0cdce0f8a526f03" [[projects]] digest = "1:8730e0150dfb2b7e173890c8b9868e7a273082ef8e39f4940e3506a481cf895c" diff --git a/vendor/knative.dev/pkg/controller/controller.go b/vendor/knative.dev/pkg/controller/controller.go index 6c933275447..6587313f1f4 100644 --- a/vendor/knative.dev/pkg/controller/controller.go +++ b/vendor/knative.dev/pkg/controller/controller.go @@ -273,7 +273,12 @@ func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error { defer runtime.HandleCrash() sg := sync.WaitGroup{} defer sg.Wait() - defer c.WorkQueue.ShutDown() + defer func() { + c.WorkQueue.ShutDown() + for c.WorkQueue.Len() > 0 { + time.Sleep(time.Millisecond * 100) + } + }() // Launch workers to process resources that get enqueued to our workqueue. logger := c.logger @@ -350,7 +355,10 @@ func (c *Impl) handleErr(err error, key string) { c.logger.Errorw("Reconcile error", zap.Error(err)) // Re-queue the key if it's an transient error. - if !IsPermanentError(err) { + // We want to check that the queue is shutting down here + // since controller Run might have exited by now (since while this item was + // being processed, queue.Len==0). + if !IsPermanentError(err) && !c.WorkQueue.ShuttingDown() { c.WorkQueue.AddRateLimited(key) c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", key, c.WorkQueue.Len()) return @@ -368,6 +376,9 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) { // FilteredGlobalResync enqueues (with a delay) all objects from the // SharedInformer that pass the filter function func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { + if c.WorkQueue.ShuttingDown() { + return + } list := si.GetStore().List() count := float64(len(list)) for _, obj := range list { diff --git a/vendor/knative.dev/pkg/logging/config.go b/vendor/knative.dev/pkg/logging/config.go index bed0ba167da..b6100c2dc5c 100644 --- a/vendor/knative.dev/pkg/logging/config.go +++ b/vendor/knative.dev/pkg/logging/config.go @@ -33,6 +33,8 @@ import ( const ConfigMapNameEnv = "CONFIG_LOGGING_NAME" +var zapLoggerConfig = "zap-logger-config" + // NewLogger creates a logger with the supplied configuration. // In addition to the logger, it returns AtomicLevel that can // be used to change the logging level at runtime. @@ -196,3 +198,41 @@ func ConfigMapName() string { } return cm } + +// JsonToLoggingConfig converts a json string of a Config. +// Returns a non-nil Config always. +func JsonToLoggingConfig(jsonCfg string) (*Config, error) { + if jsonCfg == "" { + return nil, errors.New("json logging string is empty") + } + + var configMap map[string]string + if err := json.Unmarshal([]byte(jsonCfg), &configMap); err != nil { + return nil, err + } + + cfg, err := NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + if cfg, err = NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } + } + return cfg, nil +} + +// LoggingConfigToJson converts a Config to a json string. +func LoggingConfigToJson(cfg *Config) (string, error) { + if cfg == nil || cfg.LoggingConfig == "" { + return "", nil + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return "", err + } + + return string(jsonCfg), nil +} diff --git a/vendor/knative.dev/pkg/metrics/config.go b/vendor/knative.dev/pkg/metrics/config.go index 627e064a341..c74d8f1c83d 100644 --- a/vendor/knative.dev/pkg/metrics/config.go +++ b/vendor/knative.dev/pkg/metrics/config.go @@ -17,6 +17,7 @@ limitations under the License. package metrics import ( + "encoding/json" "errors" "fmt" "os" @@ -291,3 +292,32 @@ import ( _ "knative.dev/pkg/metrics/testing" )`, DomainEnv, DomainEnv)) } + +// JsonToMetricsOptions converts a json string of a +// ExporterOptions. Returns a non-nil ExporterOptions always. +func JsonToMetricsOptions(jsonOpts string) (*ExporterOptions, error) { + var opts ExporterOptions + if jsonOpts == "" { + return nil, errors.New("json options string is empty") + } + + if err := json.Unmarshal([]byte(jsonOpts), &opts); err != nil { + return nil, err + } + + return &opts, nil +} + +// MetricsOptionsToJson converts a ExporterOptions to a json string. +func MetricsOptionsToJson(opts *ExporterOptions) (string, error) { + if opts == nil { + return "", nil + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return "", err + } + + return string(jsonOpts), nil +} diff --git a/vendor/knative.dev/pkg/metrics/metricskey/constants.go b/vendor/knative.dev/pkg/metrics/metricskey/constants.go index 01e5adff7e8..b2508807f8f 100644 --- a/vendor/knative.dev/pkg/metrics/metricskey/constants.go +++ b/vendor/knative.dev/pkg/metrics/metricskey/constants.go @@ -29,6 +29,12 @@ const ( // LabelNamespaceName is the label for immutable name of the namespace that the service is deployed LabelNamespaceName = "namespace_name" + // LabelResponseCode is the label for the HTTP response status code. + LabelResponseCode = "response_code" + + // LabelResponseCodeClass is the label for the HTTP response status code class. For example, "2xx", "3xx", etc. + LabelResponseCodeClass = "response_code_class" + // ValueUnknown is the default value if the field is unknown, e.g. project will be unknown if Knative // is not running on GKE. ValueUnknown = "unknown" diff --git a/pkg/adapter/apiserver/stats_reporter.go b/vendor/knative.dev/pkg/metrics/source_stats_reporter.go similarity index 61% rename from pkg/adapter/apiserver/stats_reporter.go rename to vendor/knative.dev/pkg/metrics/source_stats_reporter.go index 00602ae7bb6..6848230f847 100644 --- a/pkg/adapter/apiserver/stats_reporter.go +++ b/vendor/knative.dev/pkg/metrics/source_stats_reporter.go @@ -14,63 +14,55 @@ * limitations under the License. */ -package apiserver +package metrics import ( "context" "strconv" - . "knative.dev/eventing/pkg/metrics/metricskey" - "knative.dev/eventing/pkg/utils" - "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" - "knative.dev/pkg/metrics" "knative.dev/pkg/metrics/metricskey" ) var ( - // eventCountM is a counter which records the number of events sent - // by the ApiServerSource. + // eventCountM is a counter which records the number of events sent by the source. eventCountM = stats.Int64( "event_count", - "Number of events created", + "Number of events sent", stats.UnitDimensionless, ) ) type ReportArgs struct { - ns string - eventType string - eventSource string - name string + Namespace string + EventType string + EventSource string + Name string + ResourceGroup string } -const ( - importerResourceGroupValue = "apiserversources.sources.eventing.knative.dev" -) - -// StatsReporter defines the interface for sending filter metrics. +// StatsReporter defines the interface for sending source metrics. type StatsReporter interface { + // ReportEventCount captures the event count. It records one per call. ReportEventCount(args *ReportArgs, responseCode int) error } var _ StatsReporter = (*reporter)(nil) -// reporter holds cached metric objects to report filter metrics. +// reporter holds cached metric objects to report source metrics. type reporter struct { - namespaceTagKey tag.Key - eventTypeTagKey tag.Key - eventSourceTagKey tag.Key - importerNameTagKey tag.Key - importerResourceGroupTagKey tag.Key - responseCodeKey tag.Key - responseCodeClassKey tag.Key + namespaceTagKey tag.Key + eventSourceTagKey tag.Key + eventTypeTagKey tag.Key + sourceNameTagKey tag.Key + sourceResourceGroupTagKey tag.Key + responseCodeKey tag.Key + responseCodeClassKey tag.Key } -// NewStatsReporter creates a reporter that collects and reports apiserversource -// metrics. +// NewStatsReporter creates a reporter that collects and reports source metrics. func NewStatsReporter() (StatsReporter, error) { var r = &reporter{} @@ -93,24 +85,24 @@ func NewStatsReporter() (StatsReporter, error) { } r.eventTypeTagKey = eventTypeTag - importerNameTag, err := tag.NewKey(metricskey.LabelImporterName) + nameTag, err := tag.NewKey(metricskey.LabelImporterName) if err != nil { return nil, err } - r.importerNameTagKey = importerNameTag + r.sourceNameTagKey = nameTag - importerResourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) + resourceGroupTag, err := tag.NewKey(metricskey.LabelImporterResourceGroup) if err != nil { return nil, err } - r.importerResourceGroupTagKey = importerResourceGroupTag + r.sourceResourceGroupTagKey = resourceGroupTag - responseCodeTag, err := tag.NewKey(LabelResponseCode) + responseCodeTag, err := tag.NewKey(metricskey.LabelResponseCode) if err != nil { return nil, err } r.responseCodeKey = responseCodeTag - responseCodeClassTag, err := tag.NewKey(LabelResponseCodeClass) + responseCodeClassTag, err := tag.NewKey(metricskey.LabelResponseCodeClass) if err != nil { return nil, err } @@ -122,7 +114,7 @@ func NewStatsReporter() (StatsReporter, error) { Description: eventCountM.Description(), Measure: eventCountM, Aggregation: view.Count(), - TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceTagKey, r.eventTypeTagKey, r.importerNameTagKey, r.importerResourceGroupTagKey, r.responseCodeKey, r.responseCodeClassKey}, + TagKeys: []tag.Key{r.namespaceTagKey, r.eventSourceTagKey, r.eventTypeTagKey, r.sourceNameTagKey, r.sourceResourceGroupTagKey, r.responseCodeKey, r.responseCodeClassKey}, }, ) if err != nil { @@ -132,24 +124,23 @@ func NewStatsReporter() (StatsReporter, error) { return r, nil } -// ReportEventCount captures the event count. func (r *reporter) ReportEventCount(args *ReportArgs, responseCode int) error { ctx, err := r.generateTag(args, responseCode) if err != nil { return err } - metrics.Record(ctx, eventCountM.M(1)) + Record(ctx, eventCountM.M(1)) return nil } func (r *reporter) generateTag(args *ReportArgs, responseCode int) (context.Context, error) { return tag.New( context.Background(), - tag.Insert(r.namespaceTagKey, args.ns), - tag.Insert(r.eventSourceTagKey, args.eventSource), - tag.Insert(r.eventTypeTagKey, args.eventType), - tag.Insert(r.importerNameTagKey, args.name), - tag.Insert(r.importerResourceGroupTagKey, importerResourceGroupValue), + tag.Insert(r.namespaceTagKey, args.Namespace), + tag.Insert(r.eventSourceTagKey, args.EventSource), + tag.Insert(r.eventTypeTagKey, args.EventType), + tag.Insert(r.sourceNameTagKey, args.Name), + tag.Insert(r.sourceResourceGroupTagKey, args.ResourceGroup), tag.Insert(r.responseCodeKey, strconv.Itoa(responseCode)), - tag.Insert(r.responseCodeClassKey, utils.ResponseCodeClass(responseCode))) + tag.Insert(r.responseCodeClassKey, ResponseCodeClass(responseCode))) } diff --git a/vendor/knative.dev/pkg/metrics/utils.go b/vendor/knative.dev/pkg/metrics/utils.go new file mode 100644 index 00000000000..7345cec920f --- /dev/null +++ b/vendor/knative.dev/pkg/metrics/utils.go @@ -0,0 +1,26 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +import "strconv" + +// ResponseCodeClass converts an HTTP response code to a string representing its response code class. +// E.g., The response code class is "5xx" for response code 503. +func ResponseCodeClass(responseCode int) string { + // Get the hundred digit of the response code and concatenate "xx". + return strconv.Itoa(responseCode/100) + "xx" +} From f4da23ec73d0445c195aac3c2c22c75c35a8bd46 Mon Sep 17 00:00:00 2001 From: nachocano Date: Tue, 10 Sep 2019 14:12:41 -0700 Subject: [PATCH 2/4] updates --- cmd/apiserver_receive_adapter/main.go | 30 ++-- .../apiserversource/apiserversource.go | 8 +- pkg/utils/config.go | 130 ---------------- pkg/utils/config_test.go | 144 ------------------ 4 files changed, 17 insertions(+), 295 deletions(-) delete mode 100644 pkg/utils/config.go delete mode 100644 pkg/utils/config_test.go diff --git a/cmd/apiserver_receive_adapter/main.go b/cmd/apiserver_receive_adapter/main.go index 4870cccfa56..372a57bc882 100644 --- a/cmd/apiserver_receive_adapter/main.go +++ b/cmd/apiserver_receive_adapter/main.go @@ -33,7 +33,6 @@ import ( "knative.dev/eventing/pkg/adapter/apiserver" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/pkg/tracing" - "knative.dev/eventing/pkg/utils" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/signals" @@ -67,16 +66,16 @@ type envConfig struct { Controller []bool `required:"true"` LabelSelector StringList `envconfig:"SELECTOR" required:"true"` Name string `envconfig:"NAME" required:"true"` - // MetricsConfigBase64 is a base64 encoded json string of - // metrics.ExporterOptions. This is used to configure the metrics exporter - // options, the config is stored in a config map inside the controllers + // MetricsConfigJson is a json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, + // the config is stored in a config map inside the controllers // namespace and copied here. - MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + MetricsConfigJson string `envconfig:"K_METRICS_CONFIG" required:"true"` - // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // LoggingConfigJson is a json string of logging.Config. // This is used to configure the logging config, the config is stored in // a config map inside the controllers namespace and copied here. - LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` + LoggingConfigJson string `envconfig:"K_LOGGING_CONFIG" required:"true"` } // TODO: the controller should take the list of GVR @@ -89,9 +88,8 @@ func main() { if err != nil { panic(fmt.Sprintf("Error processing env var: %s", err)) } - // TODO move this util to pkg - // Convert base64 encoded json logging.Config to logging.Config. - loggingConfig, err := utils.Base64ToLoggingConfig(env.LoggingConfigBase64) + // Convert json logging.Config to logging.Config. + loggingConfig, err := logging.JsonToLoggingConfig(env.LoggingConfigJson) if err != nil { fmt.Printf("[ERROR] failed to process logging config: %s", err.Error()) // Use default logging config. @@ -104,10 +102,8 @@ func main() { logger := loggerSugared.Desugar() defer flush(loggerSugared) - // Convert base64 encoded json metrics.ExporterOptions to - // metrics.ExporterOptions. - metricsConfig, err := utils.Base64ToMetricsOptions( - env.MetricsConfigBase64) + // Convert json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig, err := metrics.JsonToMetricsOptions(env.MetricsConfigJson) if err != nil { logger.Error("failed to process metrics options", zap.Error(err)) } @@ -116,7 +112,7 @@ func main() { logger.Error("failed to create the metrics exporter", zap.Error(err)) } - reporter, err := apiserver.NewStatsReporter() + reporter, err := metrics.NewStatsReporter() if err != nil { logger.Error("error building statsreporter", zap.Error(err)) } @@ -173,9 +169,9 @@ func main() { } a := apiserver.NewAdaptor(cfg.Host, client, eventsClient, loggerSugared, opt, reporter, env.Name) - logger.Info("starting kubernetes api adapter.", zap.Any("adapter", env)) + logger.Info("starting kubernetes api adapter", zap.Any("adapter", env)) if err := a.Start(stopCh); err != nil { - logger.Warn("start returned an error,", zap.Error(err)) + logger.Warn("start returned an error", zap.Error(err)) } } diff --git a/pkg/reconciler/apiserversource/apiserversource.go b/pkg/reconciler/apiserversource/apiserversource.go index 1044381f1f4..761662ceef1 100644 --- a/pkg/reconciler/apiserversource/apiserversource.go +++ b/pkg/reconciler/apiserversource/apiserversource.go @@ -193,13 +193,13 @@ func (r *Reconciler) getReceiveAdapterImage() string { } func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.ApiServerSource, sinkURI string) (*appsv1.Deployment, error) { - loggingConfig, err := utils.LoggingConfigToBase64(r.loggingConfig) + loggingConfig, err := pkgLogging.LoggingConfigToJson(r.loggingConfig) if err != nil { - logging.FromContext(ctx).Error("error while converting logging config to base64", zap.Any("receiveAdapter", err)) + logging.FromContext(ctx).Error("error while converting logging config to json", zap.Any("receiveAdapter", err)) } - metricsConfig, err := utils.MetricsOptionsToBase64(r.metricsConfig) + metricsConfig, err := metrics.MetricsOptionsToJson(r.metricsConfig) if err != nil { - logging.FromContext(ctx).Error("error while converting metrics config to base64", zap.Any("receiveAdapter", err)) + logging.FromContext(ctx).Error("error while converting metrics config to json", zap.Any("receiveAdapter", err)) } adapterArgs := resources.ReceiveAdapterArgs{ Image: r.getReceiveAdapterImage(), diff --git a/pkg/utils/config.go b/pkg/utils/config.go deleted file mode 100644 index e682b797f52..00000000000 --- a/pkg/utils/config.go +++ /dev/null @@ -1,130 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package utils - -// TODO to move this to pkg -import ( - "encoding/json" - "errors" - "strconv" - - "knative.dev/pkg/logging" - "knative.dev/pkg/metrics" -) - -var zapLoggerConfig = "zap-logger-config" - -// Base64ToMetricsOptions converts a json+base64 string of a -// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. -func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { - var opts metrics.ExporterOptions - if base64 == "" { - return nil, errors.New("base64 metrics string is empty") - } - - quoted64 := strconv.Quote(string(base64)) - - var bytes []byte - if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { - return nil, err - } - - if err := json.Unmarshal(bytes, &opts); err != nil { - return nil, err - } - - return &opts, nil -} - -// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 -// string. -func MetricsOptionsToBase64(opts *metrics.ExporterOptions) (string, error) { - if opts == nil { - return "", nil - } - - jsonOpts, err := json.Marshal(opts) - if err != nil { - return "", err - } - // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. - base64Opts, err := json.Marshal(jsonOpts) - if err != nil { - return "", err - } - - // Turn the base64 encoded []byte back into a string. - base64, err := strconv.Unquote(string(base64Opts)) - if err != nil { - return "", err - } - return base64, nil -} - -// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. -// Returns a non-nil logging.Config always. -func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { - if base64 == "" { - return nil, errors.New("base64 logging string is empty") - } - - quoted64 := strconv.Quote(string(base64)) - - var bytes []byte - if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { - return nil, err - } - - var configMap map[string]string - if err := json.Unmarshal(bytes, &configMap); err != nil { - return nil, err - } - - cfg, err := logging.NewConfigFromMap(configMap) - if err != nil { - // Get the default config from logging package. - if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { - return nil, err - } - } - return cfg, nil -} - -// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. -func LoggingConfigToBase64(cfg *logging.Config) (string, error) { - if cfg == nil || cfg.LoggingConfig == "" { - return "", nil - } - - jsonCfg, err := json.Marshal(map[string]string{ - zapLoggerConfig: cfg.LoggingConfig, - }) - if err != nil { - return "", err - } - // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. - base64Cfg, err := json.Marshal(jsonCfg) - if err != nil { - return "", err - } - - // Turn the base64 encoded []byte back into a string. - base64, err := strconv.Unquote(string(base64Cfg)) - if err != nil { - return "", err - } - return base64, nil -} diff --git a/pkg/utils/config_test.go b/pkg/utils/config_test.go deleted file mode 100644 index cf2c8ab293b..00000000000 --- a/pkg/utils/config_test.go +++ /dev/null @@ -1,144 +0,0 @@ -/* -Copyright 2019 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package utils - -import ( - "testing" - - "go.uber.org/zap/zapcore" - "knative.dev/pkg/logging" - "knative.dev/pkg/metrics" - - "github.com/google/go-cmp/cmp" -) - -func TestMetricsOptions(t *testing.T) { - testCases := map[string]struct { - opts *metrics.ExporterOptions - want string - wantErr string - }{ - "nil": { - opts: nil, - want: "", - wantErr: "base64 metrics string is empty", - }, - "happy": { - opts: &metrics.ExporterOptions{ - Domain: "domain", - Component: "component", - PrometheusPort: 9090, - ConfigMap: map[string]string{ - "foo": "bar", - "boosh": "kakow", - }, - }, - want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - base64, err := MetricsOptionsToBase64(tc.opts) - if err != nil { - t.Errorf("error while converting metrics config to base64: %v", err) - } - // Test to base64. - { - want := tc.want - got := base64 - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected (-want, +got) = %v", diff) - t.Log(got) - } - } - // Test to options. - { - want := tc.opts - got, gotErr := Base64ToMetricsOptions(base64) - - if gotErr != nil { - if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { - t.Errorf("unexpected err (-want, +got) = %v", diff) - } - } else if tc.wantErr != "" { - t.Errorf("expected err %v", tc.wantErr) - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected (-want, +got) = %v", diff) - t.Log(got) - } - } - }) - } -} - -func TestLoggingConfig(t *testing.T) { - testCases := map[string]struct { - cfg *logging.Config - want string - wantErr string - }{ - "nil": { - cfg: nil, - want: "", - wantErr: "base64 logging string is empty", - }, - "happy": { - cfg: &logging.Config{ - LoggingConfig: "{}", - LoggingLevel: map[string]zapcore.Level{}, - }, - want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", - }, - } - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - base64, err := LoggingConfigToBase64(tc.cfg) - if err != nil { - t.Errorf("error while converting logging config to base64: %v", err) - } - // Test to base64. - { - want := tc.want - got := base64 - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected (-want, +got) = %v", diff) - t.Log(got) - } - } - // Test to config. - if tc.cfg != nil { - want := tc.cfg - got, gotErr := Base64ToLoggingConfig(base64) - - if gotErr != nil { - if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { - t.Errorf("unexpected err (-want, +got) = %v", diff) - } - } else if tc.wantErr != "" { - t.Errorf("expected err %v", tc.wantErr) - } - - if diff := cmp.Diff(want, got); diff != "" { - t.Errorf("unexpected (-want, +got) = %v", diff) - t.Log(got) - } - } - }) - } -} From 576fab9e98f8ba7ca1c1a84926f4fc9f1d474cd7 Mon Sep 17 00:00:00 2001 From: nachocano Date: Tue, 10 Sep 2019 17:24:17 -0700 Subject: [PATCH 3/4] updating UTs to check for sending of metric --- pkg/adapter/apiserver/adapter_test.go | 13 ++++++++++++- pkg/adapter/apiserver/ref_test.go | 9 +++++++++ pkg/adapter/apiserver/resource_test.go | 6 ++++++ 3 files changed, 27 insertions(+), 1 deletion(-) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 5ce772f9fb1..722dc05b4d6 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -33,9 +33,12 @@ import ( rectesting "knative.dev/eventing/pkg/reconciler/testing" ) -type mockReporter struct{} +type mockReporter struct { + eventCount int +} func (r *mockReporter) ReportEventCount(args *metrics.ReportArgs, responseCode int) error { + r.eventCount += 1 return nil } @@ -320,3 +323,11 @@ func makeRefAndTestingClient() (*ref, *kncetesting.TestCloudEventsClient) { reporter: r, }, ce } + +func validateMetric(t *testing.T, reporter metrics.StatsReporter, want int) { + if mockReporter, ok := reporter.(*mockReporter); !ok { + t.Errorf("reporter is not a mockReporter") + } else if mockReporter.eventCount != want { + t.Errorf("Expected %d for metric, got %d", want, mockReporter.eventCount) + } +} diff --git a/pkg/adapter/apiserver/ref_test.go b/pkg/adapter/apiserver/ref_test.go index 60a95b10aeb..76cdb71197b 100644 --- a/pkg/adapter/apiserver/ref_test.go +++ b/pkg/adapter/apiserver/ref_test.go @@ -11,36 +11,42 @@ func TestRefAddEvent(t *testing.T) { d, ce := makeRefAndTestingClient() d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) + validateMetric(t, d.reporter, 1) } func TestRefUpdateEvent(t *testing.T) { d, ce := makeRefAndTestingClient() d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) + validateMetric(t, d.reporter, 1) } func TestRefDeleteEvent(t *testing.T) { d, ce := makeRefAndTestingClient() d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) + validateMetric(t, d.reporter, 1) } func TestRefAddEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) + validateMetric(t, d.reporter, 0) } func TestRefUpdateEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) + validateMetric(t, d.reporter, 0) } func TestRefDeleteEventNil(t *testing.T) { d, ce := makeRefAndTestingClient() d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) + validateMetric(t, d.reporter, 0) } func TestRefAddEventAsController(t *testing.T) { @@ -52,6 +58,7 @@ func TestRefAddEventAsController(t *testing.T) { }) d.Add(simpleOwnedPod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddRefEventType) + validateMetric(t, d.reporter, 1) } func TestRefUpdateEventAsController(t *testing.T) { @@ -63,6 +70,7 @@ func TestRefUpdateEventAsController(t *testing.T) { }) d.Update(simpleOwnedPod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateRefEventType) + validateMetric(t, d.reporter, 1) } func TestRefDeleteEventAsController(t *testing.T) { @@ -74,6 +82,7 @@ func TestRefDeleteEventAsController(t *testing.T) { }) d.Delete(simpleOwnedPod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteRefEventType) + validateMetric(t, d.reporter, 1) } // HACKHACKHACK For test coverage. diff --git a/pkg/adapter/apiserver/resource_test.go b/pkg/adapter/apiserver/resource_test.go index 0a7d75151fd..91e930379b4 100644 --- a/pkg/adapter/apiserver/resource_test.go +++ b/pkg/adapter/apiserver/resource_test.go @@ -11,36 +11,42 @@ func TestResourceAddEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Add(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) + validateMetric(t, d.reporter, 1) } func TestResourceUpdateEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Update(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) + validateMetric(t, d.reporter, 1) } func TestResourceDeleteEvent(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Delete(simplePod("unit", "test")) validateSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) + validateMetric(t, d.reporter, 1) } func TestResourceAddEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Add(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceAddEventType) + validateMetric(t, d.reporter, 0) } func TestResourceUpdateEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Update(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceUpdateEventType) + validateMetric(t, d.reporter, 0) } func TestResourceDeleteEventNil(t *testing.T) { d, ce := makeResourceAndTestingClient() d.Delete(nil) validateNotSent(t, ce, sourcesv1alpha1.ApiServerSourceDeleteEventType) + validateMetric(t, d.reporter, 0) } func TestResourceCoverageHacks(t *testing.T) { From 489948603493ac03bf8ccca47659e160b77cea66 Mon Sep 17 00:00:00 2001 From: nachocano Date: Wed, 11 Sep 2019 10:00:29 -0700 Subject: [PATCH 4/4] go imports --- pkg/adapter/apiserver/adapter_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 722dc05b4d6..4e5e0ca8665 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -17,7 +17,6 @@ limitations under the License. package apiserver import ( - "knative.dev/pkg/metrics" "testing" "github.com/google/go-cmp/cmp" @@ -31,6 +30,7 @@ import ( dynamicfake "k8s.io/client-go/dynamic/fake" kncetesting "knative.dev/eventing/pkg/kncloudevents/testing" rectesting "knative.dev/eventing/pkg/reconciler/testing" + "knative.dev/pkg/metrics" ) type mockReporter struct {