From 4127b0ad96fd53fb553084b21347def3724b9e46 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Tue, 14 Nov 2023 09:22:21 -0500 Subject: [PATCH 1/8] Implement the auth check in the event receiver --- pkg/channel/event_receiver.go | 36 +++++++++++++++++++ pkg/channel/fanout/fanout_event_handler.go | 20 ++++++++++- .../fanout/fanout_event_handler_test.go | 2 ++ .../multi_channel_fanout_event_handler.go | 2 +- ...multi_channel_fanout_event_handler_test.go | 2 +- .../dispatcher/inmemorychannel.go | 4 +++ .../dispatcher/inmemorychannel_test.go | 6 ++-- 7 files changed, 66 insertions(+), 6 deletions(-) diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 86ff96cf146..2307eb45786 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -20,9 +20,13 @@ import ( "context" "errors" "fmt" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" nethttp "net/http" "time" + "knative.dev/eventing/pkg/channel/fanout" + "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol/http" "go.uber.org/zap" @@ -65,6 +69,7 @@ type EventReceiver struct { hostToChannelFunc ResolveChannelFromHostFunc pathToChannelFunc ResolveChannelFromPathFunc reporter StatsReporter + handler *fanout.FanoutEventHandler } // EventReceiverFunc is the function to be called for handling the event. @@ -153,6 +158,8 @@ func (r *EventReceiver) Start(ctx context.Context) error { } func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { + + ctx := request.Context() response.Header().Set("Allow", "POST, OPTIONS") if request.Method == nethttp.MethodOptions { response.Header().Set("WebHook-Allowed-Origin", "*") // Accept from any Origin: @@ -218,6 +225,35 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth return } + /// Here we do the OIDC audience verification + features := feature.FromContext(ctx) + if features.IsOIDCAuthentication() { + r.logger.Debug("OIDC authentication is enabled") + + channel := r.handler.GetChannelObject() + + if channel.Status.Address.Audience == nil { + r.logger.Warn(fmt.Sprintf("Audience of channel %s/%s must not be nil, while feature %s is enabled", channel.Name, channel.Namespace, feature.OIDCAuthentication)) + response.WriteHeader(nethttp.StatusInternalServerError) + return + } + + token := auth.GetJWTFromHeader(request.Header) + if token == "" { + r.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + + if _, err := r.handler.TokenVerifier.VerifyJWT(ctx, token, *channel.Status.Address.Audience); err != nil { + r.logger.Warn("no valid JWT provided", zap.Error(err)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + + r.logger.Debug("Request contained a valid JWT. Continuing...") + } + err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) if err != nil { if _, ok := err.(*UnknownChannelError); ok { diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 6f82030e4ea..2e41c0c4f99 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -24,6 +24,8 @@ package fanout import ( "context" "errors" + v1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/auth" nethttp "net/http" "sync" "time" @@ -83,6 +85,8 @@ type FanoutEventHandler struct { eventDispatcher *kncloudevents.Dispatcher + TokenVerifier *auth.OIDCTokenVerifier + // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, // rather than a member variable. timeout time.Duration @@ -91,6 +95,7 @@ type FanoutEventHandler struct { logger *zap.Logger eventTypeHandler *eventtype.EventTypeAutoHandler channelAddressable *duckv1.KReference + imc *v1.InMemoryChannel channelUID *types.UID } @@ -101,9 +106,12 @@ func NewFanoutEventHandler( reporter channel.StatsReporter, eventTypeHandler *eventtype.EventTypeAutoHandler, channelAddressable *duckv1.KReference, + imc *v1.InMemoryChannel, channelUID *types.UID, eventDispatcher *kncloudevents.Dispatcher, + TokenVerifier *auth.OIDCTokenVerifier, receiverOpts ...channel.EventReceiverOptions, + ) (*FanoutEventHandler, error) { handler := &FanoutEventHandler{ logger: logger, @@ -114,12 +122,13 @@ func NewFanoutEventHandler( channelAddressable: channelAddressable, channelUID: channelUID, eventDispatcher: eventDispatcher, + TokenVerifier: TokenVerifier, } handler.subscriptions = make([]Subscription, len(config.Subscriptions)) copy(handler.subscriptions, config.Subscriptions) // The receiver function needs to point back at the handler itself, so set it up after // initialization. - receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, receiverOpts...) + receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, handler, receiverOpts...) if err != nil { return nil, err } @@ -163,6 +172,15 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil } +func (f *FanoutEventHandler) GetChannelObject() *v1.InMemoryChannel { + // convert the channel addressable to a channel object + channelObject := &v1.InMemoryChannel{} + channelObject.SetName(f.channelAddressable.Name) + channelObject.SetNamespace(f.channelAddressable.Namespace) + channelObject.SetUID(*f.channelUID) + return channelObject +} + func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscription) { f.subscriptionsMutex.Lock() defer f.subscriptionsMutex.Unlock() diff --git a/pkg/channel/fanout/fanout_event_handler_test.go b/pkg/channel/fanout/fanout_event_handler_test.go index 44ab95ccf82..93fe6e79d74 100644 --- a/pkg/channel/fanout/fanout_event_handler_test.go +++ b/pkg/channel/fanout/fanout_event_handler_test.go @@ -390,7 +390,9 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event nil, nil, nil, + nil, dispatcher, + nil, recvOptionFunc, ) <-calledChan diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go index 20bbeb7d3e9..d2d441a817e 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go @@ -74,7 +74,7 @@ func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Confi if key == "" { continue } - handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, eventDispatcher, recvOptions...) + handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, nil, cc.ChannelUID, eventDispatcher, nil, recvOptions...) if err != nil { logger.Error("Failed creating new fanout handler.", zap.Error(err)) return nil, err diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go index 455d3fc9746..058c8a6f4c5 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go @@ -123,7 +123,7 @@ func TestNewEventHandler(t *testing.T) { if h != nil { t.Errorf("Found handler for %q but not expected", handlerName) } - f, err := fanout.NewFanoutEventHandler(logger, fanout.Config{}, reporter, nil, nil, nil, dispatcher) + f, err := fanout.NewFanoutEventHandler(logger, fanout.Config{}, reporter, nil, nil, nil, nil, dispatcher, nil, nil) if err != nil { t.Error("Failed to create FanoutMessagHandler: ", err) } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index cb1024dec5c..2ef628ab2b5 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -121,8 +121,10 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec r.reporter, eventTypeAutoHandler, channelRef, + imc, UID, r.eventDispatcher, + nil, ) if err != nil { logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) @@ -150,8 +152,10 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec r.reporter, eventTypeAutoHandler, channelRef, + imc, UID, r.eventDispatcher, + nil, channel.ResolveChannelFromPath(channel.ParseChannelFromPath), ) if err != nil { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 5a1ebb34884..471d6d70219 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -502,7 +502,7 @@ func TestReconciler_ReconcileKind(t *testing.T) { dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) // Just run the tests once with no existing handler (creates the handler) and once // with an existing, so we exercise both paths at once. - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) if err != nil { t.Error(err) } @@ -551,7 +551,7 @@ func TestReconciler_InvalidInputs(t *testing.T) { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) if err != nil { t.Error(err) } @@ -585,7 +585,7 @@ func TestReconciler_Deletion(t *testing.T) { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) if err != nil { t.Error(err) } From ae495fb0f2cdab10ca8abcec2859fc7e946e4594 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Thu, 23 Nov 2023 23:08:56 +0100 Subject: [PATCH 2/8] Reject request for wrong audience in IMC event_receiver (#1) --- pkg/channel/event_receiver.go | 42 ++++++++++++------- pkg/channel/fanout/fanout_event_handler.go | 23 ++-------- .../fanout/fanout_event_handler_test.go | 2 - .../multi_channel_fanout_event_handler.go | 2 +- ...multi_channel_fanout_event_handler_test.go | 2 +- .../inmemorychannel/dispatcher/controller.go | 1 + .../dispatcher/inmemorychannel.go | 18 ++++++-- .../dispatcher/inmemorychannel_test.go | 6 +-- 8 files changed, 51 insertions(+), 45 deletions(-) diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 2307eb45786..8d9ed752194 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -20,12 +20,11 @@ import ( "context" "errors" "fmt" - "knative.dev/eventing/pkg/apis/feature" - "knative.dev/eventing/pkg/auth" nethttp "net/http" "time" - "knative.dev/eventing/pkg/channel/fanout" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" "github.com/cloudevents/sdk-go/v2/event" "github.com/cloudevents/sdk-go/v2/protocol/http" @@ -69,7 +68,9 @@ type EventReceiver struct { hostToChannelFunc ResolveChannelFromHostFunc pathToChannelFunc ResolveChannelFromPathFunc reporter StatsReporter - handler *fanout.FanoutEventHandler + tokenVerifier *auth.OIDCTokenVerifier + audience string + withContext func(context.Context) context.Context } // EventReceiverFunc is the function to be called for handling the event. @@ -105,6 +106,21 @@ func ResolveChannelFromPath(PathToChannelFunc ResolveChannelFromPathFunc) EventR } } +func OIDCTokenVerification(tokenVerifier *auth.OIDCTokenVerifier, audience string) EventReceiverOptions { + return func(r *EventReceiver) error { + r.tokenVerifier = tokenVerifier + r.audience = audience + return nil + } +} + +func ReceiverWithContextFunc(fn func(context.Context) context.Context) EventReceiverOptions { + return func(r *EventReceiver) error { + r.withContext = fn + return nil + } +} + // NewEventReceiver creates an event receiver passing new events to the // receiverFunc. func NewEventReceiver(receiverFunc EventReceiverFunc, logger *zap.Logger, reporter StatsReporter, opts ...EventReceiverOptions) (*EventReceiver, error) { @@ -158,8 +174,12 @@ func (r *EventReceiver) Start(ctx context.Context) error { } func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *nethttp.Request) { - ctx := request.Context() + + if r.withContext != nil { + ctx = r.withContext(ctx) + } + response.Header().Set("Allow", "POST, OPTIONS") if request.Method == nethttp.MethodOptions { response.Header().Set("WebHook-Allowed-Origin", "*") // Accept from any Origin: @@ -230,14 +250,6 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth if features.IsOIDCAuthentication() { r.logger.Debug("OIDC authentication is enabled") - channel := r.handler.GetChannelObject() - - if channel.Status.Address.Audience == nil { - r.logger.Warn(fmt.Sprintf("Audience of channel %s/%s must not be nil, while feature %s is enabled", channel.Name, channel.Namespace, feature.OIDCAuthentication)) - response.WriteHeader(nethttp.StatusInternalServerError) - return - } - token := auth.GetJWTFromHeader(request.Header) if token == "" { r.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication)) @@ -245,13 +257,15 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth return } - if _, err := r.handler.TokenVerifier.VerifyJWT(ctx, token, *channel.Status.Address.Audience); err != nil { + if _, err := r.tokenVerifier.VerifyJWT(ctx, token, r.audience); err != nil { r.logger.Warn("no valid JWT provided", zap.Error(err)) response.WriteHeader(nethttp.StatusUnauthorized) return } r.logger.Debug("Request contained a valid JWT. Continuing...") + } else { + r.logger.Debug("OIDC authentication is disabled") } err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 2e41c0c4f99..20d870244da 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -24,8 +24,6 @@ package fanout import ( "context" "errors" - v1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/auth" nethttp "net/http" "sync" "time" @@ -85,8 +83,6 @@ type FanoutEventHandler struct { eventDispatcher *kncloudevents.Dispatcher - TokenVerifier *auth.OIDCTokenVerifier - // TODO: Plumb context through the receiver and dispatcher and use that to store the timeout, // rather than a member variable. timeout time.Duration @@ -95,7 +91,6 @@ type FanoutEventHandler struct { logger *zap.Logger eventTypeHandler *eventtype.EventTypeAutoHandler channelAddressable *duckv1.KReference - imc *v1.InMemoryChannel channelUID *types.UID } @@ -105,11 +100,9 @@ func NewFanoutEventHandler( config Config, reporter channel.StatsReporter, eventTypeHandler *eventtype.EventTypeAutoHandler, - channelAddressable *duckv1.KReference, - imc *v1.InMemoryChannel, + channelRef *duckv1.KReference, channelUID *types.UID, eventDispatcher *kncloudevents.Dispatcher, - TokenVerifier *auth.OIDCTokenVerifier, receiverOpts ...channel.EventReceiverOptions, ) (*FanoutEventHandler, error) { @@ -119,16 +112,15 @@ func NewFanoutEventHandler( reporter: reporter, asyncHandler: config.AsyncHandler, eventTypeHandler: eventTypeHandler, - channelAddressable: channelAddressable, + channelAddressable: channelRef, channelUID: channelUID, eventDispatcher: eventDispatcher, - TokenVerifier: TokenVerifier, } handler.subscriptions = make([]Subscription, len(config.Subscriptions)) copy(handler.subscriptions, config.Subscriptions) // The receiver function needs to point back at the handler itself, so set it up after // initialization. - receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, handler, receiverOpts...) + receiver, err := channel.NewEventReceiver(createEventReceiverFunction(handler), logger, reporter, receiverOpts...) if err != nil { return nil, err } @@ -172,15 +164,6 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil } -func (f *FanoutEventHandler) GetChannelObject() *v1.InMemoryChannel { - // convert the channel addressable to a channel object - channelObject := &v1.InMemoryChannel{} - channelObject.SetName(f.channelAddressable.Name) - channelObject.SetNamespace(f.channelAddressable.Namespace) - channelObject.SetUID(*f.channelUID) - return channelObject -} - func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscription) { f.subscriptionsMutex.Lock() defer f.subscriptionsMutex.Unlock() diff --git a/pkg/channel/fanout/fanout_event_handler_test.go b/pkg/channel/fanout/fanout_event_handler_test.go index 93fe6e79d74..44ab95ccf82 100644 --- a/pkg/channel/fanout/fanout_event_handler_test.go +++ b/pkg/channel/fanout/fanout_event_handler_test.go @@ -390,9 +390,7 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event nil, nil, nil, - nil, dispatcher, - nil, recvOptionFunc, ) <-calledChan diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go index d2d441a817e..20bbeb7d3e9 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler.go @@ -74,7 +74,7 @@ func NewEventHandlerWithConfig(_ context.Context, logger *zap.Logger, conf Confi if key == "" { continue } - handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, nil, cc.ChannelUID, eventDispatcher, nil, recvOptions...) + handler, err := fanout.NewFanoutEventHandler(logger, cc.FanoutConfig, reporter, cc.EventTypeHandler, cc.ChannelAddressable, cc.ChannelUID, eventDispatcher, recvOptions...) if err != nil { logger.Error("Failed creating new fanout handler.", zap.Error(err)) return nil, err diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go index 058c8a6f4c5..455d3fc9746 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go @@ -123,7 +123,7 @@ func TestNewEventHandler(t *testing.T) { if h != nil { t.Errorf("Found handler for %q but not expected", handlerName) } - f, err := fanout.NewFanoutEventHandler(logger, fanout.Config{}, reporter, nil, nil, nil, nil, dispatcher, nil, nil) + f, err := fanout.NewFanoutEventHandler(logger, fanout.Config{}, reporter, nil, nil, nil, dispatcher) if err != nil { t.Error("Failed to create FanoutMessagHandler: ", err) } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index d344455a17a..d3dac4d8c79 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -128,6 +128,7 @@ func NewController( eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider), + tokenVerifier: auth.NewOIDCTokenVerifier(ctx), } impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 2ef628ab2b5..a988c515943 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -37,6 +37,7 @@ import ( eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/messaging/v1" + "knative.dev/eventing/pkg/auth" "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" "knative.dev/eventing/pkg/channel/multichannelfanout" @@ -57,6 +58,7 @@ type Reconciler struct { eventingClient eventingv1beta2.EventingV1beta2Interface featureStore *feature.Store eventDispatcher *kncloudevents.Dispatcher + tokenVerifier *auth.OIDCTokenVerifier } // Check the interfaces Reconciler should implement @@ -111,6 +113,10 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec UID = &imc.UID } + wc := func(ctx context.Context) context.Context { + return r.featureStore.ToContext(ctx) + } + // First grab the host based MultiChannelFanoutMessage httpHandler httpHandler := r.multiChannelEventHandler.GetChannelHandler(config.HostName) if httpHandler == nil { @@ -121,10 +127,10 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec r.reporter, eventTypeAutoHandler, channelRef, - imc, UID, r.eventDispatcher, - nil, + channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)), + channel.ReceiverWithContextFunc(wc), ) if err != nil { logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) @@ -152,11 +158,11 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec r.reporter, eventTypeAutoHandler, channelRef, - imc, UID, r.eventDispatcher, - nil, channel.ResolveChannelFromPath(channel.ParseChannelFromPath), + channel.OIDCTokenVerification(r.tokenVerifier, audience(imc)), + channel.ReceiverWithContextFunc(wc), ) if err != nil { logging.FromContext(ctx).Error("Failed to create a new fanout.EventHandler", err) @@ -291,3 +297,7 @@ func toKReference(imc *v1.InMemoryChannel) *duckv1.KReference { Address: imc.Status.Address.Name, } } + +func audience(imc *v1.InMemoryChannel) string { + return auth.GetAudience(v1.SchemeGroupVersion.WithKind("InMemoryChannel"), imc.ObjectMeta) +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 471d6d70219..5a1ebb34884 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -502,7 +502,7 @@ func TestReconciler_ReconcileKind(t *testing.T) { dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) // Just run the tests once with no existing handler (creates the handler) and once // with an existing, so we exercise both paths at once. - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) if err != nil { t.Error(err) } @@ -551,7 +551,7 @@ func TestReconciler_InvalidInputs(t *testing.T) { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) if err != nil { t.Error(err) } @@ -585,7 +585,7 @@ func TestReconciler_Deletion(t *testing.T) { oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) - fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, nil, dispatcher, nil) + fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) if err != nil { t.Error(err) } From 256a41de6d6b3826d678a05bac959eb82446bb68 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Thu, 23 Nov 2023 17:14:39 -0500 Subject: [PATCH 3/8] Running the rekt test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Christoph Stäbler --- test/auth/oidc_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/auth/oidc_test.go b/test/auth/oidc_test.go index e1c813e2f75..096f5c8fa10 100644 --- a/test/auth/oidc_test.go +++ b/test/auth/oidc_test.go @@ -75,7 +75,7 @@ func TestChannelImplSupportsOIDC(t *testing.T) { name := feature.MakeRandomK8sName("channelimpl") env.Prerequisite(ctx, t, channel.ImplGoesReady(name)) - env.Test(ctx, t, oidc.AddressableHasAudiencePopulated(channel_impl.GVR(), channel_impl.GVK().Kind, name, env.Namespace())) + env.TestSet(ctx, t, oidc.AddressableOIDCConformance(channel_impl.GVR(), channel_impl.GVK().Kind, name, env.Namespace())) } func TestParallelSupportsOIDC(t *testing.T) { From 115605f1a8c643c046f442b3f45a47f982d2a60d Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 24 Nov 2023 14:48:49 -0500 Subject: [PATCH 4/8] Update pkg/channel/fanout/fanout_event_handler.go Co-authored-by: Calum Murray --- pkg/channel/fanout/fanout_event_handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index c861b9beee2..24aacb62c99 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -112,7 +112,7 @@ func NewFanoutEventHandler( reporter: reporter, asyncHandler: config.AsyncHandler, eventTypeHandler: eventTypeHandler, - channelAddressable: channelRef, + channelRef: channelRef, channelUID: channelUID, eventDispatcher: eventDispatcher, } From 095bf1354a9369ff83560cbff0a669d8e2194603 Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 24 Nov 2023 16:16:11 -0500 Subject: [PATCH 5/8] Update the variable naming Signed-off-by: Leo Li --- pkg/channel/event_receiver.go | 23 +---------------- pkg/channel/fanout/fanout_event_handler.go | 30 +++++++++++----------- 2 files changed, 16 insertions(+), 37 deletions(-) diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index 8d9ed752194..f45e4a26b56 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -23,7 +23,6 @@ import ( nethttp "net/http" "time" - "knative.dev/eventing/pkg/apis/feature" "knative.dev/eventing/pkg/auth" "github.com/cloudevents/sdk-go/v2/event" @@ -246,27 +245,7 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth } /// Here we do the OIDC audience verification - features := feature.FromContext(ctx) - if features.IsOIDCAuthentication() { - r.logger.Debug("OIDC authentication is enabled") - - token := auth.GetJWTFromHeader(request.Header) - if token == "" { - r.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication)) - response.WriteHeader(nethttp.StatusUnauthorized) - return - } - - if _, err := r.tokenVerifier.VerifyJWT(ctx, token, r.audience); err != nil { - r.logger.Warn("no valid JWT provided", zap.Error(err)) - response.WriteHeader(nethttp.StatusUnauthorized) - return - } - - r.logger.Debug("Request contained a valid JWT. Continuing...") - } else { - r.logger.Debug("OIDC authentication is disabled") - } + auth.ValidateEventAuthHeader(ctx, request) err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) if err != nil { diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index c256f3ffb5e..f59361fe398 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -88,11 +88,11 @@ type FanoutEventHandler struct { // rather than a member variable. timeout time.Duration - reporter channel.StatsReporter - logger *zap.Logger - eventTypeHandler *eventtype.EventTypeAutoHandler - channelAddressable *duckv1.KReference - channelUID *types.UID + reporter channel.StatsReporter + logger *zap.Logger + eventTypeHandler *eventtype.EventTypeAutoHandler + channelRef *duckv1.KReference + channelUID *types.UID } // NewFanoutEventHandler creates a new fanout.EventHandler. @@ -108,14 +108,14 @@ func NewFanoutEventHandler( ) (*FanoutEventHandler, error) { handler := &FanoutEventHandler{ - logger: logger, - timeout: defaultTimeout, - reporter: reporter, - asyncHandler: config.AsyncHandler, - eventTypeHandler: eventTypeHandler, - channelRef: channelRef, - channelUID: channelUID, - eventDispatcher: eventDispatcher, + logger: logger, + timeout: defaultTimeout, + reporter: reporter, + asyncHandler: config.AsyncHandler, + eventTypeHandler: eventTypeHandler, + channelRef: channelRef, + channelUID: channelUID, + eventDispatcher: eventDispatcher, } handler.subscriptions = make([]Subscription, len(config.Subscriptions)) copy(handler.subscriptions, config.Subscriptions) @@ -185,7 +185,7 @@ func (f *FanoutEventHandler) GetSubscriptions(ctx context.Context) []Subscriptio } func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event.Event) { - if f.channelAddressable == nil { + if f.channelRef == nil { f.logger.Warn("No addressable for channel") return } else { @@ -193,7 +193,7 @@ func (f *FanoutEventHandler) autoCreateEventType(ctx context.Context, evnt event f.logger.Warn("No channelUID provided, unable to autocreate event type") return } - err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelAddressable, *f.channelUID) + err := f.eventTypeHandler.AutoCreateEventType(ctx, &evnt, f.channelRef, *f.channelUID) if err != nil { f.logger.Warn("EventTypeCreate failed") return From 05b1eb2fbddbc5419615fe55761b89aca5b9b73b Mon Sep 17 00:00:00 2001 From: Leo Li Date: Fri, 24 Nov 2023 16:17:05 -0500 Subject: [PATCH 6/8] Refactor the jwt event auth header verification Signed-off-by: Leo Li --- pkg/auth/token_verifier.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index a37f8060012..9618fd69a2c 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -158,3 +158,18 @@ type openIDMetadata struct { SubjectTypes []string `json:"subject_types_supported"` SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } + +func ValidateEventAuthHeader(ctx context.Context, request *http.Request) error { + features := feature.FromContext(ctx) + if features.IsOIDCAuthentication() { + token := GetJWTFromHeader(request.Header) + if token == "" { + return fmt.Errorf("no JWT in %s header provided while feature %s is enabled", AuthHeaderKey, feature.OIDCAuthentication) + } + tokenVerifier := NewOIDCTokenVerifier(ctx) + if _, err := tokenVerifier.VerifyJWT(ctx, token, "audience"); err != nil { + return fmt.Errorf("no valid JWT provided: %w", err) + } + } + return nil +} From c01f0731bb450559f9ffb3f6a7d753d75007c71a Mon Sep 17 00:00:00 2001 From: Leo Li Date: Mon, 4 Dec 2023 01:38:18 -0500 Subject: [PATCH 7/8] Revert the refactoring Signed-off-by: Leo Li --- pkg/auth/token_verifier.go | 15 --------------- pkg/channel/event_receiver.go | 23 ++++++++++++++++++++++- 2 files changed, 22 insertions(+), 16 deletions(-) diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index 9618fd69a2c..a37f8060012 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -158,18 +158,3 @@ type openIDMetadata struct { SubjectTypes []string `json:"subject_types_supported"` SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } - -func ValidateEventAuthHeader(ctx context.Context, request *http.Request) error { - features := feature.FromContext(ctx) - if features.IsOIDCAuthentication() { - token := GetJWTFromHeader(request.Header) - if token == "" { - return fmt.Errorf("no JWT in %s header provided while feature %s is enabled", AuthHeaderKey, feature.OIDCAuthentication) - } - tokenVerifier := NewOIDCTokenVerifier(ctx) - if _, err := tokenVerifier.VerifyJWT(ctx, token, "audience"); err != nil { - return fmt.Errorf("no valid JWT provided: %w", err) - } - } - return nil -} diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index f45e4a26b56..cc26e094955 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "knative.dev/eventing/pkg/apis/feature" nethttp "net/http" "time" @@ -245,7 +246,27 @@ func (r *EventReceiver) ServeHTTP(response nethttp.ResponseWriter, request *neth } /// Here we do the OIDC audience verification - auth.ValidateEventAuthHeader(ctx, request) + features := feature.FromContext(ctx) + if features.IsOIDCAuthentication() { + r.logger.Debug("OIDC authentication is enabled") + + token := auth.GetJWTFromHeader(request.Header) + if token == "" { + r.logger.Warn(fmt.Sprintf("No JWT in %s header provided while feature %s is enabled", auth.AuthHeaderKey, feature.OIDCAuthentication)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + + if _, err := r.tokenVerifier.VerifyJWT(ctx, token, r.audience); err != nil { + r.logger.Warn("no valid JWT provided", zap.Error(err)) + response.WriteHeader(nethttp.StatusUnauthorized) + return + } + + r.logger.Debug("Request contained a valid JWT. Continuing...") + } else { + r.logger.Debug("OIDC authentication is disabled") + } err = r.receiverFunc(request.Context(), channel, *event, utils.PassThroughHeaders(request.Header)) if err != nil { From 27f45176b45d8eadf95e5d39ac4496755863d7ba Mon Sep 17 00:00:00 2001 From: Leo Li Date: Mon, 4 Dec 2023 11:48:37 -0500 Subject: [PATCH 8/8] Fix the linter issue Signed-off-by: Leo Li --- pkg/channel/event_receiver.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/channel/event_receiver.go b/pkg/channel/event_receiver.go index cc26e094955..f39e8facc32 100644 --- a/pkg/channel/event_receiver.go +++ b/pkg/channel/event_receiver.go @@ -20,10 +20,11 @@ import ( "context" "errors" "fmt" - "knative.dev/eventing/pkg/apis/feature" nethttp "net/http" "time" + "knative.dev/eventing/pkg/apis/feature" + "knative.dev/eventing/pkg/auth" "github.com/cloudevents/sdk-go/v2/event"