From 49aa6bcfa79e099637e4ab60ebab4c56c6f0997d Mon Sep 17 00:00:00 2001 From: yijie-04 Date: Thu, 9 May 2024 23:51:45 -0400 Subject: [PATCH 1/3] data plane changes --- pkg/broker/filter/filter_handler.go | 35 +++++++++++++++++++++++++---- 1 file changed, 31 insertions(+), 4 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 6ee15200be9..69e93b4ed2e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -72,6 +72,11 @@ const ( skipTTL = -1 ) +var ( + brokerNamespace string + brokerRef string +) + // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time @@ -227,7 +232,15 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -239,7 +252,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, requestType: "reply_forward", } @@ -256,7 +269,15 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { - broker, err := h.brokerLister.Brokers(trigger.Namespace).Get(trigger.Spec.Broker) + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + brokerNamespace = trigger.Spec.BrokerRef.Namespace + } else { + brokerRef = trigger.Spec.Broker + brokerNamespace = trigger.Namespace + } + + broker, err := h.brokerLister.Brokers(brokerNamespace).Get(brokerRef) if err != nil { h.logger.Info("Unable to get the Broker", zap.Error(err)) writer.WriteHeader(http.StatusBadRequest) @@ -298,6 +319,12 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { + brokerRef = trigger.Spec.BrokerRef.Name + } else { + brokerRef = trigger.Spec.Broker + } + triggerRef := types.NamespacedName{ Name: trigger.Name, Namespace: trigger.Namespace, @@ -321,7 +348,7 @@ func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger reportArgs := &ReportArgs{ ns: trigger.Namespace, trigger: trigger.Name, - broker: trigger.Spec.Broker, + broker: brokerRef, filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"), requestType: "filter", } From ba4d1e50cb3edaff1204930a82ca0ee326c0969e Mon Sep 17 00:00:00 2001 From: yijie-04 Date: Fri, 17 May 2024 13:30:26 -0400 Subject: [PATCH 2/3] moved global variables inside function --- pkg/broker/filter/filter_handler.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 90550617782..2d999d1ad59 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -72,11 +72,6 @@ const ( skipTTL = -1 ) -var ( - brokerNamespace string - brokerRef string -) - // Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Handler struct { // reporter reports stats of status code and dispatch time @@ -232,6 +227,7 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + var brokerRef, brokerNamespace string if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { brokerRef = trigger.Spec.BrokerRef.Name brokerNamespace = trigger.Spec.BrokerRef.Namespace From d164904c4220d061863ad059a55b367b66c1b5cc Mon Sep 17 00:00:00 2001 From: yijie-04 Date: Fri, 17 May 2024 13:34:59 -0400 Subject: [PATCH 3/3] updated other functions --- pkg/broker/filter/filter_handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 2d999d1ad59..5276b6f0a5e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -265,6 +265,7 @@ func (h *Handler) handleDispatchToReplyRequest(ctx context.Context, trigger *eve } func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + var brokerRef, brokerNamespace string if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { brokerRef = trigger.Spec.BrokerRef.Name brokerNamespace = trigger.Spec.BrokerRef.Namespace @@ -315,6 +316,7 @@ func (h *Handler) handleDispatchToDLSRequest(ctx context.Context, trigger *event } func (h *Handler) handleDispatchToSubscriberRequest(ctx context.Context, trigger *eventingv1.Trigger, writer http.ResponseWriter, request *http.Request, event *event.Event) { + var brokerRef string if feature.FromContext(ctx).IsEnabled(feature.CrossNamespaceEventLinks) && trigger.Spec.BrokerRef.Namespace != "" { brokerRef = trigger.Spec.BrokerRef.Name } else {