From 73b7e0196e9d6500059728f9f4425ac96d7d4b0b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 3 Mar 2026 17:42:34 +0100 Subject: [PATCH] Add namespace labels to source metrics --- pkg/adapter/apiserver/adapter.go | 10 ++++++---- pkg/adapter/apiserver/adapter_injection.go | 13 +++++++------ pkg/adapter/apiserver/delegate.go | 14 ++++++++++++-- pkg/adapter/mtping/runner.go | 9 +++++++++ pkg/observability/newcontext.go | 14 ++++++++++++++ 5 files changed, 48 insertions(+), 12 deletions(-) diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 89e80d86ca9..9c517f6bd18 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -52,10 +52,11 @@ type apiServerAdapter struct { config Config - discover discovery.DiscoveryInterface - k8s dynamic.Interface - source string // TODO: who dis? - name string // TODO: who dis? + discover discovery.DiscoveryInterface + k8s dynamic.Interface + source string // TODO: who dis? + name string // TODO: who dis? + namespace string } type resourceWatchMatch struct { @@ -199,6 +200,7 @@ func (a *apiServerAdapter) setupDelegate() cache.Store { logger: a.logger, ref: a.config.EventMode == v1.ReferenceMode, apiServerSourceName: a.name, + apiServerSourceNS: a.namespace, filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), } if a.config.ResourceOwner != nil { diff --git a/pkg/adapter/apiserver/adapter_injection.go b/pkg/adapter/apiserver/adapter_injection.go index fde80a2be88..a3e09b7740e 100644 --- a/pkg/adapter/apiserver/adapter_injection.go +++ b/pkg/adapter/apiserver/adapter_injection.go @@ -67,12 +67,13 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie } return &apiServerAdapter{ - discover: kubeclient.Get(ctx).Discovery(), - k8s: dynamicclient.Get(ctx), - ce: ceClient, - source: Get(ctx), - name: env.Name, - config: config, + discover: kubeclient.Get(ctx).Discovery(), + k8s: dynamicclient.Get(ctx), + ce: ceClient, + source: Get(ctx), + name: env.Name, + namespace: env.GetNamespace(), + config: config, logger: logger, } diff --git a/pkg/adapter/apiserver/delegate.go b/pkg/adapter/apiserver/delegate.go index 3bc362261e3..e4a4ee61e42 100644 --- a/pkg/adapter/apiserver/delegate.go +++ b/pkg/adapter/apiserver/delegate.go @@ -22,9 +22,11 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/google/uuid" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/adapter/apiserver/events" "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/observability" ) type resourceDelegate struct { @@ -32,9 +34,9 @@ type resourceDelegate struct { source string ref bool apiServerSourceName string + apiServerSourceNS string filter eventfilter.Filter - - logger *zap.SugaredLogger + logger *zap.SugaredLogger } var _ cache.Store = (*resourceDelegate)(nil) @@ -82,6 +84,14 @@ func (a *resourceDelegate) sendCloudEvent(ctx context.Context, event cloudevents subject := event.Context.GetSubject() a.logger.Debugf("sending cloudevent id: %s, source: %s, subject: %s", event.ID(), source, subject) + // Add labels to context so otelhttp picks them up for metrics + ctx = observability.WithLabeler(ctx) + ctx = observability.WithSourceLabels(ctx, types.NamespacedName{ + Name: a.apiServerSourceName, + Namespace: a.apiServerSourceNS, + }) + ctx = observability.WithMinimalEventLabels(ctx, &event) + if result := a.ce.Send(ctx, event); !cloudevents.IsACK(result) { a.logger.Errorw("failed to send cloudevent", zap.Error(result), zap.String("source", source), zap.String("subject", subject), zap.String("id", event.ID())) diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 7947396d0bc..a3a7a50abee 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -28,6 +28,7 @@ import ( "github.com/robfig/cron/v3" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -152,6 +153,14 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, client kncloudevents.Clie a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target) + // Add labels to context so otelhttp picks them up for metrics + ctx = observability.WithLabeler(ctx) + ctx = observability.WithSourceLabels(ctx, types.NamespacedName{ + Name: src.Name, + Namespace: src.Namespace, + }) + ctx = observability.WithMinimalEventLabels(ctx, &event) + if result := client.Send(ctx, event); !cloudevents.IsACK(result) { // Exhausted number of retries. Event is lost. a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result), diff --git a/pkg/observability/newcontext.go b/pkg/observability/newcontext.go index 421deb694a7..9731d44ea13 100644 --- a/pkg/observability/newcontext.go +++ b/pkg/observability/newcontext.go @@ -168,6 +168,20 @@ func WithSinkLabels(ctx context.Context, sink types.NamespacedName, kind string) return ctx } +func WithSourceLabels(ctx context.Context, source types.NamespacedName) context.Context { + labeler, ok := otelhttp.LabelerFromContext(ctx) + if !ok { + ctx = otelhttp.ContextWithLabeler(ctx, labeler) + } + + labeler.Add( + SourceName.With(source.Name), + SourceNamespace.With(source.Namespace), + ) + + return ctx +} + func WithHTTPStatusCodeLabel(ctx context.Context, statusCode int) context.Context { labeler, ok := otelhttp.LabelerFromContext(ctx) if !ok {