From 516cb66cf247148769f086c08cfe50353a6bf316 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 27 Aug 2019 11:30:15 -0700 Subject: [PATCH 1/8] adding base64 converters for logging and metrics config. --- pkg/reconciler/pullsubscription/controller.go | 4 + .../pullsubscription/controller_test.go | 29 ++++- .../pullsubscription/pullsubscription.go | 25 ++++ .../pullsubscription/resources/config.go | 116 +++++++++++++++++ .../pullsubscription/resources/config_test.go | 117 ++++++++++++++++++ 5 files changed, 288 insertions(+), 3 deletions(-) create mode 100644 pkg/reconciler/pullsubscription/resources/config.go create mode 100644 pkg/reconciler/pullsubscription/resources/config_test.go diff --git a/pkg/reconciler/pullsubscription/controller.go b/pkg/reconciler/pullsubscription/controller.go index 35f673c22b..d1e2943bf9 100644 --- a/pkg/reconciler/pullsubscription/controller.go +++ b/pkg/reconciler/pullsubscription/controller.go @@ -18,6 +18,7 @@ package pullsubscription import ( "context" + "knative.dev/pkg/metrics" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -97,5 +98,8 @@ func NewController( c.tracker = tracker.New(impl.EnqueueKey, controller.GetTrackerLease(ctx)) + cmw.Watch(logging.ConfigMapName(), c.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), c.UpdateFromMetricsConfigMap) + return impl } diff --git a/pkg/reconciler/pullsubscription/controller_test.go b/pkg/reconciler/pullsubscription/controller_test.go index 7754329345..d2ede8df7b 100644 --- a/pkg/reconciler/pullsubscription/controller_test.go +++ b/pkg/reconciler/pullsubscription/controller_test.go @@ -17,18 +17,26 @@ limitations under the License. package pullsubscription import ( + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/configmap" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" + "knative.dev/pkg/system" - // Fake injection informers + _ "knative.dev/pkg/metrics/testing" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" + // Fake injection informers _ "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment/fake" _ "knative.dev/pkg/injection/informers/kubeinformers/batchv1/job/fake" + + _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" ) func TestNew(t *testing.T) { @@ -38,7 +46,22 @@ func TestNew(t *testing.T) { _ = os.Setenv("PUBSUB_RA_IMAGE", "PUBSUB_RA_IMAGE") _ = os.Setenv("PUBSUB_SUB_IMAGE", "PUBSUB_SUB_IMAGE") - c := NewController(ctx, configmap.NewFixedWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: logging.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: metrics.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/pullsubscription/pullsubscription.go b/pkg/reconciler/pullsubscription/pullsubscription.go index 0fdbce2084..04e22430eb 100644 --- a/pkg/reconciler/pullsubscription/pullsubscription.go +++ b/pkg/reconciler/pullsubscription/pullsubscription.go @@ -19,6 +19,7 @@ package pullsubscription import ( "context" "encoding/json" + "knative.dev/pkg/metrics" "time" appsv1 "k8s.io/api/apps/v1" @@ -54,6 +55,8 @@ const ( // ReconcilerName is the name of the reconciler ReconcilerName = "PullSubscriptions" + component = "pullsubscriptions" + finalizerName = controllerAgentName ) @@ -70,6 +73,9 @@ type Reconciler struct { receiveAdapterImage string + loggingConfig *logging.Config + metricsConfig *metrics.ExporterOptions + // eventTypeReconciler eventtype.Reconciler // TODO: event types. } @@ -447,6 +453,25 @@ func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.PullSu return nil, apierrors.NewNotFound(schema.GroupResource{}, "") } +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + logcfg, err := logging.NewConfigFromConfigMap(cfg) + if err != nil { + r.Logger.Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + // TODO: requeue all pullsubscriptions +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + // TODO: requeue all pullsubscriptions +} + // TODO: Registry //func (r *Reconciler) reconcileEventTypes(ctx context.Context, src *v1alpha1.PullSubscription) error { // args := r.newEventTypeReconcilerArgs(src) diff --git a/pkg/reconciler/pullsubscription/resources/config.go b/pkg/reconciler/pullsubscription/resources/config.go new file mode 100644 index 0000000000..750d0e629e --- /dev/null +++ b/pkg/reconciler/pullsubscription/resources/config.go @@ -0,0 +1,116 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "encoding/json" + "fmt" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + "strconv" +) + +var zapLoggerConfig = "zap-logger-config" + +func Base64ToMetricsOptions(base64 string) *metrics.ExporterOptions { + if base64 == "" { + return nil + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + _ = json.Unmarshal([]byte(quoted64), &bytes) + + var opts metrics.ExporterOptions + // Do not care about the unmarshal error. + _ = json.Unmarshal(bytes, &opts) + + return &opts +} + +func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { + if opts == nil { + return "" + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Opts, err := json.Marshal(jsonOpts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Opts)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} + +func Base64ToLoggingConfig(base64 string) *logging.Config { + if base64 == "" { + return nil + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + _ = json.Unmarshal([]byte(quoted64), &bytes) + + var configMap map[string]string + // Do not care about the unmarshal error. + _ = json.Unmarshal(bytes, &configMap) + + cfg, err := logging.NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + cfg, _ = logging.NewConfigFromMap(map[string]string{}) + } + return cfg +} + +func LoggingConfigToBase64(cfg *logging.Config) string { + if cfg == nil || cfg.LoggingConfig == "" { + return "" + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Cfg, err := json.Marshal(jsonCfg) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Cfg)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} diff --git a/pkg/reconciler/pullsubscription/resources/config_test.go b/pkg/reconciler/pullsubscription/resources/config_test.go new file mode 100644 index 0000000000..2e7fed62b8 --- /dev/null +++ b/pkg/reconciler/pullsubscription/resources/config_test.go @@ -0,0 +1,117 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "go.uber.org/zap/zapcore" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestMetricsOptions(t *testing.T) { + testCases := map[string]struct { + opts *metrics.ExporterOptions + want string + }{ + "nil": { + opts: nil, + want: "", + }, + "happy": { + opts: &metrics.ExporterOptions{ + Domain: "domain", + Component: "component", + PrometheusPort: 9090, + ConfigMap: map[string]string{ + "foo": "bar", + "boosh": "kakow", + }, + }, + want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := MetricsOptionsToBase64(tc.opts) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to options. + { + want := tc.opts + got := Base64ToMetricsOptions(base64) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} + +func TestLoggingConfig(t *testing.T) { + testCases := map[string]struct { + cfg *logging.Config + want string + }{ + "nil": { + cfg: nil, + want: "", + }, + "happy": { + cfg: &logging.Config{ + LoggingConfig: "{}", + LoggingLevel: map[string]zapcore.Level{}, + }, + want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := LoggingConfigToBase64(tc.cfg) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to config. + if tc.cfg != nil { + want := tc.cfg + got := Base64ToLoggingConfig(base64) + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} From 500134092730e343f3b5e88d120f8fef4bc2408c Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 27 Aug 2019 13:55:07 -0700 Subject: [PATCH 2/8] wire config up to adapter. --- pkg/pubsub/adapter/adapter.go | 46 ++++++++++++++++--- .../pullsubscription/pullsubscription.go | 12 +++++ .../resources/receive_adapter.go | 11 +++-- .../resources/receive_adapter_test.go | 22 ++++++--- 4 files changed, 76 insertions(+), 15 deletions(-) diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index 29a5e8c483..d563274e04 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -19,6 +19,7 @@ package adapter import ( "context" "fmt" + "knative.dev/pkg/metrics" "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" @@ -31,7 +32,12 @@ import ( "github.com/google/knative-gcp/pkg/kncloudevents" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" - "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" + decoderresources "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" + psresources "github.com/google/knative-gcp/pkg/reconciler/pullsubscription/resources" +) + +var ( + component = "pullsubscriptions" ) // Adapter implements the Pub/Sub adapter to deliver Pub/Sub messages from a @@ -55,10 +61,10 @@ type Adapter struct { // subscription to use. Subscription string `envconfig:"PUBSUB_SUBSCRIPTION_ID" required:"true"` - // ExtensionsBased64 is a based64 encoded json string of a map of + // ExtensionsBase64 is a based64 encoded json string of a map of // CloudEvents extensions (key-value pairs) override onto the outbound // event. - ExtensionsBased64 string `envconfig:"K_CE_EXTENSIONS" required:"true"` + ExtensionsBase64 string `envconfig:"K_CE_EXTENSIONS" required:"true"` // extensions is the converted ExtensionsBased64 value. extensions map[string]string @@ -67,8 +73,17 @@ type Adapter struct { // One of [binary, structured, push]. Default: binary SendMode converters.ModeType `envconfig:"SEND_MODE" default:"binary" required:"true"` - // MetricsDomain holds the metrics domain to use for surfacing metrics. - MetricsDomain string `envconfig:"METRICS_DOMAIN" required:"true"` + // TODO + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // metricsConfig is the converted MetricsConfigBase64 value. + metricsConfig *metrics.ExporterOptions + + // TODO + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` + + // loggingConfig is the converted LoggingConfigBase64 value. + loggingConfig *logging.Config // inbound is the cloudevents client to use to receive events. inbound cloudevents.Client @@ -90,7 +105,13 @@ func (a *Adapter) Start(ctx context.Context) error { // Convert base64 encoded json map to extensions map. // This implementation comes from the Decorator object. - a.extensions = resources.MakeDecoratorExtensionsMap(a.ExtensionsBased64) + a.extensions = decoderresources.MakeDecoratorExtensionsMap(a.ExtensionsBase64) + + // Convert base64 encoded json logging.Config to logging.Config. + a.loggingConfig = psresources.Base64ToLoggingConfig(a.LoggingConfigBase64) + + // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. + a.metricsConfig = psresources.Base64ToMetricsOptions(a.MetricsConfigBase64) // Receive Events on Pub/Sub. if a.inbound == nil { @@ -115,6 +136,14 @@ func (a *Adapter) Start(ctx context.Context) error { } } + logger, _ := logging.NewLoggerFromConfig(a.loggingConfig, component) + defer flush(logger) + ctx = logging.WithLogger(ctx, logger) + + if err := metrics.UpdateExporter(*a.metricsConfig, logger); err != nil { + return fmt.Errorf("failed to update metrics exporter: %s", err.Error()) + } + return a.inbound.StartReceiver(ctx, a.receive) } @@ -252,3 +281,8 @@ func (a *Adapter) obsNewHTTPClient(ctx context.Context, target string) (cloudeve // Use the transport to make a new CloudEvents client. return cloudevents.NewClient(t) } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/pkg/reconciler/pullsubscription/pullsubscription.go b/pkg/reconciler/pullsubscription/pullsubscription.go index 04e22430eb..53487efe8f 100644 --- a/pkg/reconciler/pullsubscription/pullsubscription.go +++ b/pkg/reconciler/pullsubscription/pullsubscription.go @@ -414,6 +414,8 @@ func (r *Reconciler) createOrUpdateReceiveAdapter(ctx context.Context, src *v1al SubscriptionID: src.Status.SubscriptionID, SinkURI: src.Status.SinkURI, TransformerURI: src.Status.TransformerURI, + LoggingConfig: resources.LoggingConfigToBase64(r.loggingConfig), + MetricsConfig: resources.MetricsOptionsToBase64(r.metricsConfig), }) if existing == nil { @@ -454,21 +456,31 @@ func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.PullSu } func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + logcfg, err := logging.NewConfigFromConfigMap(cfg) if err != nil { r.Logger.Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) return } r.loggingConfig = logcfg + r.Logger.Infow("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) // TODO: requeue all pullsubscriptions } func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + r.metricsConfig = &metrics.ExporterOptions{ Domain: metrics.Domain(), Component: component, ConfigMap: cfg.Data, } + r.Logger.Infow("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) // TODO: requeue all pullsubscriptions } diff --git a/pkg/reconciler/pullsubscription/resources/receive_adapter.go b/pkg/reconciler/pullsubscription/resources/receive_adapter.go index 35c9125bb6..cb94e79ac9 100644 --- a/pkg/reconciler/pullsubscription/resources/receive_adapter.go +++ b/pkg/reconciler/pullsubscription/resources/receive_adapter.go @@ -39,6 +39,8 @@ type ReceiveAdapterArgs struct { SubscriptionID string SinkURI string TransformerURI string + MetricsConfig string + LoggingConfig string } const ( @@ -92,9 +94,6 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Name: "receive-adapter", Image: args.Image, Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: credsFile, }, { @@ -118,6 +117,12 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { }, { Name: "K_CE_EXTENSIONS", Value: ceExtensions, + }, { + Name: "K_METRICS_CONFIG", + Value: args.MetricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: args.LoggingConfig, }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume, diff --git a/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go b/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go index b4a67c2c16..ed86d97d19 100644 --- a/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go +++ b/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go @@ -53,6 +53,8 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { }, SubscriptionID: "sub-id", SinkURI: "sink-uri", + LoggingConfig: "LoggingConfig-ABC123", + MetricsConfig: "MetricsConfig-ABC123", }) one := int32(1) @@ -94,9 +96,6 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { Name: "receive-adapter", Image: "test-image", Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/var/secrets/google/eventing-secret-key", }, { @@ -118,6 +117,12 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { Value: "binary", }, { Name: "K_CE_EXTENSIONS", + }, { + Name: "K_METRICS_CONFIG", + Value: "MetricsConfig-ABC123", + }, { + Name: "K_LOGGING_CONFIG", + Value: "LoggingConfig-ABC123", }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume, @@ -175,6 +180,8 @@ func TestMakeFullReceiveAdapter(t *testing.T) { SubscriptionID: "sub-id", SinkURI: "sink-uri", TransformerURI: "transformer-uri", + LoggingConfig: "LoggingConfig-ABC123", + MetricsConfig: "MetricsConfig-ABC123", }) one := int32(1) @@ -216,9 +223,6 @@ func TestMakeFullReceiveAdapter(t *testing.T) { Name: "receive-adapter", Image: "test-image", Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/var/secrets/google/eventing-secret-key", }, { @@ -242,6 +246,12 @@ func TestMakeFullReceiveAdapter(t *testing.T) { }, { Name: "K_CE_EXTENSIONS", Value: "eyJmb28iOiJiYXIifQ==", + }, { + Name: "K_METRICS_CONFIG", + Value: "MetricsConfig-ABC123", + }, { + Name: "K_LOGGING_CONFIG", + Value: "LoggingConfig-ABC123", }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume, From a369a2a3cd6aa0aea28b30a325c7e39edf39ec71 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Tue, 27 Aug 2019 15:32:51 -0700 Subject: [PATCH 3/8] adding metrics testing. --- Gopkg.lock | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Gopkg.lock b/Gopkg.lock index ee39d8d8b1..b8c8e80f0e 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1188,6 +1188,7 @@ "logging/testing", "metrics", "metrics/metricskey", + "metrics/testing", "profiling", "ptr", "reconciler/testing", @@ -1369,6 +1370,7 @@ "knative.dev/pkg/logging", "knative.dev/pkg/logging/testing", "knative.dev/pkg/metrics", + "knative.dev/pkg/metrics/testing", "knative.dev/pkg/ptr", "knative.dev/pkg/reconciler/testing", "knative.dev/pkg/signals", From cd881b48eba848d6a02ee3f323e18752359d21a2 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 28 Aug 2019 08:56:41 -0700 Subject: [PATCH 4/8] claify some methods. --- pkg/pubsub/adapter/adapter.go | 12 ++++++++---- pkg/reconciler/pullsubscription/resources/config.go | 13 ++++++++++--- .../pullsubscription/resources/config_test.go | 2 +- 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index d563274e04..619ad12e7d 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -32,7 +32,7 @@ import ( "github.com/google/knative-gcp/pkg/kncloudevents" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" - decoderresources "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" + decoratorresources "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" psresources "github.com/google/knative-gcp/pkg/reconciler/pullsubscription/resources" ) @@ -73,13 +73,17 @@ type Adapter struct { // One of [binary, structured, push]. Default: binary SendMode converters.ModeType `envconfig:"SEND_MODE" default:"binary" required:"true"` - // TODO + // MetricsConfigBase64 is a base64 encoded json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, the config is + // stored in a config map inside the controllers namespace and copied here. MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` // metricsConfig is the converted MetricsConfigBase64 value. metricsConfig *metrics.ExporterOptions - // TODO + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` // loggingConfig is the converted LoggingConfigBase64 value. @@ -105,7 +109,7 @@ func (a *Adapter) Start(ctx context.Context) error { // Convert base64 encoded json map to extensions map. // This implementation comes from the Decorator object. - a.extensions = decoderresources.MakeDecoratorExtensionsMap(a.ExtensionsBase64) + a.extensions = decoratorresources.MakeDecoratorExtensionsMap(a.ExtensionsBase64) // Convert base64 encoded json logging.Config to logging.Config. a.loggingConfig = psresources.Base64ToLoggingConfig(a.LoggingConfigBase64) diff --git a/pkg/reconciler/pullsubscription/resources/config.go b/pkg/reconciler/pullsubscription/resources/config.go index 750d0e629e..d7e618b142 100644 --- a/pkg/reconciler/pullsubscription/resources/config.go +++ b/pkg/reconciler/pullsubscription/resources/config.go @@ -26,9 +26,12 @@ import ( var zapLoggerConfig = "zap-logger-config" +// Base64ToMetricsOptions converts a json+base64 string of a +// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. func Base64ToMetricsOptions(base64 string) *metrics.ExporterOptions { + var opts metrics.ExporterOptions if base64 == "" { - return nil + return &opts } quoted64 := strconv.Quote(string(base64)) @@ -37,13 +40,14 @@ func Base64ToMetricsOptions(base64 string) *metrics.ExporterOptions { // Do not care about the unmarshal error. _ = json.Unmarshal([]byte(quoted64), &bytes) - var opts metrics.ExporterOptions // Do not care about the unmarshal error. _ = json.Unmarshal(bytes, &opts) return &opts } +// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 +// string. func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { if opts == nil { return "" @@ -67,9 +71,11 @@ func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { return base64 } +// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. +// Returns a non-nil logging.Config always. func Base64ToLoggingConfig(base64 string) *logging.Config { if base64 == "" { - return nil + return &logging.Config{} } quoted64 := strconv.Quote(string(base64)) @@ -90,6 +96,7 @@ func Base64ToLoggingConfig(base64 string) *logging.Config { return cfg } +// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. func LoggingConfigToBase64(cfg *logging.Config) string { if cfg == nil || cfg.LoggingConfig == "" { return "" diff --git a/pkg/reconciler/pullsubscription/resources/config_test.go b/pkg/reconciler/pullsubscription/resources/config_test.go index 2e7fed62b8..2c1fa82d8c 100644 --- a/pkg/reconciler/pullsubscription/resources/config_test.go +++ b/pkg/reconciler/pullsubscription/resources/config_test.go @@ -60,7 +60,7 @@ func TestMetricsOptions(t *testing.T) { } } // Test to options. - { + if tc.opts != nil { want := tc.opts got := Base64ToMetricsOptions(base64) From fd96c72f4eb090c134a216dd94d7d333c7edf7bb Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 28 Aug 2019 10:08:36 -0700 Subject: [PATCH 5/8] un-hardcode the config maps. --- cmd/pubsub/receive_adapter/main.go | 41 ++++++++++--------- pkg/pubsub/adapter/adapter.go | 35 +--------------- .../resources/receive_adapter.go | 2 +- 3 files changed, 24 insertions(+), 54 deletions(-) diff --git a/cmd/pubsub/receive_adapter/main.go b/cmd/pubsub/receive_adapter/main.go index 00bf0fc54d..94f34c8a32 100644 --- a/cmd/pubsub/receive_adapter/main.go +++ b/cmd/pubsub/receive_adapter/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "fmt" "log" "time" @@ -28,6 +29,7 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml" transporthttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/google/knative-gcp/pkg/pubsub/adapter" + "github.com/google/knative-gcp/pkg/reconciler/pullsubscription/resources" "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" @@ -43,18 +45,22 @@ const ( func main() { flag.Parse() - sl, _ := logging.NewLogger("", "INFO") // TODO: use logging config map. - logger := sl.Desugar() - defer logger.Sync() - - ctx := logging.WithLogger(signals.NewContext(), logger.Sugar()) - startable := adapter.Adapter{} if err := envconfig.Process("", &startable); err != nil { - logger.Fatal("Failed to process env var", zap.Error(err)) + panic(fmt.Sprintf("Failed to process env var: %s", err)) } - mainMetrics(sl) + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig := resources.Base64ToLoggingConfig(startable.LoggingConfigBase64) + + // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig := resources.Base64ToMetricsOptions(startable.MetricsConfigBase64) + + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) + ctx := logging.WithLogger(signals.NewContext(), logger) + + mainMetrics(logger, metricsConfig) if startable.Project == "" { project, err := metadata.ProjectID() @@ -72,17 +78,8 @@ func main() { } } -func mainMetrics(logger *zap.SugaredLogger) { - cfg := map[string]string{ - "metrics.backend-destination": "prometheus", // TODO: hard code for now while we test. - } - - if err := metrics.UpdateExporter( - metrics.ExporterOptions{ - Domain: metrics.Domain(), - Component: component, - ConfigMap: cfg, - }, logger); err != nil { +func mainMetrics(logger *zap.SugaredLogger, opts *metrics.ExporterOptions) { + if err := metrics.UpdateExporter(*opts, logger); err != nil { log.Fatalf("Failed to create the metrics exporter: %v", err) } @@ -93,9 +90,15 @@ func mainMetrics(logger *zap.SugaredLogger) { json.LatencyView, xml.LatencyView, datacodec.LatencyView, + adapter.LatencyView, ); err != nil { log.Fatalf("Failed to register views: %v", err) } view.SetReportingPeriod(2 * time.Second) } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index 619ad12e7d..65fa11a7f2 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -19,11 +19,8 @@ package adapter import ( "context" "fmt" - "knative.dev/pkg/metrics" - - "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" @@ -33,11 +30,6 @@ import ( "github.com/google/knative-gcp/pkg/kncloudevents" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" decoratorresources "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" - psresources "github.com/google/knative-gcp/pkg/reconciler/pullsubscription/resources" -) - -var ( - component = "pullsubscriptions" ) // Adapter implements the Pub/Sub adapter to deliver Pub/Sub messages from a @@ -78,17 +70,11 @@ type Adapter struct { // stored in a config map inside the controllers namespace and copied here. MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` - // metricsConfig is the converted MetricsConfigBase64 value. - metricsConfig *metrics.ExporterOptions - // LoggingConfigBase64 is a base64 encoded json string of logging.Config. // This is used to configure the logging config, the config is stored in // a config map inside the controllers namespace and copied here. LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` - // loggingConfig is the converted LoggingConfigBase64 value. - loggingConfig *logging.Config - // inbound is the cloudevents client to use to receive events. inbound cloudevents.Client @@ -111,12 +97,6 @@ func (a *Adapter) Start(ctx context.Context) error { // This implementation comes from the Decorator object. a.extensions = decoratorresources.MakeDecoratorExtensionsMap(a.ExtensionsBase64) - // Convert base64 encoded json logging.Config to logging.Config. - a.loggingConfig = psresources.Base64ToLoggingConfig(a.LoggingConfigBase64) - - // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. - a.metricsConfig = psresources.Base64ToMetricsOptions(a.MetricsConfigBase64) - // Receive Events on Pub/Sub. if a.inbound == nil { if a.inbound, err = a.newPubSubClient(ctx); err != nil { @@ -140,14 +120,6 @@ func (a *Adapter) Start(ctx context.Context) error { } } - logger, _ := logging.NewLoggerFromConfig(a.loggingConfig, component) - defer flush(logger) - ctx = logging.WithLogger(ctx, logger) - - if err := metrics.UpdateExporter(*a.metricsConfig, logger); err != nil { - return fmt.Errorf("failed to update metrics exporter: %s", err.Error()) - } - return a.inbound.StartReceiver(ctx, a.receive) } @@ -285,8 +257,3 @@ func (a *Adapter) obsNewHTTPClient(ctx context.Context, target string) (cloudeve // Use the transport to make a new CloudEvents client. return cloudevents.NewClient(t) } - -func flush(logger *zap.SugaredLogger) { - _ = logger.Sync() - metrics.FlushExporter() -} diff --git a/pkg/reconciler/pullsubscription/resources/receive_adapter.go b/pkg/reconciler/pullsubscription/resources/receive_adapter.go index cb94e79ac9..a3151dd4c4 100644 --- a/pkg/reconciler/pullsubscription/resources/receive_adapter.go +++ b/pkg/reconciler/pullsubscription/resources/receive_adapter.go @@ -76,7 +76,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, Name: GenerateSubscriptionName(args.Source), - Labels: args.Labels, // TODO: not sure we should use labels like this. + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(args.Source)}, Annotations: map[string]string{}, }, From d48fbafea3bd4dbe220a0863e550aa9e8cc90bd1 Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 28 Aug 2019 10:54:58 -0700 Subject: [PATCH 6/8] return an error from base64 methods. --- cmd/pubsub/receive_adapter/main.go | 10 ++++- .../pullsubscription/resources/config.go | 33 ++++++++++----- .../pullsubscription/resources/config_test.go | 42 ++++++++++++++----- 3 files changed, 61 insertions(+), 24 deletions(-) diff --git a/cmd/pubsub/receive_adapter/main.go b/cmd/pubsub/receive_adapter/main.go index 94f34c8a32..986ef2b64b 100644 --- a/cmd/pubsub/receive_adapter/main.go +++ b/cmd/pubsub/receive_adapter/main.go @@ -51,10 +51,16 @@ func main() { } // Convert base64 encoded json logging.Config to logging.Config. - loggingConfig := resources.Base64ToLoggingConfig(startable.LoggingConfigBase64) + loggingConfig, err := resources.Base64ToLoggingConfig(startable.LoggingConfigBase64) + if err != nil { + panic(err) + } // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. - metricsConfig := resources.Base64ToMetricsOptions(startable.MetricsConfigBase64) + metricsConfig, err := resources.Base64ToMetricsOptions(startable.MetricsConfigBase64) + if err != nil { + panic(err) + } logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) defer flush(logger) diff --git a/pkg/reconciler/pullsubscription/resources/config.go b/pkg/reconciler/pullsubscription/resources/config.go index d7e618b142..be1c097d3c 100644 --- a/pkg/reconciler/pullsubscription/resources/config.go +++ b/pkg/reconciler/pullsubscription/resources/config.go @@ -19,6 +19,7 @@ package resources import ( "encoding/json" "fmt" + "github.com/pkg/errors" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "strconv" @@ -28,22 +29,26 @@ var zapLoggerConfig = "zap-logger-config" // Base64ToMetricsOptions converts a json+base64 string of a // metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. -func Base64ToMetricsOptions(base64 string) *metrics.ExporterOptions { +func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { var opts metrics.ExporterOptions if base64 == "" { - return &opts + return nil, errors.New("base64 metrics string is empty") } quoted64 := strconv.Quote(string(base64)) var bytes []byte // Do not care about the unmarshal error. - _ = json.Unmarshal([]byte(quoted64), &bytes) + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } // Do not care about the unmarshal error. - _ = json.Unmarshal(bytes, &opts) + if err := json.Unmarshal(bytes, &opts); err != nil { + return nil, err + } - return &opts + return &opts, nil } // MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 @@ -73,27 +78,33 @@ func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { // Base64ToLoggingConfig converts a json+base64 string of a logging.Config. // Returns a non-nil logging.Config always. -func Base64ToLoggingConfig(base64 string) *logging.Config { +func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { if base64 == "" { - return &logging.Config{} + return nil, errors.New("base64 logging string is empty") } quoted64 := strconv.Quote(string(base64)) var bytes []byte // Do not care about the unmarshal error. - _ = json.Unmarshal([]byte(quoted64), &bytes) + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } var configMap map[string]string // Do not care about the unmarshal error. - _ = json.Unmarshal(bytes, &configMap) + if err := json.Unmarshal(bytes, &configMap); err != nil { + return nil, err + } cfg, err := logging.NewConfigFromMap(configMap) if err != nil { // Get the default config from logging package. - cfg, _ = logging.NewConfigFromMap(map[string]string{}) + if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } } - return cfg + return cfg, nil } // LoggingConfigToBase64 converts a logging.Config to a json+base64 string. diff --git a/pkg/reconciler/pullsubscription/resources/config_test.go b/pkg/reconciler/pullsubscription/resources/config_test.go index 2c1fa82d8c..642ab0b07c 100644 --- a/pkg/reconciler/pullsubscription/resources/config_test.go +++ b/pkg/reconciler/pullsubscription/resources/config_test.go @@ -27,12 +27,14 @@ import ( func TestMetricsOptions(t *testing.T) { testCases := map[string]struct { - opts *metrics.ExporterOptions - want string + opts *metrics.ExporterOptions + want string + wantErr string }{ "nil": { - opts: nil, - want: "", + opts: nil, + want: "", + wantErr: "base64 metrics string is empty", }, "happy": { opts: &metrics.ExporterOptions{ @@ -60,9 +62,17 @@ func TestMetricsOptions(t *testing.T) { } } // Test to options. - if tc.opts != nil { + { want := tc.opts - got := Base64ToMetricsOptions(base64) + got, gotErr := Base64ToMetricsOptions(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } if diff := cmp.Diff(want, got); diff != "" { t.Errorf("unexpected (-want, +got) = %v", diff) @@ -75,12 +85,14 @@ func TestMetricsOptions(t *testing.T) { func TestLoggingConfig(t *testing.T) { testCases := map[string]struct { - cfg *logging.Config - want string + cfg *logging.Config + want string + wantErr string }{ "nil": { - cfg: nil, - want: "", + cfg: nil, + want: "", + wantErr: "base64 logging string is empty", }, "happy": { cfg: &logging.Config{ @@ -105,7 +117,15 @@ func TestLoggingConfig(t *testing.T) { // Test to config. if tc.cfg != nil { want := tc.cfg - got := Base64ToLoggingConfig(base64) + got, gotErr := Base64ToLoggingConfig(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } if diff := cmp.Diff(want, got); diff != "" { t.Errorf("unexpected (-want, +got) = %v", diff) From d0cc550d8786bcf3c68dd0b42d0e76c527aa440d Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Wed, 28 Aug 2019 10:58:04 -0700 Subject: [PATCH 7/8] update deps. --- Gopkg.lock | 1 + 1 file changed, 1 insertion(+) diff --git a/Gopkg.lock b/Gopkg.lock index b8c8e80f0e..712de508ca 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1292,6 +1292,7 @@ "github.com/google/uuid", "github.com/jcrossley3/manifestival/pkg/manifestival", "github.com/kelseyhightower/envconfig", + "github.com/pkg/errors", "go.opencensus.io/metric/metricexport", "go.opencensus.io/stats", "go.opencensus.io/stats/view", From fe9337fd578e5ffa12666cb26bec26d802f55dbf Mon Sep 17 00:00:00 2001 From: Scott Nichols Date: Thu, 29 Aug 2019 15:13:01 -0700 Subject: [PATCH 8/8] remove the panic, nothing to panic here. --- cmd/pubsub/receive_adapter/main.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/cmd/pubsub/receive_adapter/main.go b/cmd/pubsub/receive_adapter/main.go index 986ef2b64b..3239e6c7ea 100644 --- a/cmd/pubsub/receive_adapter/main.go +++ b/cmd/pubsub/receive_adapter/main.go @@ -53,19 +53,24 @@ func main() { // Convert base64 encoded json logging.Config to logging.Config. loggingConfig, err := resources.Base64ToLoggingConfig(startable.LoggingConfigBase64) if err != nil { - panic(err) + fmt.Printf("[ERROR] filed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } } + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) + ctx := logging.WithLogger(signals.NewContext(), logger) + // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. metricsConfig, err := resources.Base64ToMetricsOptions(startable.MetricsConfigBase64) if err != nil { - panic(err) + logger.Errorf("failed to process metrics options: %s", err.Error()) } - logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) - defer flush(logger) - ctx := logging.WithLogger(signals.NewContext(), logger) - mainMetrics(logger, metricsConfig) if startable.Project == "" { @@ -85,6 +90,11 @@ func main() { } func mainMetrics(logger *zap.SugaredLogger, opts *metrics.ExporterOptions) { + if opts == nil { + logger.Info("metrics disabled") + return + } + if err := metrics.UpdateExporter(*opts, logger); err != nil { log.Fatalf("Failed to create the metrics exporter: %v", err) }