Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ type apiServerAdapter struct {

config Config

discover discovery.DiscoveryInterface
k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
discover discovery.DiscoveryInterface
k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
namespace string
}

type resourceWatchMatch struct {
Expand Down Expand Up @@ -199,6 +200,7 @@ func (a *apiServerAdapter) setupDelegate() cache.Store {
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
apiServerSourceNS: a.namespace,
filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
}
if a.config.ResourceOwner != nil {
Expand Down
13 changes: 7 additions & 6 deletions pkg/adapter/apiserver/adapter_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,13 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}

return &apiServerAdapter{
discover: kubeclient.Get(ctx).Discovery(),
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,
discover: kubeclient.Get(ctx).Discovery(),
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
namespace: env.GetNamespace(),
config: config,

logger: logger,
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/adapter/apiserver/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,21 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/observability"
)

type resourceDelegate struct {
ce cloudevents.Client
source string
ref bool
apiServerSourceName string
apiServerSourceNS string
filter eventfilter.Filter

logger *zap.SugaredLogger
logger *zap.SugaredLogger
}

var _ cache.Store = (*resourceDelegate)(nil)
Expand Down Expand Up @@ -82,6 +84,14 @@ func (a *resourceDelegate) sendCloudEvent(ctx context.Context, event cloudevents
subject := event.Context.GetSubject()
a.logger.Debugf("sending cloudevent id: %s, source: %s, subject: %s", event.ID(), source, subject)

// Add labels to context so otelhttp picks them up for metrics
ctx = observability.WithLabeler(ctx)
ctx = observability.WithSourceLabels(ctx, types.NamespacedName{
Name: a.apiServerSourceName,
Namespace: a.apiServerSourceNS,
})
ctx = observability.WithMinimalEventLabels(ctx, &event)

if result := a.ce.Send(ctx, event); !cloudevents.IsACK(result) {
a.logger.Errorw("failed to send cloudevent", zap.Error(result), zap.String("source", source),
zap.String("subject", subject), zap.String("id", event.ID()))
Expand Down
9 changes: 9 additions & 0 deletions pkg/adapter/mtping/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/robfig/cron/v3"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -152,6 +153,14 @@ func (a *cronJobsRunner) cronTick(ctx context.Context, client kncloudevents.Clie

a.Logger.Debugf("sending cloudevent id: %s, source: %s, target: %s", event.ID(), source, target)

// Add labels to context so otelhttp picks them up for metrics
ctx = observability.WithLabeler(ctx)
ctx = observability.WithSourceLabels(ctx, types.NamespacedName{
Name: src.Name,
Namespace: src.Namespace,
})
ctx = observability.WithMinimalEventLabels(ctx, &event)

if result := client.Send(ctx, event); !cloudevents.IsACK(result) {
// Exhausted number of retries. Event is lost.
a.Logger.Error("failed to send cloudevent result: ", zap.Any("result", result),
Expand Down
14 changes: 14 additions & 0 deletions pkg/observability/newcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,20 @@ func WithSinkLabels(ctx context.Context, sink types.NamespacedName, kind string)
return ctx
}

func WithSourceLabels(ctx context.Context, source types.NamespacedName) context.Context {
labeler, ok := otelhttp.LabelerFromContext(ctx)
if !ok {
ctx = otelhttp.ContextWithLabeler(ctx, labeler)
}

labeler.Add(
SourceName.With(source.Name),
SourceNamespace.With(source.Namespace),
)

return ctx
}

func WithHTTPStatusCodeLabel(ctx context.Context, statusCode int) context.Context {
labeler, ok := otelhttp.LabelerFromContext(ctx)
if !ok {
Expand Down
Loading