From 8dd3e6b5afb0755f633457bac4e15b6a47367ea5 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Fri, 27 Oct 2023 10:30:50 -0400 Subject: [PATCH 1/4] Don't use async handler Signed-off-by: Calum Murray --- pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index cb1024dec5c..00becbd9f9f 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -226,7 +226,7 @@ func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.C HostName: imc.Status.Address.URL.Host, Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), FanoutConfig: fanout.Config{ - AsyncHandler: true, + AsyncHandler: false, Subscriptions: subs, }, }, nil From ff97c1d883d3dcb3eb255e49a782ba14cf5f2451 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 30 Oct 2023 10:48:06 -0400 Subject: [PATCH 2/4] Fix Kn-Namespace header, potential race condition for sync receiver Signed-off-by: Calum Murray --- pkg/channel/fanout/fanout_event_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 6f82030e4ea..b6c6b5ac8bc 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -55,7 +55,7 @@ type Subscription struct { // Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` - // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. AsyncHandler bool `json:"asyncHandler,omitempty"` } @@ -240,6 +240,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch reportArgs := channel.ReportArgs{} reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace + additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace) dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders) return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) } @@ -302,7 +303,6 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription, if dispatchResult.err != nil { f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err)) dispatchResultForFanout.err = dispatchResult.err - return dispatchResultForFanout } case <-time.After(f.timeout): f.logger.Error("Fanout timed out") From 54078cdfb9182e3f7f8407c0519aaa1b0b571b61 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 30 Oct 2023 14:50:19 -0400 Subject: [PATCH 3/4] try refactoring test Signed-off-by: Calum Murray --- test/rekt/features/broker/data_plane.go | 44 +++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/test/rekt/features/broker/data_plane.go b/test/rekt/features/broker/data_plane.go index 24ad575b0b3..311ec038937 100644 --- a/test/rekt/features/broker/data_plane.go +++ b/test/rekt/features/broker/data_plane.go @@ -19,6 +19,8 @@ package broker import ( "context" + cetest "github.com/cloudevents/sdk-go/v2/test" + "github.com/google/uuid" "knative.dev/eventing/test/rekt/features/knconf" "knative.dev/eventing/test/rekt/resources/broker" @@ -50,9 +52,11 @@ func DataPlaneIngress(brokerName string) *feature.Feature { state.SetOrFail(ctx, t, "brokerName", brokerName) }) + f = withBrokerAcceptsCEVersions(f, brokerName) + f.Stable("Conformance"). - Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", - brokerAcceptsCEVersions). + //Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", + // brokerAcceptsCEVersions). May("Other versions MAY be rejected.", brokerRejectsUnknownCEVersion). ShouldNot("The Broker SHOULD NOT perform an upgrade of the produced event's CloudEvents version.", @@ -70,6 +74,42 @@ func DataPlaneIngress(brokerName string) *feature.Feature { return f } +func withBrokerAcceptsCEVersions(f *feature.Feature, brokerName string) *feature.Feature { + uuids := map[string]string{ + uuid.New().String(): "1.0", + uuid.New().String(): "0.3", + } + + for id, version := range uuids { + // We need to use a different source name, otherwise, it will try to update + // the pod, which is immutable. + source := feature.MakeRandomK8sName("source") + event := cetest.FullEvent() + event.SetID(id) + event.SetSpecVersion(version) + f.Setup("Install Source", eventshub.Install(source, + eventshub.StartSenderToResource(broker.GVR(), brokerName), + eventshub.InputEvent(event), + )) + + f.Stable("Conformance").Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", func(ctx context.Context, t feature.T) { + store := eventshub.StoreFromContext(ctx, source) + // We are looking for two events, one of them is the sent event and the other + // is Response, so correlate them first. We want to make sure the event was sent and that the + // response was what was expected. + events := knconf.Correlate(store.AssertAtLeast(ctx, t, 2, knconf.SentEventMatcher(""))) + for _, e := range events { + // Make sure HTTP response code is 2XX + if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 { + t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode) + } + } + }) + } + + return f +} + func DataPlaneAddressability(brokerName string) *feature.Feature { f := feature.NewFeatureNamed("Broker Addressability") From 0e701d3a18cc3f3c82ff73849f1109a5f54163dd Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 30 Oct 2023 16:09:25 -0400 Subject: [PATCH 4/4] take 2 on test fix Signed-off-by: Calum Murray --- test/rekt/broker_test.go | 2 +- test/rekt/features/broker/data_plane.go | 44 ++----------------------- 2 files changed, 3 insertions(+), 43 deletions(-) diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index 1152f9bb639..26b2c18fa4d 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -138,8 +138,8 @@ func TestBrokerConformance(t *testing.T) { // Install and wait for a Ready Broker. env.Prerequisite(ctx, t, broker.GoesReady("default", b.WithEnvConfig()...)) - env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) env.TestSet(ctx, t, broker.DataPlaneConformance("default")) + env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) } func TestBrokerDefaultDelivery(t *testing.T) { diff --git a/test/rekt/features/broker/data_plane.go b/test/rekt/features/broker/data_plane.go index 311ec038937..24ad575b0b3 100644 --- a/test/rekt/features/broker/data_plane.go +++ b/test/rekt/features/broker/data_plane.go @@ -19,8 +19,6 @@ package broker import ( "context" - cetest "github.com/cloudevents/sdk-go/v2/test" - "github.com/google/uuid" "knative.dev/eventing/test/rekt/features/knconf" "knative.dev/eventing/test/rekt/resources/broker" @@ -52,11 +50,9 @@ func DataPlaneIngress(brokerName string) *feature.Feature { state.SetOrFail(ctx, t, "brokerName", brokerName) }) - f = withBrokerAcceptsCEVersions(f, brokerName) - f.Stable("Conformance"). - //Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", - // brokerAcceptsCEVersions). + Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", + brokerAcceptsCEVersions). May("Other versions MAY be rejected.", brokerRejectsUnknownCEVersion). ShouldNot("The Broker SHOULD NOT perform an upgrade of the produced event's CloudEvents version.", @@ -74,42 +70,6 @@ func DataPlaneIngress(brokerName string) *feature.Feature { return f } -func withBrokerAcceptsCEVersions(f *feature.Feature, brokerName string) *feature.Feature { - uuids := map[string]string{ - uuid.New().String(): "1.0", - uuid.New().String(): "0.3", - } - - for id, version := range uuids { - // We need to use a different source name, otherwise, it will try to update - // the pod, which is immutable. - source := feature.MakeRandomK8sName("source") - event := cetest.FullEvent() - event.SetID(id) - event.SetSpecVersion(version) - f.Setup("Install Source", eventshub.Install(source, - eventshub.StartSenderToResource(broker.GVR(), brokerName), - eventshub.InputEvent(event), - )) - - f.Stable("Conformance").Must("The ingress endpoint(s) MUST conform to at least one of the following versions of the specification: 0.3, 1.0", func(ctx context.Context, t feature.T) { - store := eventshub.StoreFromContext(ctx, source) - // We are looking for two events, one of them is the sent event and the other - // is Response, so correlate them first. We want to make sure the event was sent and that the - // response was what was expected. - events := knconf.Correlate(store.AssertAtLeast(ctx, t, 2, knconf.SentEventMatcher(""))) - for _, e := range events { - // Make sure HTTP response code is 2XX - if e.Response.StatusCode < 200 || e.Response.StatusCode > 299 { - t.Errorf("Expected statuscode 2XX for sequence %d got %d", e.Response.Sequence, e.Response.StatusCode) - } - } - }) - } - - return f -} - func DataPlaneAddressability(brokerName string) *feature.Feature { f := feature.NewFeatureNamed("Broker Addressability")