diff --git a/pkg/buses/stub/bus.go b/pkg/buses/stub/bus.go index f45b9fafb00..1e67fefaad8 100644 --- a/pkg/buses/stub/bus.go +++ b/pkg/buses/stub/bus.go @@ -82,7 +82,7 @@ func (b *StubBus) addChannel(channelRef buses.ChannelReference, parameters buses b.channels[channelRef] = &stubChannel{ parameters: parameters, subscriptions: make(map[buses.SubscriptionReference]*stubSubscription), - logger: b.logger, + logger: b.logger.With(zap.String("channels.knative.dev/channel", channelRef.String())), } } } @@ -105,21 +105,23 @@ type stubChannel struct { func (c *stubChannel) receiveMessage(message *buses.Message) { for _, stubSubscription := range c.subscriptions { - go func() { - err := stubSubscription.dispatchMessage(message) - if err != nil { - c.logger.Warnf("Failed to dispatch message: %v", err) - } - }() + go stubSubscription.dispatchMessage(message) } } func (c *stubChannel) addSubscription(subscriptionRef buses.SubscriptionReference, parameters buses.ResolvedParameters, bus buses.BusDispatcher) { - // create or update subscription - c.subscriptions[subscriptionRef] = &stubSubscription{ - bus: bus, - parameters: parameters, - subscriptionRef: subscriptionRef, + if subscription, ok := c.subscriptions[subscriptionRef]; ok { + // update subscription + subscription.parameters = parameters + } else { + // create subscription + c.subscriptions[subscriptionRef] = &stubSubscription{ + bus: bus, + parameters: parameters, + subscriptionRef: subscriptionRef, + + logger: c.logger.With(zap.String("channels.knative.dev/subscription", subscriptionRef.String())), + } } } @@ -131,8 +133,14 @@ type stubSubscription struct { bus buses.BusDispatcher parameters buses.ResolvedParameters subscriptionRef buses.SubscriptionReference + + logger *zap.SugaredLogger } func (s *stubSubscription) dispatchMessage(message *buses.Message) error { - return s.bus.DispatchMessage(s.subscriptionRef, message) + err := s.bus.DispatchMessage(s.subscriptionRef, message) + if err != nil { + s.logger.Warnf("Failed to dispatch message: %v", err) + } + return err }