From b714f1235742ff2eb63093cb51aa0fe9bd5088ce Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Thu, 6 Jun 2019 19:11:49 -0700 Subject: [PATCH 1/6] WIP --- .../v1alpha1/natss_channel_lifecycle.go | 7 ++- .../messaging/v1alpha1/natss_channel_types.go | 5 +- .../v1alpha1/zz_generated.deepcopy.go | 3 +- .../natss/pkg/dispatcher/channel/reconcile.go | 4 +- contrib/natss/pkg/dispatcher/dispatcher.go | 37 +++++++++--- .../natss/pkg/dispatcher/dispatcher_test.go | 2 +- .../pkg/reconciler/dispatcher/natsschannel.go | 56 +++++++++++++++++-- 7 files changed, 94 insertions(+), 20 deletions(-) diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go index 0ac25a9f846..1780054c105 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck/v1alpha1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) @@ -51,7 +52,7 @@ const ( // the Addressable contract and has a non-empty hostname. NatssChannelConditionAddressable apis.ConditionType = "Addressable" - // NatssChannelConditionServiceReady has status True when a k8s Service representing the channel is ready. + // NatssChannelConditionChannelServiceReady has status True when a k8s Service representing the channel is ready. // Because this uses ExternalName, there are no endpoints to check. NatssChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady" ) @@ -71,7 +72,11 @@ func (cs *NatssChannelStatus) InitializeConditions() { nc.Manage(cs).InitializeConditions() } +// SetAddress sets the address (as part of Addressable contract) and marks the correct condition. func (cs *NatssChannelStatus) SetAddress(url *apis.URL) { + if cs.Address == nil { + cs.Address = &v1alpha1.Addressable{} + } if url != nil { cs.Address.Hostname = url.Host cs.Address.URL = url diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go index 7d9cb8b06d7..4ab45e67cf1 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go @@ -69,7 +69,10 @@ type NatssChannelStatus struct { // provided targets from inside the cluster. // // It generally has the form {channel}.{namespace}.svc.{cluster domain name} - Address duckv1alpha1.Addressable `json:"address,omitempty"` + duckv1alpha1.AddressStatus `json:",inline"` + + // Subscribers is populated with the statuses of each of the Channelable's subscribers. + eventingduck.SubscribableTypeStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 06f4e9b4323..bbcb4840989 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -111,7 +111,8 @@ func (in *NatssChannelSpec) DeepCopy() *NatssChannelSpec { func (in *NatssChannelStatus) DeepCopyInto(out *NatssChannelStatus) { *out = *in in.Status.DeepCopyInto(&out.Status) - in.Address.DeepCopyInto(&out.Address) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } diff --git a/contrib/natss/pkg/dispatcher/channel/reconcile.go b/contrib/natss/pkg/dispatcher/channel/reconcile.go index f932706e77b..5b50271a8eb 100644 --- a/contrib/natss/pkg/dispatcher/channel/reconcile.go +++ b/contrib/natss/pkg/dispatcher/channel/reconcile.go @@ -105,7 +105,7 @@ func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) (bool, error) { if !c.DeletionTimestamp.IsZero() { - if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil { + if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil { r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err)) return false, err } @@ -119,7 +119,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } // try to subscribe - if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil { + if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil { r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err)) return false, err } diff --git a/contrib/natss/pkg/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher.go index bc2b2263178..1adeb385692 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher.go @@ -20,17 +20,19 @@ import ( "context" "encoding/json" "fmt" + "strings" "sync" "sync/atomic" "time" "github.com/knative/eventing/contrib/natss/pkg/stanutil" + "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" stan "github.com/nats-io/go-nats-streaming" "go.uber.org/zap" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" ) const ( @@ -182,27 +184,27 @@ func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { } } -func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error { +func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (*v1alpha1.SubscribableStatus, error) { s.subscriptionsMux.Lock() defer s.subscriptionsMux.Unlock() cRef := provisioners.ChannelReference{Namespace: channel.Namespace, Name: channel.Name} - if channel.Spec.Subscribable == nil || isFinalizer { s.logger.Sugar().Infof("Empty subscriptions for channel Ref: %v; unsubscribe all active subscriptions, if any", cRef) chMap, ok := s.subscriptions[cRef] if !ok { // nothing to do s.logger.Sugar().Infof("No channel Ref %v found in subscriptions map", cRef) - return nil + return nil, nil } for sub := range chMap { s.unsubscribe(cRef, sub) } delete(s.subscriptions, cRef) - return nil + return nil, nil } + subsStatus := &v1alpha1.SubscribableStatus{} subscriptions := channel.Spec.Subscribable.Subscribers activeSubs := make(map[subscriptionReference]bool) // it's logically a set @@ -211,21 +213,28 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. chMap = make(map[subscriptionReference]*stan.Subscription) s.subscriptions[cRef] = chMap } + var errStrings []string for _, sub := range subscriptions { // check if the subscription already exist and do nothing in this case subRef := newSubscriptionReference(sub) if _, ok := chMap[subRef]; ok { activeSubs[subRef] = true s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef) + subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionTrue, "")) continue } // subscribe natssSub, err := s.subscribe(cRef, subRef) if err != nil { - return err + subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionFalse, err.Error())) + errStrings = append(errStrings, err.Error()) } chMap[subRef] = natssSub activeSubs[subRef] = true + subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionTrue, "")) + } + if len(errStrings) > 0 { + return subsStatus, fmt.Errorf(strings.Join(errStrings, "\n")) } // Unsubscribe for deleted subscriptions for sub := range chMap { @@ -237,7 +246,19 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. if len(s.subscriptions[cRef]) == 0 { delete(s.subscriptions, cRef) } - return nil + return subsStatus, nil +} + +func ToSubscriberStatus(subSpec *v1alpha1.SubscriberSpec, condition corev1.ConditionStatus, msg string) *v1alpha1.SubscriberStatus { + if subSpec == nil { + return nil + } + return &v1alpha1.SubscriberStatus{ + UID: subSpec.UID, + ObservedGeneration: subSpec.Generation, + Message: msg, + Ready: condition, + } } func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReference, subscription subscriptionReference) (*stan.Subscription, error) { diff --git a/contrib/natss/pkg/dispatcher/dispatcher_test.go b/contrib/natss/pkg/dispatcher/dispatcher_test.go index 3dbf7524b4f..cdc80b4b20a 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher_test.go +++ b/contrib/natss/pkg/dispatcher/dispatcher_test.go @@ -177,7 +177,7 @@ func TestUpdateSubscriptions(t *testing.T) { logger.Info("TestUpdateSubscriptions()") c := makeChannelWithSubscribers() - if err := s.UpdateSubscriptions(c, false); err != nil { + if _, err := s.UpdateSubscriptions(c, false); err != nil { t.Errorf("UpdateSubscriptions failed: %v", err) } diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index 15cfe801fa4..63149d5b3f0 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -19,6 +19,8 @@ package controller import ( "context" "encoding/json" + "fmt" + "reflect" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "go.uber.org/zap" @@ -111,15 +113,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return err } + if !original.Status.IsReady() { + return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status") + } + // Don't modify the informers copy. natssChannel := original.DeepCopy() + reconcileErr := r.reconcile(ctx, natssChannel) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling InMemoryChannel", zap.Error(reconcileErr)) + } else { + logging.FromContext(ctx).Debug("InMemoryChannel reconciled") + } + + // todo: Should this check for subscribable status rather than entire status? + if _, updateStatusErr := r.updateStatus(ctx, natssChannel); updateStatusErr != nil { + logging.FromContext(ctx).Error("Failed to update InMemoryChannel status", zap.Error(updateStatusErr)) + return updateStatusErr + } + return nil +} + +func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.NatssChannel) error { // TODO update dispatcher API and use Channelable or NatssChannel. c := toChannel(natssChannel) // See if the channel has been deleted. if natssChannel.DeletionTimestamp != nil { - if err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil { + if _, err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil { logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err)) return err } @@ -135,7 +157,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { } // Try to subscribe. - if err := r.natssDispatcher.UpdateSubscriptions(c, false); err != nil { + subStatus, err := r.natssDispatcher.UpdateSubscriptions(c, false) + natssChannel.Status.SubscribableStatus = subStatus + if err != nil { logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err)) return err } @@ -161,6 +185,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return nil } +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssChannel) (*v1alpha1.NatssChannel, error) { + imc, err := r.natsschannelLister.NatssChannels(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + if reflect.DeepEqual(imc.Status, desired.Status) { + return imc, nil + } + + // Don't modify the informers copy. + existing := imc.DeepCopy() + existing.Status = desired.Status + + return r.eventingClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) +} + // newConfigFromNatssChannels creates a new Config from the list of natss channels. func (r *Reconciler) newConfigFromNatssChannels(channels []*v1alpha1.NatssChannel) *multichannelfanout.Config { cc := make([]multichannelfanout.ChannelConfig, 0) @@ -212,7 +253,7 @@ func removeFinalizer(channel *v1alpha1.NatssChannel) { } func toChannel(natssChannel *v1alpha1.NatssChannel) *eventingv1alpha1.Channel { - return &eventingv1alpha1.Channel{ + channel := &eventingv1alpha1.Channel{ ObjectMeta: v1.ObjectMeta{ Name: natssChannel.Name, Namespace: natssChannel.Namespace, @@ -220,8 +261,11 @@ func toChannel(natssChannel *v1alpha1.NatssChannel) *eventingv1alpha1.Channel { Spec: eventingv1alpha1.ChannelSpec{ Subscribable: natssChannel.Spec.Subscribable, }, - Status: eventingv1alpha1.ChannelStatus{ - Address: natssChannel.Status.Address, - }, } + if natssChannel.Status.Address != nil { + channel.Status = eventingv1alpha1.ChannelStatus{ + Address: *natssChannel.Status.Address, + } + } + return channel } From 2e75a51a2155a41a8b0148480c179303343e0e50 Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Fri, 7 Jun 2019 13:21:43 -0700 Subject: [PATCH 2/6] Fixed UTS --- .../v1alpha1/natss_channel_lifecycle_test.go | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go index 7e024026033..3107a5f2726 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go @@ -19,10 +19,10 @@ package v1alpha1 import ( "testing" - "github.com/knative/pkg/apis" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" appsv1 "k8s.io/api/apps/v1" @@ -341,19 +341,23 @@ func TestNatssChannelStatus_SetAddressable(t *testing.T) { }, }, }, + AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}}, + SubscribableTypeStatus: eventingduck.SubscribableTypeStatus{}, }, }, "has domain": { url: &apis.URL{Scheme: "http", Host: "test-domain"}, want: &NatssChannelStatus{ - Address: duckv1alpha1.Addressable{ - Addressable: duckv1beta1.Addressable{ - URL: &apis.URL{ - Scheme: "http", - Host: "test-domain", + AddressStatus: duckv1alpha1.AddressStatus{ + Address: &duckv1alpha1.Addressable{ + Addressable: duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, }, + Hostname: "test-domain", }, - Hostname: "test-domain", }, Status: duckv1beta1.Status{ Conditions: []apis.Condition{ From db2741d86cdeaa55dadbed70f07517394051b059 Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Fri, 7 Jun 2019 13:40:20 -0700 Subject: [PATCH 3/6] Fixed natss channel dispatcher compile issue --- contrib/natss/pkg/reconciler/dispatcher/natsschannel.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index 13081165489..eb8b06e3365 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -195,7 +195,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssCh existing := imc.DeepCopy() existing.Status = desired.Status - return r.eventingClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) + return r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) } // newConfigFromNatssChannels creates a new Config from the list of natss channels. From 20c436a7b1f22891ad01a47fed19576a075b1256 Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Fri, 7 Jun 2019 15:37:08 -0700 Subject: [PATCH 4/6] minor improvements to dispatcher.go --- contrib/natss/pkg/dispatcher/dispatcher.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/contrib/natss/pkg/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher.go index 1adeb385692..e1a6aa37f45 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher.go @@ -215,23 +215,29 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. } var errStrings []string for _, sub := range subscriptions { + subscriberStatus := ToSubscriberStatus(&sub, corev1.ConditionFalse, "internal error") // check if the subscription already exist and do nothing in this case subRef := newSubscriptionReference(sub) if _, ok := chMap[subRef]; ok { activeSubs[subRef] = true s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef) - subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionTrue, "")) + subscriberStatus.Ready = corev1.ConditionTrue + subscriberStatus.Message = fmt.Sprintf("Subscription: %v already active for channel: %v", sub, cRef) continue } // subscribe natssSub, err := s.subscribe(cRef, subRef) if err != nil { - subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionFalse, err.Error())) + subscriberStatus.Ready = corev1.ConditionTrue + subscriberStatus.Message = err.Error() errStrings = append(errStrings, err.Error()) } chMap[subRef] = natssSub activeSubs[subRef] = true - subsStatus.Subscribers = append(subsStatus.Subscribers, *ToSubscriberStatus(&sub, corev1.ConditionTrue, "")) + + subscriberStatus.Ready = corev1.ConditionTrue + subscriberStatus.Message = fmt.Sprintf("Subscription: %v subscribed to channel: %v", sub, cRef) + subsStatus.Subscribers = append(subsStatus.Subscribers, *subscriberStatus) } if len(errStrings) > 0 { return subsStatus, fmt.Errorf(strings.Join(errStrings, "\n")) From b562dba4531d5203b54a3d8fae014935d756ebaf Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Fri, 7 Jun 2019 17:32:29 -0700 Subject: [PATCH 5/6] Updates based on PR comments --- .../pkg/reconciler/dispatcher/kafkachannel.go | 4 ++ contrib/natss/pkg/dispatcher/dispatcher.go | 30 ++++------ .../pkg/reconciler/dispatcher/natsschannel.go | 56 ++++++++++++++----- 3 files changed, 56 insertions(+), 34 deletions(-) diff --git a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go index 4535fec2500..0b934038f52 100644 --- a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go +++ b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go @@ -165,6 +165,10 @@ func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) e return err } kc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(kc.Spec.Subscribable, failedSubscriptions) + if len(failedSubscriptions) > 0 { + logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe") + return fmt.Errorf("Some kafka subscriptions failed to subscribe") + } return nil } diff --git a/contrib/natss/pkg/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher.go index e1a6aa37f45..57c93b84d58 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher.go @@ -20,13 +20,13 @@ import ( "context" "encoding/json" "fmt" - "strings" "sync" "sync/atomic" "time" "github.com/knative/eventing/contrib/natss/pkg/stanutil" "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" @@ -184,10 +184,12 @@ func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { } } -func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (*v1alpha1.SubscribableStatus, error) { +// UpdateSubscriptions creates/deletes the natss subscriptions based on channel.Spec.Subscribable.Subscribers +func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (map[eventingduck.SubscriberSpec]error, error) { s.subscriptionsMux.Lock() defer s.subscriptionsMux.Unlock() + failedToSubscribe := make(map[eventingduck.SubscriberSpec]error) cRef := provisioners.ChannelReference{Namespace: channel.Namespace, Name: channel.Name} if channel.Spec.Subscribable == nil || isFinalizer { s.logger.Sugar().Infof("Empty subscriptions for channel Ref: %v; unsubscribe all active subscriptions, if any", cRef) @@ -195,16 +197,15 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. if !ok { // nothing to do s.logger.Sugar().Infof("No channel Ref %v found in subscriptions map", cRef) - return nil, nil + return failedToSubscribe, nil } for sub := range chMap { s.unsubscribe(cRef, sub) } delete(s.subscriptions, cRef) - return nil, nil + return failedToSubscribe, nil } - subsStatus := &v1alpha1.SubscribableStatus{} subscriptions := channel.Spec.Subscribable.Subscribers activeSubs := make(map[subscriptionReference]bool) // it's logically a set @@ -215,32 +216,23 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. } var errStrings []string for _, sub := range subscriptions { - subscriberStatus := ToSubscriberStatus(&sub, corev1.ConditionFalse, "internal error") // check if the subscription already exist and do nothing in this case subRef := newSubscriptionReference(sub) if _, ok := chMap[subRef]; ok { activeSubs[subRef] = true s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef) - subscriberStatus.Ready = corev1.ConditionTrue - subscriberStatus.Message = fmt.Sprintf("Subscription: %v already active for channel: %v", sub, cRef) continue } // subscribe natssSub, err := s.subscribe(cRef, subRef) if err != nil { - subscriberStatus.Ready = corev1.ConditionTrue - subscriberStatus.Message = err.Error() errStrings = append(errStrings, err.Error()) + s.logger.Sugar().Errorf("failed to subscribe (subscription:%q) to channel: %v. Error:%s", sub, cRef, err.Error()) + failedToSubscribe[sub] = err + continue } chMap[subRef] = natssSub activeSubs[subRef] = true - - subscriberStatus.Ready = corev1.ConditionTrue - subscriberStatus.Message = fmt.Sprintf("Subscription: %v subscribed to channel: %v", sub, cRef) - subsStatus.Subscribers = append(subsStatus.Subscribers, *subscriberStatus) - } - if len(errStrings) > 0 { - return subsStatus, fmt.Errorf(strings.Join(errStrings, "\n")) } // Unsubscribe for deleted subscriptions for sub := range chMap { @@ -252,10 +244,10 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. if len(s.subscriptions[cRef]) == 0 { delete(s.subscriptions, cRef) } - return subsStatus, nil + return failedToSubscribe, nil } -func ToSubscriberStatus(subSpec *v1alpha1.SubscriberSpec, condition corev1.ConditionStatus, msg string) *v1alpha1.SubscriberStatus { +func toSubscriberStatus(subSpec *v1alpha1.SubscriberSpec, condition corev1.ConditionStatus, msg string) *v1alpha1.SubscriberStatus { if subSpec == nil { return nil } diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index eb8b06e3365..ce447cfe502 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -22,23 +22,24 @@ import ( "fmt" "reflect" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "go.uber.org/zap" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" - "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" "github.com/knative/eventing/contrib/natss/pkg/reconciler" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners/fanout" "github.com/knative/eventing/pkg/provisioners/multichannelfanout" "github.com/knative/pkg/controller" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" ) @@ -118,14 +119,14 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { reconcileErr := r.reconcile(ctx, natssChannel) if reconcileErr != nil { - logging.FromContext(ctx).Error("Error reconciling InMemoryChannel", zap.Error(reconcileErr)) + logging.FromContext(ctx).Error("Error reconciling NatssChannel", zap.Error(reconcileErr)) } else { - logging.FromContext(ctx).Debug("InMemoryChannel reconciled") + logging.FromContext(ctx).Debug("NatssChannel reconciled") } - // todo: Should this check for subscribable status rather than entire status? + // TODO: Should this check for subscribable status rather than entire status? if _, updateStatusErr := r.updateStatus(ctx, natssChannel); updateStatusErr != nil { - logging.FromContext(ctx).Error("Failed to update InMemoryChannel status", zap.Error(updateStatusErr)) + logging.FromContext(ctx).Error("Failed to update NatssChannel status", zap.Error(updateStatusErr)) return updateStatusErr } return nil @@ -153,12 +154,16 @@ func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.Natss } // Try to subscribe. - subStatus, err := r.natssDispatcher.UpdateSubscriptions(c, false) - natssChannel.Status.SubscribableStatus = subStatus + failedSubscriptions, err := r.natssDispatcher.UpdateSubscriptions(c, false) if err != nil { logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err)) return err } + natssChannel.Status.SubscribableStatus = r.createSubscribableStatus(natssChannel.Spec.Subscribable, failedSubscriptions) + if len(failedSubscriptions) > 0 { + logging.FromContext(ctx).Error("Some natss subscriptions failed to subscribe") + return fmt.Errorf("Some natss subscriptions failed to subscribe") + } natssChannels, err := r.natsschannelLister.List(labels.Everything()) if err != nil { @@ -180,19 +185,40 @@ func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.Natss return nil } +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable, failedSubscriptions map[eventingduck.SubscriberSpec]error) *eventingduck.SubscribableStatus { + if subscribable == nil { + return nil + } + subscriberStatus := make([]eventingduck.SubscriberStatus, 0) + for _, sub := range subscribable.Subscribers { + status := eventingduck.SubscriberStatus{ + UID: sub.UID, + ObservedGeneration: sub.Generation, + Ready: corev1.ConditionTrue, + } + if err, ok := failedSubscriptions[sub]; ok { + status.Ready = corev1.ConditionFalse + status.Message = err.Error() + } + subscriberStatus = append(subscriberStatus, status) + } + return &eventingduck.SubscribableStatus{ + Subscribers: subscriberStatus, + } +} func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssChannel) (*v1alpha1.NatssChannel, error) { - imc, err := r.natsschannelLister.NatssChannels(desired.Namespace).Get(desired.Name) + nc, err := r.natsschannelLister.NatssChannels(desired.Namespace).Get(desired.Name) if err != nil { return nil, err } - if reflect.DeepEqual(imc.Status, desired.Status) { - return imc, nil + if reflect.DeepEqual(nc.Status, desired.Status) { + return nc, nil } // Don't modify the informers copy. - existing := imc.DeepCopy() + existing := nc.DeepCopy() existing.Status = desired.Status return r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) From 6430580c95ad65fe6f7530ef1d7d610b850ce666 Mon Sep 17 00:00:00 2001 From: Akash Verenkar Date: Mon, 10 Jun 2019 17:23:55 -0700 Subject: [PATCH 6/6] Updates based on PR comments --- contrib/natss/pkg/dispatcher/dispatcher.go | 4 +++- .../natss/pkg/reconciler/dispatcher/natsschannel.go | 11 +++++++++-- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/contrib/natss/pkg/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher.go index 57c93b84d58..d0655050f10 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher.go @@ -185,6 +185,8 @@ func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { } // UpdateSubscriptions creates/deletes the natss subscriptions based on channel.Spec.Subscribable.Subscribers +// Return type:map[eventingduck.SubscriberSpec]error --> Returns a map of subscriberSpec that failed with the value=error encountered. +// Ignore the value in case error != nil func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (map[eventingduck.SubscriberSpec]error, error) { s.subscriptionsMux.Lock() defer s.subscriptionsMux.Unlock() @@ -223,7 +225,7 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef) continue } - // subscribe + // subscribe and update failedSubscription if subscribe fails natssSub, err := s.subscribe(cRef, subRef) if err != nil { errStrings = append(errStrings, err.Error()) diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index ce447cfe502..6853add26a8 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -21,6 +21,7 @@ import ( "encoding/json" "fmt" "reflect" + "strings" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" @@ -161,8 +162,14 @@ func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.Natss } natssChannel.Status.SubscribableStatus = r.createSubscribableStatus(natssChannel.Spec.Subscribable, failedSubscriptions) if len(failedSubscriptions) > 0 { - logging.FromContext(ctx).Error("Some natss subscriptions failed to subscribe") - return fmt.Errorf("Some natss subscriptions failed to subscribe") + var b strings.Builder + for _, subError := range failedSubscriptions { + b.WriteString("\n") + b.WriteString(subError.Error()) + } + errMsg := b.String() + logging.FromContext(ctx).Error(errMsg) + return fmt.Errorf(errMsg) } natssChannels, err := r.natsschannelLister.List(labels.Everything())