From 54644a8d563b381f67ecf9ef5792b59186e119e6 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Tue, 14 Nov 2023 13:08:59 -0500 Subject: [PATCH] Don't use async handler (#7415) * Don't use async handler Signed-off-by: Calum Murray * Fix Kn-Namespace header, potential race condition for sync receiver Signed-off-by: Calum Murray * try refactoring test Signed-off-by: Calum Murray * take 2 on test fix Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- pkg/channel/fanout/fanout_event_handler.go | 4 ++-- pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go | 2 +- test/rekt/broker_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/channel/fanout/fanout_event_handler.go b/pkg/channel/fanout/fanout_event_handler.go index 6f82030e4ea..b6c6b5ac8bc 100644 --- a/pkg/channel/fanout/fanout_event_handler.go +++ b/pkg/channel/fanout/fanout_event_handler.go @@ -55,7 +55,7 @@ type Subscription struct { // Config for a fanout.EventHandler. type Config struct { Subscriptions []Subscription `json:"subscriptions"` - // AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. + // Deprecated: AsyncHandler controls whether the Subscriptions are called synchronous or asynchronously. // It is expected to be false when used as a sidecar. AsyncHandler bool `json:"asyncHandler,omitempty"` } @@ -240,6 +240,7 @@ func createEventReceiverFunction(f *FanoutEventHandler) func(context.Context, ch reportArgs := channel.ReportArgs{} reportArgs.EventType = event.Type() reportArgs.Ns = ref.Namespace + additionalHeaders.Set(apis.KnNamespaceHeader, ref.Namespace) dispatchResultForFanout := f.dispatch(ctx, subs, event, additionalHeaders) return ParseDispatchResultAndReportMetrics(dispatchResultForFanout, f.reporter, reportArgs) } @@ -302,7 +303,6 @@ func (f *FanoutEventHandler) dispatch(ctx context.Context, subs []Subscription, if dispatchResult.err != nil { f.logger.Error("Fanout had an error", zap.Error(dispatchResult.err)) dispatchResultForFanout.err = dispatchResult.err - return dispatchResultForFanout } case <-time.After(f.timeout): f.logger.Error("Fanout timed out") diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index cb1024dec5c..00becbd9f9f 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -226,7 +226,7 @@ func newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*multichannelfanout.C HostName: imc.Status.Address.URL.Host, Path: fmt.Sprintf("%s/%s", imc.Namespace, imc.Name), FanoutConfig: fanout.Config{ - AsyncHandler: true, + AsyncHandler: false, Subscriptions: subs, }, }, nil diff --git a/test/rekt/broker_test.go b/test/rekt/broker_test.go index 86532aa03c9..a6b2409c442 100644 --- a/test/rekt/broker_test.go +++ b/test/rekt/broker_test.go @@ -139,8 +139,8 @@ func TestBrokerConformance(t *testing.T) { // Install and wait for a Ready Broker. env.Prerequisite(ctx, t, broker.GoesReady("default", b.WithEnvConfig()...)) - env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) env.TestSet(ctx, t, broker.DataPlaneConformance("default")) + env.TestSet(ctx, t, broker.ControlPlaneConformance("default", b.WithEnvConfig()...)) } func TestBrokerDefaultDelivery(t *testing.T) {