Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions pkg/reconciler/inmemorychannel/dispatcher/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"context"
"time"

"knative.dev/pkg/injection"

"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/channel/multichannelfanout"
"knative.dev/eventing/pkg/kncloudevents"

"knative.dev/pkg/injection"

"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -124,8 +124,11 @@ func NewController(
inmemorychannelInformer.Informer().AddEventHandler(
cache.FilteringResourceEventHandler{
FilterFunc: filterWithAnnotation(injection.HasNamespaceScope(ctx)),
Handler: controller.HandleAll(impl.Enqueue),
})
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: r.deleteFunc,
}})

// Start the dispatcher.
go func() {
Expand Down
31 changes: 20 additions & 11 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"go.uber.org/zap"

"knative.dev/pkg/apis/duck"
"knative.dev/pkg/kmeta"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"

Expand Down Expand Up @@ -93,17 +94,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, imc *v1.InMemoryChannel)
return r.patchSubscriberStatus(ctx, imc)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, imc *v1.InMemoryChannel) reconciler.Event {
if imc.Status.Address != nil &&
imc.Status.Address.URL != nil {
if hostName := imc.Status.Address.URL.Host; hostName != "" {
logging.FromContext(ctx).Info("Removing dispatcher")
r.multiChannelMessageHandler.DeleteChannelHandler(hostName)
}
}
return nil
}

func (r *Reconciler) patchSubscriberStatus(ctx context.Context, imc *v1.InMemoryChannel) error {
after := imc.DeepCopy()

Expand Down Expand Up @@ -159,3 +149,22 @@ func (r *Reconciler) newConfigForInMemoryChannel(imc *v1.InMemoryChannel) (*mult
},
}, nil
}

func (r *Reconciler) deleteFunc(obj interface{}) {
if obj == nil {
return
}
acc, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
return
}
imc, ok := acc.(*v1.InMemoryChannel)
if !ok || imc == nil {
return
}
if imc.Status.Address != nil && imc.Status.Address.URL != nil {
if hostName := imc.Status.Address.URL.Host; hostName != "" {
r.multiChannelMessageHandler.DeleteChannelHandler(hostName)
}
}
}
104 changes: 35 additions & 69 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,6 @@ func TestAllCases(t *testing.T) {
Objects: []runtime.Object{
NewInMemoryChannel(imcName, testNS, WithInitInMemoryChannelConditions),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},
WantErr: false,
}, {
Name: "updated configuration, one channel",
Key: imcKey,
Expand All @@ -151,13 +144,6 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelChannelServiceReady(),
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},
WantErr: false,
}, {
Name: "with subscribers",
Key: imcKey,
Expand All @@ -172,14 +158,8 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
makePatch(testNS, imcName, twoSubscriberPatch),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},

WantErr: false,
}, {
Name: "with subscribers, patch fails",
Key: imcKey,
Expand All @@ -197,11 +177,9 @@ func TestAllCases(t *testing.T) {
InduceFailure("patch", "inmemorychannels/status"),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
makePatch(testNS, imcName, twoSubscriberPatch),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
Eventf(corev1.EventTypeWarning, "InternalError", "Failed patching: inducing failure for patch inmemorychannels"),
},
WantErr: true,
Expand All @@ -220,14 +198,6 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelReadySubscriberAndGeneration(string(subscriber2UID), subscriber2Generation),
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},

WantErr: false,
}, {
Name: "with subscribers, one removed one added to status",
Key: imcKey,
Expand All @@ -244,14 +214,8 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
makePatch(testNS, imcName, oneSubscriberRemovedOneAddedPatch),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},

WantErr: false,
}, {
Name: "subscriber with delivery spec",
Key: imcKey,
Expand Down Expand Up @@ -281,13 +245,8 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
makePatch(testNS, imcName, oneSubscriberPatch),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},
WantErr: false,
}, {
Name: "subscriber with invalid delivery spec",
Key: imcKey,
Expand Down Expand Up @@ -316,11 +275,7 @@ func TestAllCases(t *testing.T) {
}),
WithInMemoryChannelAddress(channelServiceAddress)),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(testNS, imcName),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
Eventf(corev1.EventTypeWarning, "InternalError", "failed to parse Spec.BackoffDelay: expected 'P' period mark at the start: garbage"),
},
WantErr: true,
Expand Down Expand Up @@ -354,13 +309,6 @@ func TestAllCases(t *testing.T) {
WithInMemoryChannelAddress(channelServiceAddress),
WithInMemoryChannelDeleted),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchRemoveFinalizers(testNS, imcName),
},
WantEvents: []string{
Eventf(corev1.EventTypeNormal, "FinalizerUpdate", "Updated %q finalizers", imcName),
},
WantErr: false,
},
}

Expand Down Expand Up @@ -529,10 +477,41 @@ func TestReconciler_ReconcileKind(t *testing.T) {
}
}

func TestReconciler_FinalizeKind(t *testing.T) {
func TestReconciler_InvalidInputs(t *testing.T) {
testCases := map[string]struct {
imc *v1.InMemoryChannel
wantResult reconciler.Event
imc interface{}
}{
"nil": {},
"With no address": {
imc: NewInMemoryChannel(imcName, testNS, WithInMemoryChannelDeleted),
},
"With invalid type": {
imc: &subscriber1,
},
}
for n, tc := range testCases {
fh, err := fanout.NewFanoutMessageHandler(nil, channel.NewMessageDispatcher(nil), fanout.Config{}, nil)
if err != nil {
t.Error(err)
}
for _, fanoutHandler := range []fanout.MessageHandler{nil, fh} {
t.Run("handler-"+n, func(t *testing.T) {
handler := newFakeMultiChannelHandler()
if fanoutHandler != nil {
handler.SetChannelHandler(channelServiceAddress, fanoutHandler)
}
r := &Reconciler{
multiChannelMessageHandler: handler,
}
r.deleteFunc(tc.imc)
})
}
}
}

func TestReconciler_Deletion(t *testing.T) {
testCases := map[string]struct {
imc *v1.InMemoryChannel
}{
"With address": {
imc: NewInMemoryChannel(imcName, testNS,
Expand All @@ -554,10 +533,7 @@ func TestReconciler_FinalizeKind(t *testing.T) {
r := &Reconciler{
multiChannelMessageHandler: handler,
}
e := r.FinalizeKind(context.TODO(), tc.imc)
if e != tc.wantResult {
t.Errorf("Results differ, want %v have %v", tc.wantResult, e)
}
r.deleteFunc(tc.imc)
if handler.GetChannelHandler(channelServiceAddress) != nil {
t.Error("Got handler")
}
Expand All @@ -576,16 +552,6 @@ func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl {
}
}

func patchFinalizers(namespace, name string) clientgotesting.PatchActionImpl {
return makePatch(namespace, name,
`{"metadata":{"finalizers":["`+finalizerName+`"],"resourceVersion":""}}`)
}

func patchRemoveFinalizers(namespace, name string) clientgotesting.PatchActionImpl {
return makePatch(namespace, name,
`{"metadata":{"finalizers":[],"resourceVersion":""}}`)
}

type fakeMultiChannelHandler struct {
handlers map[string]fanout.MessageHandler
}
Expand Down