diff --git a/Gopkg.lock b/Gopkg.lock index 2efc26c539..6aff172f06 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1188,6 +1188,7 @@ "logging/testing", "metrics", "metrics/metricskey", + "metrics/testing", "profiling", "ptr", "reconciler/testing", @@ -1291,6 +1292,7 @@ "github.com/google/uuid", "github.com/jcrossley3/manifestival/pkg/manifestival", "github.com/kelseyhightower/envconfig", + "github.com/pkg/errors", "go.opencensus.io/metric/metricexport", "go.opencensus.io/stats", "go.opencensus.io/stats/view", @@ -1370,6 +1372,7 @@ "knative.dev/pkg/logging", "knative.dev/pkg/logging/testing", "knative.dev/pkg/metrics", + "knative.dev/pkg/metrics/testing", "knative.dev/pkg/profiling", "knative.dev/pkg/ptr", "knative.dev/pkg/reconciler/testing", diff --git a/cmd/pubsub/receive_adapter/main.go b/cmd/pubsub/receive_adapter/main.go index 00bf0fc54d..3239e6c7ea 100644 --- a/cmd/pubsub/receive_adapter/main.go +++ b/cmd/pubsub/receive_adapter/main.go @@ -18,6 +18,7 @@ package main import ( "flag" + "fmt" "log" "time" @@ -28,6 +29,7 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents/datacodec/xml" transporthttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" "github.com/google/knative-gcp/pkg/pubsub/adapter" + "github.com/google/knative-gcp/pkg/reconciler/pullsubscription/resources" "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" @@ -43,18 +45,33 @@ const ( func main() { flag.Parse() - sl, _ := logging.NewLogger("", "INFO") // TODO: use logging config map. - logger := sl.Desugar() - defer logger.Sync() - - ctx := logging.WithLogger(signals.NewContext(), logger.Sugar()) - startable := adapter.Adapter{} if err := envconfig.Process("", &startable); err != nil { - logger.Fatal("Failed to process env var", zap.Error(err)) + panic(fmt.Sprintf("Failed to process env var: %s", err)) + } + + // Convert base64 encoded json logging.Config to logging.Config. + loggingConfig, err := resources.Base64ToLoggingConfig(startable.LoggingConfigBase64) + if err != nil { + fmt.Printf("[ERROR] filed to process logging config: %s", err.Error()) + // Use default logging config. + if loggingConfig, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + // If this fails, there is no recovering. + panic(err) + } } - mainMetrics(sl) + logger, _ := logging.NewLoggerFromConfig(loggingConfig, component) + defer flush(logger) + ctx := logging.WithLogger(signals.NewContext(), logger) + + // Convert base64 encoded json metrics.ExporterOptions to metrics.ExporterOptions. + metricsConfig, err := resources.Base64ToMetricsOptions(startable.MetricsConfigBase64) + if err != nil { + logger.Errorf("failed to process metrics options: %s", err.Error()) + } + + mainMetrics(logger, metricsConfig) if startable.Project == "" { project, err := metadata.ProjectID() @@ -72,17 +89,13 @@ func main() { } } -func mainMetrics(logger *zap.SugaredLogger) { - cfg := map[string]string{ - "metrics.backend-destination": "prometheus", // TODO: hard code for now while we test. +func mainMetrics(logger *zap.SugaredLogger, opts *metrics.ExporterOptions) { + if opts == nil { + logger.Info("metrics disabled") + return } - if err := metrics.UpdateExporter( - metrics.ExporterOptions{ - Domain: metrics.Domain(), - Component: component, - ConfigMap: cfg, - }, logger); err != nil { + if err := metrics.UpdateExporter(*opts, logger); err != nil { log.Fatalf("Failed to create the metrics exporter: %v", err) } @@ -93,9 +106,15 @@ func mainMetrics(logger *zap.SugaredLogger) { json.LatencyView, xml.LatencyView, datacodec.LatencyView, + adapter.LatencyView, ); err != nil { log.Fatalf("Failed to register views: %v", err) } view.SetReportingPeriod(2 * time.Second) } + +func flush(logger *zap.SugaredLogger) { + _ = logger.Sync() + metrics.FlushExporter() +} diff --git a/pkg/pubsub/adapter/adapter.go b/pkg/pubsub/adapter/adapter.go index 29a5e8c483..65fa11a7f2 100644 --- a/pkg/pubsub/adapter/adapter.go +++ b/pkg/pubsub/adapter/adapter.go @@ -19,10 +19,8 @@ package adapter import ( "context" "fmt" - - "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" - cloudevents "github.com/cloudevents/sdk-go" + "github.com/cloudevents/sdk-go/pkg/cloudevents/observability" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport" "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" cepubsub "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/pubsub" @@ -31,7 +29,7 @@ import ( "github.com/google/knative-gcp/pkg/kncloudevents" "github.com/google/knative-gcp/pkg/pubsub/adapter/converters" - "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" + decoratorresources "github.com/google/knative-gcp/pkg/reconciler/decorator/resources" ) // Adapter implements the Pub/Sub adapter to deliver Pub/Sub messages from a @@ -55,10 +53,10 @@ type Adapter struct { // subscription to use. Subscription string `envconfig:"PUBSUB_SUBSCRIPTION_ID" required:"true"` - // ExtensionsBased64 is a based64 encoded json string of a map of + // ExtensionsBase64 is a based64 encoded json string of a map of // CloudEvents extensions (key-value pairs) override onto the outbound // event. - ExtensionsBased64 string `envconfig:"K_CE_EXTENSIONS" required:"true"` + ExtensionsBase64 string `envconfig:"K_CE_EXTENSIONS" required:"true"` // extensions is the converted ExtensionsBased64 value. extensions map[string]string @@ -67,8 +65,15 @@ type Adapter struct { // One of [binary, structured, push]. Default: binary SendMode converters.ModeType `envconfig:"SEND_MODE" default:"binary" required:"true"` - // MetricsDomain holds the metrics domain to use for surfacing metrics. - MetricsDomain string `envconfig:"METRICS_DOMAIN" required:"true"` + // MetricsConfigBase64 is a base64 encoded json string of metrics.ExporterOptions. + // This is used to configure the metrics exporter options, the config is + // stored in a config map inside the controllers namespace and copied here. + MetricsConfigBase64 string `envconfig:"K_METRICS_CONFIG" required:"true"` + + // LoggingConfigBase64 is a base64 encoded json string of logging.Config. + // This is used to configure the logging config, the config is stored in + // a config map inside the controllers namespace and copied here. + LoggingConfigBase64 string `envconfig:"K_LOGGING_CONFIG" required:"true"` // inbound is the cloudevents client to use to receive events. inbound cloudevents.Client @@ -90,7 +95,7 @@ func (a *Adapter) Start(ctx context.Context) error { // Convert base64 encoded json map to extensions map. // This implementation comes from the Decorator object. - a.extensions = resources.MakeDecoratorExtensionsMap(a.ExtensionsBased64) + a.extensions = decoratorresources.MakeDecoratorExtensionsMap(a.ExtensionsBase64) // Receive Events on Pub/Sub. if a.inbound == nil { diff --git a/pkg/reconciler/pullsubscription/controller.go b/pkg/reconciler/pullsubscription/controller.go index 35f673c22b..d1e2943bf9 100644 --- a/pkg/reconciler/pullsubscription/controller.go +++ b/pkg/reconciler/pullsubscription/controller.go @@ -18,6 +18,7 @@ package pullsubscription import ( "context" + "knative.dev/pkg/metrics" "github.com/kelseyhightower/envconfig" "go.uber.org/zap" @@ -97,5 +98,8 @@ func NewController( c.tracker = tracker.New(impl.EnqueueKey, controller.GetTrackerLease(ctx)) + cmw.Watch(logging.ConfigMapName(), c.UpdateFromLoggingConfigMap) + cmw.Watch(metrics.ConfigMapName(), c.UpdateFromMetricsConfigMap) + return impl } diff --git a/pkg/reconciler/pullsubscription/controller_test.go b/pkg/reconciler/pullsubscription/controller_test.go index 7754329345..d2ede8df7b 100644 --- a/pkg/reconciler/pullsubscription/controller_test.go +++ b/pkg/reconciler/pullsubscription/controller_test.go @@ -17,18 +17,26 @@ limitations under the License. package pullsubscription import ( + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" "os" "testing" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/configmap" logtesting "knative.dev/pkg/logging/testing" . "knative.dev/pkg/reconciler/testing" + "knative.dev/pkg/system" - // Fake injection informers + _ "knative.dev/pkg/metrics/testing" - _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" + // Fake injection informers _ "knative.dev/pkg/injection/informers/kubeinformers/appsv1/deployment/fake" _ "knative.dev/pkg/injection/informers/kubeinformers/batchv1/job/fake" + + _ "github.com/google/knative-gcp/pkg/client/injection/informers/pubsub/v1alpha1/pullsubscription/fake" ) func TestNew(t *testing.T) { @@ -38,7 +46,22 @@ func TestNew(t *testing.T) { _ = os.Setenv("PUBSUB_RA_IMAGE", "PUBSUB_RA_IMAGE") _ = os.Setenv("PUBSUB_SUB_IMAGE", "PUBSUB_SUB_IMAGE") - c := NewController(ctx, configmap.NewFixedWatcher()) + c := NewController(ctx, configmap.NewStaticWatcher( + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: logging.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, + &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: metrics.ConfigMapName(), + Namespace: system.Namespace(), + }, + Data: map[string]string{}, + }, + )) if c == nil { t.Fatal("Expected NewController to return a non-nil value") diff --git a/pkg/reconciler/pullsubscription/pullsubscription.go b/pkg/reconciler/pullsubscription/pullsubscription.go index 0fdbce2084..53487efe8f 100644 --- a/pkg/reconciler/pullsubscription/pullsubscription.go +++ b/pkg/reconciler/pullsubscription/pullsubscription.go @@ -19,6 +19,7 @@ package pullsubscription import ( "context" "encoding/json" + "knative.dev/pkg/metrics" "time" appsv1 "k8s.io/api/apps/v1" @@ -54,6 +55,8 @@ const ( // ReconcilerName is the name of the reconciler ReconcilerName = "PullSubscriptions" + component = "pullsubscriptions" + finalizerName = controllerAgentName ) @@ -70,6 +73,9 @@ type Reconciler struct { receiveAdapterImage string + loggingConfig *logging.Config + metricsConfig *metrics.ExporterOptions + // eventTypeReconciler eventtype.Reconciler // TODO: event types. } @@ -408,6 +414,8 @@ func (r *Reconciler) createOrUpdateReceiveAdapter(ctx context.Context, src *v1al SubscriptionID: src.Status.SubscriptionID, SinkURI: src.Status.SinkURI, TransformerURI: src.Status.TransformerURI, + LoggingConfig: resources.LoggingConfigToBase64(r.loggingConfig), + MetricsConfig: resources.MetricsOptionsToBase64(r.metricsConfig), }) if existing == nil { @@ -447,6 +455,35 @@ func (r *Reconciler) getReceiveAdapter(ctx context.Context, src *v1alpha1.PullSu return nil, apierrors.NewNotFound(schema.GroupResource{}, "") } +func (r *Reconciler) UpdateFromLoggingConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + logcfg, err := logging.NewConfigFromConfigMap(cfg) + if err != nil { + r.Logger.Warn("failed to create logging config from configmap", zap.String("cfg.Name", cfg.Name)) + return + } + r.loggingConfig = logcfg + r.Logger.Infow("Update from logging ConfigMap", zap.Any("ConfigMap", cfg)) + // TODO: requeue all pullsubscriptions +} + +func (r *Reconciler) UpdateFromMetricsConfigMap(cfg *corev1.ConfigMap) { + if cfg != nil { + delete(cfg.Data, "_example") + } + + r.metricsConfig = &metrics.ExporterOptions{ + Domain: metrics.Domain(), + Component: component, + ConfigMap: cfg.Data, + } + r.Logger.Infow("Update from metrics ConfigMap", zap.Any("ConfigMap", cfg)) + // TODO: requeue all pullsubscriptions +} + // TODO: Registry //func (r *Reconciler) reconcileEventTypes(ctx context.Context, src *v1alpha1.PullSubscription) error { // args := r.newEventTypeReconcilerArgs(src) diff --git a/pkg/reconciler/pullsubscription/resources/config.go b/pkg/reconciler/pullsubscription/resources/config.go new file mode 100644 index 0000000000..be1c097d3c --- /dev/null +++ b/pkg/reconciler/pullsubscription/resources/config.go @@ -0,0 +1,134 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "encoding/json" + "fmt" + "github.com/pkg/errors" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + "strconv" +) + +var zapLoggerConfig = "zap-logger-config" + +// Base64ToMetricsOptions converts a json+base64 string of a +// metrics.ExporterOptions. Returns a non-nil metrics.ExporterOptions always. +func Base64ToMetricsOptions(base64 string) (*metrics.ExporterOptions, error) { + var opts metrics.ExporterOptions + if base64 == "" { + return nil, errors.New("base64 metrics string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &opts); err != nil { + return nil, err + } + + return &opts, nil +} + +// MetricsOptionsToBase64 converts a metrics.ExporterOptions to a json+base64 +// string. +func MetricsOptionsToBase64(opts *metrics.ExporterOptions) string { + if opts == nil { + return "" + } + + jsonOpts, err := json.Marshal(opts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Opts, err := json.Marshal(jsonOpts) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Opts)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} + +// Base64ToLoggingConfig converts a json+base64 string of a logging.Config. +// Returns a non-nil logging.Config always. +func Base64ToLoggingConfig(base64 string) (*logging.Config, error) { + if base64 == "" { + return nil, errors.New("base64 logging string is empty") + } + + quoted64 := strconv.Quote(string(base64)) + + var bytes []byte + // Do not care about the unmarshal error. + if err := json.Unmarshal([]byte(quoted64), &bytes); err != nil { + return nil, err + } + + var configMap map[string]string + // Do not care about the unmarshal error. + if err := json.Unmarshal(bytes, &configMap); err != nil { + return nil, err + } + + cfg, err := logging.NewConfigFromMap(configMap) + if err != nil { + // Get the default config from logging package. + if cfg, err = logging.NewConfigFromMap(map[string]string{}); err != nil { + return nil, err + } + } + return cfg, nil +} + +// LoggingConfigToBase64 converts a logging.Config to a json+base64 string. +func LoggingConfigToBase64(cfg *logging.Config) string { + if cfg == nil || cfg.LoggingConfig == "" { + return "" + } + + jsonCfg, err := json.Marshal(map[string]string{ + zapLoggerConfig: cfg.LoggingConfig, + }) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // if we json.Marshal a []byte, we will get back a base64 encoded quoted string. + base64Cfg, err := json.Marshal(jsonCfg) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + + base64, err := strconv.Unquote(string(base64Cfg)) + if err != nil { + return fmt.Sprintf(`{"error":"%s}`, err.Error()) + } + // Turn the base64 encoded []byte back into a string. + return base64 +} diff --git a/pkg/reconciler/pullsubscription/resources/config_test.go b/pkg/reconciler/pullsubscription/resources/config_test.go new file mode 100644 index 0000000000..642ab0b07c --- /dev/null +++ b/pkg/reconciler/pullsubscription/resources/config_test.go @@ -0,0 +1,137 @@ +/* +Copyright 2019 Google LLC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resources + +import ( + "go.uber.org/zap/zapcore" + "knative.dev/pkg/logging" + "knative.dev/pkg/metrics" + "testing" + + "github.com/google/go-cmp/cmp" +) + +func TestMetricsOptions(t *testing.T) { + testCases := map[string]struct { + opts *metrics.ExporterOptions + want string + wantErr string + }{ + "nil": { + opts: nil, + want: "", + wantErr: "base64 metrics string is empty", + }, + "happy": { + opts: &metrics.ExporterOptions{ + Domain: "domain", + Component: "component", + PrometheusPort: 9090, + ConfigMap: map[string]string{ + "foo": "bar", + "boosh": "kakow", + }, + }, + want: "eyJEb21haW4iOiJkb21haW4iLCJDb21wb25lbnQiOiJjb21wb25lbnQiLCJQcm9tZXRoZXVzUG9ydCI6OTA5MCwiQ29uZmlnTWFwIjp7ImJvb3NoIjoia2Frb3ciLCJmb28iOiJiYXIifX0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := MetricsOptionsToBase64(tc.opts) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to options. + { + want := tc.opts + got, gotErr := Base64ToMetricsOptions(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} + +func TestLoggingConfig(t *testing.T) { + testCases := map[string]struct { + cfg *logging.Config + want string + wantErr string + }{ + "nil": { + cfg: nil, + want: "", + wantErr: "base64 logging string is empty", + }, + "happy": { + cfg: &logging.Config{ + LoggingConfig: "{}", + LoggingLevel: map[string]zapcore.Level{}, + }, + want: "eyJ6YXAtbG9nZ2VyLWNvbmZpZyI6Int9In0=", + }, + } + for n, tc := range testCases { + t.Run(n, func(t *testing.T) { + base64 := LoggingConfigToBase64(tc.cfg) + // Test to base64. + { + want := tc.want + got := base64 + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + // Test to config. + if tc.cfg != nil { + want := tc.cfg + got, gotErr := Base64ToLoggingConfig(base64) + + if gotErr != nil { + if diff := cmp.Diff(tc.wantErr, gotErr.Error()); diff != "" { + t.Errorf("unexpected err (-want, +got) = %v", diff) + } + } else if tc.wantErr != "" { + t.Errorf("expected err %v", tc.wantErr) + } + + if diff := cmp.Diff(want, got); diff != "" { + t.Errorf("unexpected (-want, +got) = %v", diff) + t.Log(got) + } + } + }) + } +} diff --git a/pkg/reconciler/pullsubscription/resources/receive_adapter.go b/pkg/reconciler/pullsubscription/resources/receive_adapter.go index 35c9125bb6..a3151dd4c4 100644 --- a/pkg/reconciler/pullsubscription/resources/receive_adapter.go +++ b/pkg/reconciler/pullsubscription/resources/receive_adapter.go @@ -39,6 +39,8 @@ type ReceiveAdapterArgs struct { SubscriptionID string SinkURI string TransformerURI string + MetricsConfig string + LoggingConfig string } const ( @@ -74,7 +76,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { ObjectMeta: metav1.ObjectMeta{ Namespace: args.Source.Namespace, Name: GenerateSubscriptionName(args.Source), - Labels: args.Labels, // TODO: not sure we should use labels like this. + Labels: args.Labels, OwnerReferences: []metav1.OwnerReference{*kmeta.NewControllerRef(args.Source)}, Annotations: map[string]string{}, }, @@ -92,9 +94,6 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Name: "receive-adapter", Image: args.Image, Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: credsFile, }, { @@ -118,6 +117,12 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { }, { Name: "K_CE_EXTENSIONS", Value: ceExtensions, + }, { + Name: "K_METRICS_CONFIG", + Value: args.MetricsConfig, + }, { + Name: "K_LOGGING_CONFIG", + Value: args.LoggingConfig, }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume, diff --git a/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go b/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go index b4a67c2c16..ed86d97d19 100644 --- a/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go +++ b/pkg/reconciler/pullsubscription/resources/receive_adapter_test.go @@ -53,6 +53,8 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { }, SubscriptionID: "sub-id", SinkURI: "sink-uri", + LoggingConfig: "LoggingConfig-ABC123", + MetricsConfig: "MetricsConfig-ABC123", }) one := int32(1) @@ -94,9 +96,6 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { Name: "receive-adapter", Image: "test-image", Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/var/secrets/google/eventing-secret-key", }, { @@ -118,6 +117,12 @@ func TestMakeMinimumReceiveAdapter(t *testing.T) { Value: "binary", }, { Name: "K_CE_EXTENSIONS", + }, { + Name: "K_METRICS_CONFIG", + Value: "MetricsConfig-ABC123", + }, { + Name: "K_LOGGING_CONFIG", + Value: "LoggingConfig-ABC123", }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume, @@ -175,6 +180,8 @@ func TestMakeFullReceiveAdapter(t *testing.T) { SubscriptionID: "sub-id", SinkURI: "sink-uri", TransformerURI: "transformer-uri", + LoggingConfig: "LoggingConfig-ABC123", + MetricsConfig: "MetricsConfig-ABC123", }) one := int32(1) @@ -216,9 +223,6 @@ func TestMakeFullReceiveAdapter(t *testing.T) { Name: "receive-adapter", Image: "test-image", Env: []corev1.EnvVar{{ - Name: "METRICS_DOMAIN", - Value: "pubsub.cloud.run/pullsubscriptions/adapter", - }, { Name: "GOOGLE_APPLICATION_CREDENTIALS", Value: "/var/secrets/google/eventing-secret-key", }, { @@ -242,6 +246,12 @@ func TestMakeFullReceiveAdapter(t *testing.T) { }, { Name: "K_CE_EXTENSIONS", Value: "eyJmb28iOiJiYXIifQ==", + }, { + Name: "K_METRICS_CONFIG", + Value: "MetricsConfig-ABC123", + }, { + Name: "K_LOGGING_CONFIG", + Value: "LoggingConfig-ABC123", }}, VolumeMounts: []corev1.VolumeMount{{ Name: credsVolume,