From dcfb9155e3d9a70de30c41ecff4bb7afa382f05d Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Wed, 11 Nov 2020 17:57:24 -0800 Subject: [PATCH 1/6] fix 4509, remove finalizer --- .../dispatcher/inmemorychannel.go | 23 +++---- .../dispatcher/inmemorychannel_test.go | 66 +------------------ 2 files changed, 14 insertions(+), 75 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index ffea4b7c28e..c232cc78ae0 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -52,6 +52,18 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event { logging.FromContext(ctx).Infow("Reconciling", zap.Any("InMemoryChannel", imc)) + // If the IMC has been deleted just make sure the handler is removed. + if !imc.GetDeletionTimestamp().IsZero() { + if imc.Status.Address != nil && + imc.Status.Address.URL != nil { + if hostName := imc.Status.Address.URL.Host; hostName != "" { + logging.FromContext(ctx).Info("Removing dispatcher") + r.multiChannelMessageHandler.DeleteChannelHandler(hostName) + } + } + return nil + } + if !imc.Status.IsReady() { logging.FromContext(ctx).Debug("IMC is not ready, skipping") return nil @@ -93,17 +105,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) return r.patchSubscriberStatus(ctx, imc) } -func (r *Reconciler) FinalizeKind(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event { - if imc.Status.Address != nil && - imc.Status.Address.URL != nil { - if hostName := imc.Status.Address.URL.Host; hostName != "" { - logging.FromContext(ctx).Info("Removing dispatcher") - r.multiChannelMessageHandler.DeleteChannelHandler(hostName) - } - } - return nil -} - func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemoryChannel) error { after := imc.DeepCopy() diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 3d2fcf86a6c..cd83492866d 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -132,13 +132,6 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewInMemoryChannel(imcName, testNS, WithInitInMemoryChannelConditions), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "updated configuration, one channel", Key: imcKey, @@ -151,13 +144,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelChannelServiceReady(), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "with subscribers", Key: imcKey, @@ -172,14 +158,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, twoSubscriberPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "with subscribers, patch fails", Key: imcKey, @@ -197,11 +177,9 @@ func TestAllCases(t *testing.T) { InduceFailure("patch", "inmemorychannels/status"), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, twoSubscriberPatch), }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), Eventf(corev1.EventTypeWarning, "InternalError", "Failed patching: inducing failure for patch inmemorychannels"), }, WantErr: true, @@ -220,14 +198,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber2UID), subscriber2Generation), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "with subscribers, one removed one added to status", Key: imcKey, @@ -244,14 +214,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, oneSubscriberRemovedOneAddedPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - - WantErr: false, }, { Name: "subscriber with delivery spec", Key: imcKey, @@ -281,13 +245,8 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress)), }, WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), makePatch(testNS, imcName, oneSubscriberPatch), }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, { Name: "subscriber with invalid delivery spec", Key: imcKey, @@ -316,11 +275,7 @@ func TestAllCases(t *testing.T) { }), WithInMemoryChannelAddress(channelServiceAddress)), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchFinalizers(testNS, imcName), - }, WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), Eventf(corev1.EventTypeWarning, "InternalError", "failed to parse Spec.BackoffDelay: expected 'P' period mark at the start: garbage"), }, WantErr: true, @@ -354,13 +309,6 @@ func TestAllCases(t *testing.T) { WithInMemoryChannelAddress(channelServiceAddress), WithInMemoryChannelDeleted), }, - WantPatches: []clientgotesting.PatchActionImpl{ - patchRemoveFinalizers(testNS, imcName), - }, - WantEvents: []string{ - Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName), - }, - WantErr: false, }, } @@ -529,7 +477,7 @@ func TestReconciler_ReconcileKind(t *testing.T) { } } -func TestReconciler_FinalizeKind(t *testing.T) { +func TestReconciler_ReconcileKindDeletion(t *testing.T) { testCases := map[string]struct { imc *v1.InMemoryChannel wantResult reconciler.Event @@ -554,7 +502,7 @@ func TestReconciler_FinalizeKind(t *testing.T) { r := &Reconciler{ multiChannelMessageHandler: handler, } - e := r.FinalizeKind(context.TODO(), tc.imc) + e := r.ReconcileKind(context.TODO(), tc.imc) if e != tc.wantResult { t.Errorf("Results differ, want %v have %v", tc.wantResult, e) } @@ -576,16 +524,6 @@ func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl { } } -func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - return makePatch(namespace, name, - `{"metadata":{"finalizers":["`+finalizerName+`"],"resourceVersion":""}}`) -} - -func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionImpl { - return makePatch(namespace, name, - `{"metadata":{"finalizers":[],"resourceVersion":""}}`) -} - type fakeMultiChannelHandler struct { handlers map[string]fanout.MessageHandler } From 59bae4ba88bbabf60a686206be0ea6cc5ea5bfbd Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 16 Nov 2020 11:06:53 -0800 Subject: [PATCH 2/6] checkpoint --- pkg/reconciler/inmemorychannel/dispatcher/controller.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index da0bf159669..94f373cf313 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -122,6 +122,15 @@ func NewController( // Watch for inmemory channels. inmemorychannelInformer.Informer().AddEventHandler( + cache.ResourceEventHandlerFuncs{ + AddFunc: cache.FilteringResourceEventHandler{ + FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), + Handler: impl.Enqueue, + }, + AddFunc: h, + UpdateFunc: PassNew(h), + DeleteFunc: h, + } cache.FilteringResourceEventHandler{ FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), Handler: controller.HandleAll(impl.Enqueue), From 038cc3fa3aeac1cb67c44210d63e75eb65e497d5 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 16 Nov 2020 12:14:03 -0800 Subject: [PATCH 3/6] no finalizer --- .../inmemorychannel/dispatcher/controller.go | 19 ++++-------- .../dispatcher/inmemorychannel.go | 29 +++++++++++-------- .../dispatcher/inmemorychannel_test.go | 10 ++----- 3 files changed, 26 insertions(+), 32 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index 94f373cf313..f0229e428fe 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -18,6 +18,7 @@ package dispatcher import ( "context" + "knative.dev/pkg/injection" "time" "k8s.io/client-go/tools/cache" @@ -25,8 +26,6 @@ import ( "knative.dev/eventing/pkg/channel/multichannelfanout" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/injection" - "github.com/google/uuid" "github.com/kelseyhightower/envconfig" "knative.dev/pkg/kmeta" @@ -122,19 +121,13 @@ func NewController( // Watch for inmemory channels. inmemorychannelInformer.Informer().AddEventHandler( - cache.ResourceEventHandlerFuncs{ - AddFunc: cache.FilteringResourceEventHandler{ - FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), - Handler: impl.Enqueue, - }, - AddFunc: h, - UpdateFunc: PassNew(h), - DeleteFunc: h, - } cache.FilteringResourceEventHandler{ FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)), - Handler: controller.HandleAll(impl.Enqueue), - }) + Handler: cache.ResourceEventHandlerFuncs{ + AddFunc: impl.Enqueue, + UpdateFunc: controller.PassNew(impl.Enqueue), + DeleteFunc: r.deleteFunc, + }}) // Start the dispatcher. go func() { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index c232cc78ae0..7700e9ad238 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -29,6 +29,7 @@ import ( "go.uber.org/zap" "knative.dev/pkg/apis/duck" + "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" "knative.dev/pkg/reconciler" @@ -52,18 +53,6 @@ type Reconciler struct { func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event { logging.FromContext(ctx).Infow("Reconciling", zap.Any("InMemoryChannel", imc)) - // If the IMC has been deleted just make sure the handler is removed. - if !imc.GetDeletionTimestamp().IsZero() { - if imc.Status.Address != nil && - imc.Status.Address.URL != nil { - if hostName := imc.Status.Address.URL.Host; hostName != "" { - logging.FromContext(ctx).Info("Removing dispatcher") - r.multiChannelMessageHandler.DeleteChannelHandler(hostName) - } - } - return nil - } - if !imc.Status.IsReady() { logging.FromContext(ctx).Debug("IMC is not ready, skipping") return nil @@ -160,3 +149,19 @@ func (r *Reconciler) newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*mult }, }, nil } + +func (r *Reconciler) deleteFunc(obj interface{}) { + if acc, err := kmeta.DeletionHandlingAccessor(obj); err == nil { + if imc, ok := acc.(*v1.InMemoryChannel); ok { + // If the IMC has been deleted just make sure the handler is removed. + if !imc.GetDeletionTimestamp().IsZero() { + if imc.Status.Address != nil && + imc.Status.Address.URL != nil { + if hostName := imc.Status.Address.URL.Host; hostName != "" { + r.multiChannelMessageHandler.DeleteChannelHandler(hostName) + } + } + } + } + } +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index cd83492866d..a989c013b97 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -477,10 +477,9 @@ func TestReconciler_ReconcileKind(t *testing.T) { } } -func TestReconciler_ReconcileKindDeletion(t *testing.T) { +func TestReconciler_Deletion(t *testing.T) { testCases := map[string]struct { - imc *v1.InMemoryChannel - wantResult reconciler.Event + imc *v1.InMemoryChannel }{ "With address": { imc: NewInMemoryChannel(imcName, testNS, @@ -502,10 +501,7 @@ func TestReconciler_ReconcileKindDeletion(t *testing.T) { r := &Reconciler{ multiChannelMessageHandler: handler, } - e := r.ReconcileKind(context.TODO(), tc.imc) - if e != tc.wantResult { - t.Errorf("Results differ, want %v have %v", tc.wantResult, e) - } + r.deleteFunc(tc.imc) if handler.GetChannelHandler(channelServiceAddress) != nil { t.Error("Got handler") } From 54130f540985565fb62066bc3b5a6859247e5ab8 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 16 Nov 2020 12:45:22 -0800 Subject: [PATCH 4/6] go imports --- pkg/reconciler/inmemorychannel/dispatcher/controller.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index f0229e428fe..4e022925128 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -18,9 +18,10 @@ package dispatcher import ( "context" - "knative.dev/pkg/injection" "time" + "knative.dev/pkg/injection" + "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/channel/multichannelfanout" From c90d3b57571ca245462619097bdc7cf8ebbf505a Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 16 Nov 2020 12:58:06 -0800 Subject: [PATCH 5/6] address pr feedback --- .../dispatcher/inmemorychannel.go | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 7700e9ad238..ecf2441084e 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -151,17 +151,17 @@ func (r *Reconciler) newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*mult } func (r *Reconciler) deleteFunc(obj interface{}) { - if acc, err := kmeta.DeletionHandlingAccessor(obj); err == nil { - if imc, ok := acc.(*v1.InMemoryChannel); ok { - // If the IMC has been deleted just make sure the handler is removed. - if !imc.GetDeletionTimestamp().IsZero() { - if imc.Status.Address != nil && - imc.Status.Address.URL != nil { - if hostName := imc.Status.Address.URL.Host; hostName != "" { - r.multiChannelMessageHandler.DeleteChannelHandler(hostName) - } - } - } + acc, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + return + } + imc, ok := acc.(*v1.InMemoryChannel) + if !ok { + return + } + if imc.Status.Address != nil && imc.Status.Address.URL != nil { + if hostName := imc.Status.Address.URL.Host; hostName != "" { + r.multiChannelMessageHandler.DeleteChannelHandler(hostName) } } } From 70468c8c37f024f87a5ac220f0a87cd01aaa99e2 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 16 Nov 2020 13:09:24 -0800 Subject: [PATCH 6/6] add tests for invalid types & nils --- .../dispatcher/inmemorychannel.go | 5 ++- .../dispatcher/inmemorychannel_test.go | 32 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index ecf2441084e..8fb96e1f90e 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -151,12 +151,15 @@ func (r *Reconciler) newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*mult } func (r *Reconciler) deleteFunc(obj interface{}) { + if obj == nil { + return + } acc, err := kmeta.DeletionHandlingAccessor(obj) if err != nil { return } imc, ok := acc.(*v1.InMemoryChannel) - if !ok { + if !ok || imc == nil { return } if imc.Status.Address != nil && imc.Status.Address.URL != nil { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index a989c013b97..627040dd884 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -477,6 +477,38 @@ func TestReconciler_ReconcileKind(t *testing.T) { } } +func TestReconciler_InvalidInputs(t *testing.T) { + testCases := map[string]struct { + imc interface{} + }{ + "nil": {}, + "With no address": { + imc: NewInMemoryChannel(imcName, testNS, WithInMemoryChannelDeleted), + }, + "With invalid type": { + imc: &subscriber1, + }, + } + for n, tc := range testCases { + fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil) + if err != nil { + t.Error(err) + } + for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} { + t.Run("handler-"+n, func(t *testing.T) { + handler := newFakeMultiChannelHandler() + if fanoutHandler != nil { + handler.SetChannelHandler(channelServiceAddress, fanoutHandler) + } + r := &Reconciler{ + multiChannelMessageHandler: handler, + } + r.deleteFunc(tc.imc) + }) + } + } +} + func TestReconciler_Deletion(t *testing.T) { testCases := map[string]struct { imc *v1.InMemoryChannel