diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 6ded61139e5..5276b6f0a5e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -227,7 +227,16 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + var brokerRef, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -239,7 +248,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, requestType: "reply_forward", } @@ -256,7 +265,16 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + var brokerRef, brokerNamespace string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -298,6 +316,13 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + var brokerRef string + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + } else { + brokerRef = trigger.Spec.Broker + } + triggerRef := types.NamespacedName{ Name: trigger.Name, Namespace: trigger.Namespace, @@ -321,7 +346,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), requestType: "filter", }