From 61976aeeb8b3c4c27cd090f88361639095bc9d08 Mon Sep 17 00:00:00 2001 From: Gregory Haynes Date: Mon, 10 Sep 2018 18:14:57 +0000 Subject: [PATCH] Reprovision/Resub on bus param update When parameters for a bus are updated we need to reprovision and resubscribe all channels for that bus. Fixes #392 --- pkg/buses/cache.go | 16 +++++++++++++ pkg/buses/cache_test.go | 52 +++++++++++++++++++++++++++++++++++++++++ pkg/buses/reconciler.go | 19 +++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/pkg/buses/cache.go b/pkg/buses/cache.go index 430e209e5ad..009d629fec2 100644 --- a/pkg/buses/cache.go +++ b/pkg/buses/cache.go @@ -96,3 +96,19 @@ func (c *Cache) RemoveSubscription(subscription *channelsv1alpha1.Subscription) ref := NewSubscriptionReference(subscription) delete(c.subscriptions, ref) } + +func (c *Cache) AllChannels() []*channelsv1alpha1.Channel { + chans := []*channelsv1alpha1.Channel{} + for _, channel := range c.channels { + chans = append(chans, channel) + } + return chans +} + +func (c *Cache) AllSubscriptions() []*channelsv1alpha1.Subscription { + subs := []*channelsv1alpha1.Subscription{} + for _, sub := range c.subscriptions { + subs = append(subs, sub) + } + return subs +} diff --git a/pkg/buses/cache_test.go b/pkg/buses/cache_test.go index 5b2f8cee9e0..2a337c9adc6 100644 --- a/pkg/buses/cache_test.go +++ b/pkg/buses/cache_test.go @@ -17,10 +17,12 @@ limitations under the License. package buses_test import ( + "fmt" "testing" channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1" "github.com/knative/eventing/pkg/buses" + "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -130,6 +132,56 @@ func TestCacheNilSubscription(t *testing.T) { cache.RemoveSubscription(subscription) } +func TestCacheAllChannels(t *testing.T) { + cases := []struct { + Channels []*channelsv1alpha1.Channel + }{ + {Channels: []*channelsv1alpha1.Channel{}}, + {Channels: []*channelsv1alpha1.Channel{ + makeChannel(buses.NewChannelReferenceFromNames(cacheTestChannel, cacheDefaultNamespace)), + }}, + } + + for _, tt := range cases { + t.Run(fmt.Sprintf("%v", tt.Channels), func(t *testing.T) { + cache := buses.NewCache() + + for _, channel := range tt.Channels { + cache.AddChannel(channel) + } + + if !equality.Semantic.DeepEqual(tt.Channels, cache.AllChannels()) { + t.Errorf("%v != %v", tt.Channels, cache.AllChannels()) + } + }) + } +} + +func TestCacheAllSubscriptions(t *testing.T) { + cases := []struct { + Subscriptions []*channelsv1alpha1.Subscription + }{ + {Subscriptions: []*channelsv1alpha1.Subscription{}}, + {Subscriptions: []*channelsv1alpha1.Subscription{ + makeSubscription(buses.NewSubscriptionReferenceFromNames(cacheTestSubscription, cacheDefaultNamespace)), + }}, + } + + for _, tt := range cases { + t.Run(fmt.Sprintf("%v", tt.Subscriptions), func(t *testing.T) { + cache := buses.NewCache() + + for _, sub := range tt.Subscriptions { + cache.AddSubscription(sub) + } + + if !equality.Semantic.DeepEqual(tt.Subscriptions, cache.AllSubscriptions()) { + t.Errorf("%v != %v", tt.Subscriptions, cache.AllSubscriptions()) + } + }) + } +} + func makeChannel(ref buses.ChannelReference) *channelsv1alpha1.Channel { return &channelsv1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/buses/reconciler.go b/pkg/buses/reconciler.go index b5d3892a9de..a0605c03188 100644 --- a/pkg/buses/reconciler.go +++ b/pkg/buses/reconciler.go @@ -551,11 +551,30 @@ func (r *Reconciler) createOrUpdateBus(bus channelsv1alpha1.GenericBus) error { // will not emit the same key concurrently. Any bus received is an updated // revision of the current bus. bus, r.bus = r.bus, bus + if !equality.Semantic.DeepEqual(r.bus.GetSpec(), bus.GetSpec()) { err := r.handler.onBus(r.bus, r) if err != nil { return err } + + oldParams := bus.GetSpec().Parameters + newParams := r.bus.GetSpec().Parameters + // If channel parameters changed we need to reprovision + if !equality.Semantic.DeepEqual(oldParams.Channel, newParams.Channel) { + r.logger.Infof("Bus channel parameters changed. Reprovisioning channels.") + for _, channel := range r.cache.AllChannels() { + r.workqueue.AddRateLimited(makeWorkqueueKeyForChannel(channel)) + } + } + + // If subscription parameters changed we need to resubscribe + if !equality.Semantic.DeepEqual(oldParams.Subscription, newParams.Subscription) { + r.logger.Infof("Bus subscription parameters changed. Resubscribing.") + for _, subscription := range r.cache.AllSubscriptions() { + r.workqueue.AddRateLimited(makeWorkqueueKeyForSubscription(subscription)) + } + } } return nil