From 9401c17d668cd08bf71ed4d462c9cfe92fa7c93a Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Thu, 6 Jun 2019 12:54:54 +0300 Subject: [PATCH 1/3] change to new status fields and codegen --- pkg/apis/messaging/v1alpha1/in_memory_channel_types.go | 5 ++++- pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go index 6d6638c996b..b2c65d685f3 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go @@ -71,7 +71,10 @@ type InMemoryChannelStatus struct { // provided targets from inside the cluster. // // It generally has the form {channel}.{namespace}.svc.{cluster domain name} - Address duckv1alpha1.Addressable `json:"address,omitempty"` + duckv1alpha1.AddressStatus `json:",inline"` + + // Subscribers is populated with the statuses of each of the Channelable's subscribers. + eventingduck.SubscribableTypeStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 1ab4cf03025..0200b217083 100644 --- a/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -111,7 +111,8 @@ func (in *InMemoryChannelSpec) DeepCopy() *InMemoryChannelSpec { func (in *InMemoryChannelStatus) DeepCopyInto(out *InMemoryChannelStatus) { *out = *in in.Status.DeepCopyInto(&out.Status) - in.Address.DeepCopyInto(&out.Address) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } From 01fcacd09e5a9d6ce94c4ae4e58034e2920f0bc0 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Thu, 6 Jun 2019 13:52:59 +0300 Subject: [PATCH 2/3] Populate the status subscribable properly --- cmd/in_memory/channel_dispatcher/main.go | 1 + .../200-dispatcher-clusterrole.yaml | 7 ++ .../v1alpha1/in_memory_channel_lifecycle.go | 4 + .../in_memory_channel_lifecycle_test.go | 15 ++-- .../dispatcher/inmemorychannel.go | 82 ++++++++++++++++++- .../dispatcher/inmemorychannel_test.go | 2 + 6 files changed, 104 insertions(+), 7 deletions(-) diff --git a/cmd/in_memory/channel_dispatcher/main.go b/cmd/in_memory/channel_dispatcher/main.go index defada3f7d7..97f49a110d3 100644 --- a/cmd/in_memory/channel_dispatcher/main.go +++ b/cmd/in_memory/channel_dispatcher/main.go @@ -92,6 +92,7 @@ func main() { controllers := [...]*kncontroller.Impl{ inmemorychannel.NewController( opt, + opt.EventingClientSet, inMemoryDispatcher, inMemoryChannelInformer, ), diff --git a/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml b/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml index bcdc80115c1..d67d3e6506c 100644 --- a/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml +++ b/config/channels/in-memory-channel/200-dispatcher-clusterrole.yaml @@ -33,3 +33,10 @@ rules: - get - list - watch +# Updates the status to reflect subscribable status. + - apiGroups: + - messaging.knative.dev + resources: + - inmemorychannels/status + verbs: + - update diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle.go index 7854a8a862f..c8b74c77b92 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck/v1alpha1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) @@ -68,6 +69,9 @@ func (imcs *InMemoryChannelStatus) InitializeConditions() { // TODO: Use the new beta duck types. func (imcs *InMemoryChannelStatus) SetAddress(url *apis.URL) { + if imcs.Address == nil { + imcs.Address = &v1alpha1.Addressable{} + } if url != nil { imcs.Address.Hostname = url.Host imcs.Address.URL = url diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle_test.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle_test.go index d638298ea65..c62f674e093 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle_test.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_lifecycle_test.go @@ -340,19 +340,22 @@ func TestInMemoryChannelStatus_SetAddressable(t *testing.T) { }, }, }, + AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}}, }, }, "has domain": { url: &apis.URL{Scheme: "http", Host: "test-domain"}, want: &InMemoryChannelStatus{ - Address: duckv1alpha1.Addressable{ - Addressable: duckv1beta1.Addressable{ - URL: &apis.URL{ - Scheme: "http", - Host: "test-domain", + AddressStatus: duckv1alpha1.AddressStatus{ + Address: &duckv1alpha1.Addressable{ + duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, }, + "test-domain", }, - Hostname: "test-domain", }, Status: duckv1beta1.Status{ Conditions: []apis.Condition{{ diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 66a81b2455c..7488d19b912 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -18,10 +18,14 @@ package controller import ( "context" + "fmt" + "reflect" "github.com/knative/eventing/pkg/inmemorychannel" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" + clientset "github.com/knative/eventing/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/pkg/logging" @@ -29,6 +33,9 @@ import ( "github.com/knative/eventing/pkg/provisioners/multichannelfanout" "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" ) @@ -46,6 +53,7 @@ const ( type Reconciler struct { *reconciler.Base + eventingClientSet clientset.Interface dispatcher inmemorychannel.Dispatcher inmemorychannelLister listers.InMemoryChannelLister inmemorychannelInformer cache.SharedIndexInformer @@ -59,6 +67,7 @@ var _ controller.Reconciler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, + eventingClientSet clientset.Interface, dispatcher inmemorychannel.Dispatcher, inmemorychannelinformer messaginginformers.InMemoryChannelInformer, ) *controller.Impl { @@ -66,6 +75,7 @@ func NewController( r := &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), dispatcher: dispatcher, + eventingClientSet: eventingClientSet, inmemorychannelLister: inmemorychannelinformer.Lister(), inmemorychannelInformer: inmemorychannelinformer.Informer(), } @@ -81,12 +91,45 @@ func NewController( func (r *Reconciler) Reconcile(ctx context.Context, key string) error { // Convert the namespace/name string into a distinct namespace and name. - _, _, err := cache.SplitMetaNamespaceKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { logging.FromContext(ctx).Error("invalid resource key") return nil } + // Get the IMC resource with this namespace/name. + original, err := r.inmemorychannelLister.InMemoryChannels(namespace).Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("InMemoryChannel key in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + if !original.Status.IsReady() { + return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status") + } + + // Don't modify the informers copy. + channel := original.DeepCopy() + + reconcileErr := r.reconcile(ctx, channel) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling InMemoryChannel", zap.Error(reconcileErr)) + } else { + logging.FromContext(ctx).Debug("InMemoryChannel reconciled") + } + + // todo: Should this check for subscribable status rather than entire status? + if _, updateStatusErr := r.updateStatus(ctx, channel); updateStatusErr != nil { + logging.FromContext(ctx).Error("Failed to update InMemoryChannel status", zap.Error(updateStatusErr)) + return updateStatusErr + } + return nil +} + +func (r *Reconciler) reconcile(ctx context.Context, imc *v1alpha1.InMemoryChannel) error { // This is a special Reconciler that does the following: // 1. Lists the inmemory channels. // 2. Creates a multi-channel-fanout-config. @@ -112,9 +155,28 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return err } + imc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(imc.Spec.Subscribable) return nil } +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable) *eventingduck.SubscribableStatus { + if subscribable == nil { + return nil + } + subscriberStatus := make([]eventingduck.SubscriberStatus, 0) + for _, sub := range subscribable.Subscribers { + status := eventingduck.SubscriberStatus{ + UID: sub.UID, + ObservedGeneration: sub.Generation, + Ready: corev1.ConditionTrue, + } + subscriberStatus = append(subscriberStatus, status) + } + return &eventingduck.SubscribableStatus{ + Subscribers: subscriberStatus, + } +} + // newConfigFromInMemoryChannels creates a new Config from the list of inmemory channels. func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemoryChannel) *multichannelfanout.Config { cc := make([]multichannelfanout.ChannelConfig, 0) @@ -136,3 +198,21 @@ func (r *Reconciler) newConfigFromInMemoryChannels(channels []*v1alpha1.InMemory ChannelConfigs: cc, } } + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.InMemoryChannel) (*v1alpha1.InMemoryChannel, error) { + imc, err := r.inmemorychannelLister.InMemoryChannels(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + if reflect.DeepEqual(imc.Status, desired.Status) { + return imc, nil + } + + // Don't modify the informers copy. + existing := imc.DeepCopy() + existing.Status = desired.Status + + new, err := r.eventingClientSet.MessagingV1alpha1().InMemoryChannels(desired.Namespace).UpdateStatus(existing) + return new, err +} diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 21469e1d4e3..5baf09dd9fc 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -65,6 +65,7 @@ func TestNewController(t *testing.T) { EventingClientSet: eventingClient, Logger: logtesting.TestLogger(t), }, + eventingClient, dispatcher, imcInformer) @@ -104,6 +105,7 @@ func TestAllCases(t *testing.T) { table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { return &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), + eventingClientSet: fakeclientset.NewSimpleClientset(), inmemorychannelLister: listers.GetInMemoryChannelLister(), // TODO fix inmemorychannelInformer: nil, From f5de54a3a88d78b780128b83a006e5c582b40553 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Thu, 6 Jun 2019 14:52:25 +0300 Subject: [PATCH 3/3] use eventingclientset from Base, add unit test for status update --- cmd/in_memory/channel_dispatcher/main.go | 1 - .../dispatcher/inmemorychannel.go | 7 +-- .../dispatcher/inmemorychannel_test.go | 51 ++++++++++++++++++- pkg/reconciler/testing/inmemorychannel.go | 17 ++++++- 4 files changed, 66 insertions(+), 10 deletions(-) diff --git a/cmd/in_memory/channel_dispatcher/main.go b/cmd/in_memory/channel_dispatcher/main.go index 97f49a110d3..defada3f7d7 100644 --- a/cmd/in_memory/channel_dispatcher/main.go +++ b/cmd/in_memory/channel_dispatcher/main.go @@ -92,7 +92,6 @@ func main() { controllers := [...]*kncontroller.Impl{ inmemorychannel.NewController( opt, - opt.EventingClientSet, inMemoryDispatcher, inMemoryChannelInformer, ), diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 7488d19b912..d1a34861d3e 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -25,7 +25,6 @@ import ( eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" - clientset "github.com/knative/eventing/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/pkg/logging" @@ -53,7 +52,6 @@ const ( type Reconciler struct { *reconciler.Base - eventingClientSet clientset.Interface dispatcher inmemorychannel.Dispatcher inmemorychannelLister listers.InMemoryChannelLister inmemorychannelInformer cache.SharedIndexInformer @@ -67,7 +65,6 @@ var _ controller.Reconciler = (*Reconciler)(nil) // Registers event handlers to enqueue events. func NewController( opt reconciler.Options, - eventingClientSet clientset.Interface, dispatcher inmemorychannel.Dispatcher, inmemorychannelinformer messaginginformers.InMemoryChannelInformer, ) *controller.Impl { @@ -75,7 +72,6 @@ func NewController( r := &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), dispatcher: dispatcher, - eventingClientSet: eventingClientSet, inmemorychannelLister: inmemorychannelinformer.Lister(), inmemorychannelInformer: inmemorychannelinformer.Informer(), } @@ -213,6 +209,5 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.InMemor existing := imc.DeepCopy() existing.Status = desired.Status - new, err := r.eventingClientSet.MessagingV1alpha1().InMemoryChannels(desired.Namespace).UpdateStatus(existing) - return new, err + return r.EventingClientSet.MessagingV1alpha1().InMemoryChannels(desired.Namespace).UpdateStatus(existing) } diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index 5baf09dd9fc..5c16e299f2b 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -23,6 +23,7 @@ import ( "github.com/knative/eventing/pkg/provisioners/multichannelfanout" "k8s.io/apimachinery/pkg/runtime" + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" fakeclientset "github.com/knative/eventing/pkg/client/clientset/versioned/fake" informers "github.com/knative/eventing/pkg/client/informers/externalversions" @@ -33,6 +34,7 @@ import ( . "github.com/knative/pkg/reconciler/testing" fakekubeclientset "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" + clientgotesting "k8s.io/client-go/testing" ) const ( @@ -65,7 +67,6 @@ func TestNewController(t *testing.T) { EventingClientSet: eventingClient, Logger: logtesting.TestLogger(t), }, - eventingClient, dispatcher, imcInformer) @@ -75,6 +76,28 @@ func TestNewController(t *testing.T) { } func TestAllCases(t *testing.T) { + subscribers := []duckv1alpha1.SubscriberSpec{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: "call1", + ReplyURI: "sink2", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 2, + SubscriberURI: "call2", + ReplyURI: "sink2", + }} + + subscriberStatuses := []duckv1alpha1.SubscriberStatus{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: "True", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 2, + Ready: "True", + }} + imcKey := testNS + "/" + imcName table := TableTest{ { @@ -98,6 +121,31 @@ func TestAllCases(t *testing.T) { reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)), }, WantErr: false, + }, { + Name: "with subscribers", + Key: imcKey, + Objects: []runtime.Object{ + reconciletesting.NewInMemoryChannel(imcName, testNS, + reconciletesting.WithInitInMemoryChannelConditions, + reconciletesting.WithInMemoryChannelDeploymentReady(), + reconciletesting.WithInMemoryChannelServiceReady(), + reconciletesting.WithInMemoryChannelEndpointsReady(), + reconciletesting.WithInMemoryChannelChannelServiceReady(), + reconciletesting.WithInMemoryChannelSubscribers(subscribers), + reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)), + }, + WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ + Object: reconciletesting.NewInMemoryChannel(imcName, testNS, + reconciletesting.WithInitInMemoryChannelConditions, + reconciletesting.WithInMemoryChannelDeploymentReady(), + reconciletesting.WithInMemoryChannelServiceReady(), + reconciletesting.WithInMemoryChannelEndpointsReady(), + reconciletesting.WithInMemoryChannelChannelServiceReady(), + reconciletesting.WithInMemoryChannelSubscribers(subscribers), + reconciletesting.WithInMemoryChannelStatusSubscribers(subscriberStatuses), + reconciletesting.WithInMemoryChannelAddress(channelServiceAddress)), + }}, + WantErr: false, }, {}, } defer logtesting.ClearAll() @@ -105,7 +153,6 @@ func TestAllCases(t *testing.T) { table.Test(t, reconciletesting.MakeFactory(func(listers *reconciletesting.Listers, opt reconciler.Options) controller.Reconciler { return &Reconciler{ Base: reconciler.NewBase(opt, controllerAgentName), - eventingClientSet: fakeclientset.NewSimpleClientset(), inmemorychannelLister: listers.GetInMemoryChannelLister(), // TODO fix inmemorychannelInformer: nil, diff --git a/pkg/reconciler/testing/inmemorychannel.go b/pkg/reconciler/testing/inmemorychannel.go index be9914d23da..79786e153fa 100644 --- a/pkg/reconciler/testing/inmemorychannel.go +++ b/pkg/reconciler/testing/inmemorychannel.go @@ -20,12 +20,12 @@ import ( "context" "time" + duckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/messaging/v1alpha1" "github.com/knative/pkg/apis" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - // "k8s.io/apimachinery/pkg/types" ) // InMemoryChannelOption enables further configuration of a InMemoryChannel. @@ -56,6 +56,12 @@ func WithInMemoryChannelDeleted(imc *v1alpha1.InMemoryChannel) { imc.ObjectMeta.SetDeletionTimestamp(&deleteTime) } +func WithInMemoryChannelSubscribers(subscribers []duckv1alpha1.SubscriberSpec) InMemoryChannelOption { + return func(imc *v1alpha1.InMemoryChannel) { + imc.Spec.Subscribable = &duckv1alpha1.Subscribable{Subscribers: subscribers} + } +} + func WithInMemoryChannelDeploymentNotReady(reason, message string) InMemoryChannelOption { return func(imc *v1alpha1.InMemoryChannel) { imc.Status.MarkDispatcherFailed(reason, message) @@ -112,3 +118,12 @@ func WithInMemoryChannelAddress(a string) InMemoryChannelOption { }) } } + +func WithInMemoryChannelStatusSubscribers(subscriberStatuses []duckv1alpha1.SubscriberStatus) InMemoryChannelOption { + return func(imc *v1alpha1.InMemoryChannel) { + imc.Status.SubscribableTypeStatus = duckv1alpha1.SubscribableTypeStatus{ + SubscribableStatus: &duckv1alpha1.SubscribableStatus{ + Subscribers: subscriberStatuses}, + } + } +}