diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index da0bf159669..4e022925128 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -20,13 +20,13 @@ import ( "context" "time" + "knative.dev/pkg/injection" + "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/channel/multichannelfanout" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/injection" - "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "knative.dev/pkg/kmeta" @@ -124,8 +124,11 @@ func NewController( inmemorychannelInformer.Informer().AddEventHandler( cache.FilteringResourceEventHandler{ FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), - Handler: controller.HandleAll(impl.Enqueue), - }) + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: impl.Enqueue, + UpdateFunc: controller.PassNew(impl.Enqueue), + DeleteFunc: r.deleteFunc, + }}) // Start the dispatcher. go func() { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index ffea4b7c28e..8fb96e1f90e 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "knative.dev/pkg/apis/duck" + "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -93,17 +94,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) return r.patchSubscriberStatus(ctx, imc) } -func (r *Reconciler) FinalizeKind(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event { - if imc.Status.Address != nil && - imc.Status.Address.URL != nil { - if hostName := imc.Status.Address.URL.Host; hostName != "" { - logging.FromContext(ctx).Info("Removing dispatcher") - r.multiChannelMessageHandler.DeleteChannelHandler(hostName) - } - } - return nil -} - func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemoryChannel) error { after := imc.DeepCopy() @@ -159,3 +149,22 @@ func (r *Reconciler) newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*mult }, }, nil } + +func (r *Reconciler) deleteFunc(obj interface{}) { + if obj == nil { + return + } + acc, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + return + } + imc, ok := acc.(*v1.InMemoryChannel) + if !ok || imc == nil { + return + } + if imc.Status.Address != nil && imc.Status.Address.URL != nil { + if hostName := imc.Status.Address.URL.Host; hostName != "" { + r.multiChannelMessageHandler.DeleteChannelHandler(hostName) + } + } +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 3d2fcf86a6c..627040dd884 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -132,13 +132,6 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewInMemoryChannel(imcName, testNS, WithInitInMemoryChannelConditions), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "updated configuration, one channel", Key: imcKey, @@ -151,13 +144,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "with subscribers", Key: imcKey, @@ -172,14 +158,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, twoSubscriberPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "with subscribers, patch fails", Key: imcKey, @@ -197,11 +177,9 @@ func TestAllCases(t *testing.T) { InduceFailure("patch", "inmemorychannels/status"), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, twoSubscriberPatch), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), Eventf(corev1.EventTypeWarning, "InternalError", "Failed patching: inducing failure for patch inmemorychannels"), }, WantErr: true, @@ -220,14 +198,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber2UID), subscriber2Generation), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "with subscribers, one removed one added to status", Key: imcKey, @@ -244,14 +214,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, oneSubscriberRemovedOneAddedPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "subscriber with delivery spec", Key: imcKey, @@ -281,13 +245,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, oneSubscriberPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "subscriber with invalid delivery spec", Key: imcKey, @@ -316,11 +275,7 @@ func TestAllCases(t *testing.T) { }), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), Eventf(corev1.EventTypeWarning, "InternalError", "failed to parse Spec.BackoffDelay: expected 'P' period mark at the start: garbage"), }, WantErr: true, @@ -354,13 +309,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithInMemoryChannelDeleted), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchRemoveFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, } @@ -529,10 +477,41 @@ func TestReconciler_ReconcileKind(t *testing.T) { } } -func TestReconciler_FinalizeKind(t *testing.T) { +func TestReconciler_InvalidInputs(t *testing.T) { testCases := map[string]struct { - imc *v1.InMemoryChannel - wantResult reconciler.Event + imc interface{} + }{ + "nil": {}, + "With no address": { + imc: NewInMemoryChannel(imcName, testNS, WithInMemoryChannelDeleted), + }, + "With invalid type": { + imc: &subscriber1, + }, + } + for n, tc := range testCases { + fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil) + if err != nil { + t.Error(err) + } + for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} { + t.Run("handler-"+n, func(t *testing.T) { + handler := newFakeMultiChannelHandler() + if fanoutHandler != nil { + handler.SetChannelHandler(channelServiceAddress, fanoutHandler) + } + r := &Reconciler{ + multiChannelMessageHandler: handler, + } + r.deleteFunc(tc.imc) + }) + } + } +} + +func TestReconciler_Deletion(t *testing.T) { + testCases := map[string]struct { + imc *v1.InMemoryChannel }{ "With address": { imc: NewInMemoryChannel(imcName, testNS, @@ -554,10 +533,7 @@ func TestReconciler_FinalizeKind(t *testing.T) { r := &Reconciler{ multiChannelMessageHandler: handler, } - e := r.FinalizeKind(context.TODO(), tc.imc) - if e != tc.wantResult { - t.Errorf("Results differ, want %v have %v", tc.wantResult, e) - } + r.deleteFunc(tc.imc) if handler.GetChannelHandler(channelServiceAddress) != nil { t.Error("Got handler") } @@ -576,16 +552,6 @@ func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl { } } -func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - return makePatch(namespace, name, - `{"metadata":{"finalizers":["`+finalizerName+`"],"resourceVersion":""}}`) -} - -func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - return makePatch(namespace, name, - `{"metadata":{"finalizers":[],"resourceVersion":""}}`) -} - type fakeMultiChannelHandler struct { handlers map[string]fanout.MessageHandler }