diff --git a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go index 4535fec2500..0b934038f52 100644 --- a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go +++ b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go @@ -165,6 +165,10 @@ func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) e return err } kc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(kc.Spec.Subscribable, failedSubscriptions) + if len(failedSubscriptions) > 0 { + logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe") + return fmt.Errorf("Some kafka subscriptions failed to subscribe") + } return nil } diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go index 0ac25a9f846..1780054c105 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck/v1alpha1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) @@ -51,7 +52,7 @@ const ( // the Addressable contract and has a non-empty hostname. NatssChannelConditionAddressable apis.ConditionType = "Addressable" - // NatssChannelConditionServiceReady has status True when a k8s Service representing the channel is ready. + // NatssChannelConditionChannelServiceReady has status True when a k8s Service representing the channel is ready. // Because this uses ExternalName, there are no endpoints to check. NatssChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady" ) @@ -71,7 +72,11 @@ func (cs *NatssChannelStatus) InitializeConditions() { nc.Manage(cs).InitializeConditions() } +// SetAddress sets the address (as part of Addressable contract) and marks the correct condition. func (cs *NatssChannelStatus) SetAddress(url *apis.URL) { + if cs.Address == nil { + cs.Address = &v1alpha1.Addressable{} + } if url != nil { cs.Address.Hostname = url.Host cs.Address.URL = url diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go index 7e024026033..3107a5f2726 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_lifecycle_test.go @@ -19,10 +19,10 @@ package v1alpha1 import ( "testing" - "github.com/knative/pkg/apis" - "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + "github.com/knative/pkg/apis" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" appsv1 "k8s.io/api/apps/v1" @@ -341,19 +341,23 @@ func TestNatssChannelStatus_SetAddressable(t *testing.T) { }, }, }, + AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}}, + SubscribableTypeStatus: eventingduck.SubscribableTypeStatus{}, }, }, "has domain": { url: &apis.URL{Scheme: "http", Host: "test-domain"}, want: &NatssChannelStatus{ - Address: duckv1alpha1.Addressable{ - Addressable: duckv1beta1.Addressable{ - URL: &apis.URL{ - Scheme: "http", - Host: "test-domain", + AddressStatus: duckv1alpha1.AddressStatus{ + Address: &duckv1alpha1.Addressable{ + Addressable: duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, }, + Hostname: "test-domain", }, - Hostname: "test-domain", }, Status: duckv1beta1.Status{ Conditions: []apis.Condition{ diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go index 7d9cb8b06d7..4ab45e67cf1 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/natss_channel_types.go @@ -69,7 +69,10 @@ type NatssChannelStatus struct { // provided targets from inside the cluster. // // It generally has the form {channel}.{namespace}.svc.{cluster domain name} - Address duckv1alpha1.Addressable `json:"address,omitempty"` + duckv1alpha1.AddressStatus `json:",inline"` + + // Subscribers is populated with the statuses of each of the Channelable's subscribers. + eventingduck.SubscribableTypeStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index 06f4e9b4323..bbcb4840989 100644 --- a/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/contrib/natss/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -111,7 +111,8 @@ func (in *NatssChannelSpec) DeepCopy() *NatssChannelSpec { func (in *NatssChannelStatus) DeepCopyInto(out *NatssChannelStatus) { *out = *in in.Status.DeepCopyInto(&out.Status) - in.Address.DeepCopyInto(&out.Address) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } diff --git a/contrib/natss/pkg/dispatcher/channel/reconcile.go b/contrib/natss/pkg/dispatcher/channel/reconcile.go index f932706e77b..5b50271a8eb 100644 --- a/contrib/natss/pkg/dispatcher/channel/reconcile.go +++ b/contrib/natss/pkg/dispatcher/channel/reconcile.go @@ -105,7 +105,7 @@ func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool { func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) (bool, error) { if !c.DeletionTimestamp.IsZero() { - if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil { + if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil { r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err)) return false, err } @@ -119,7 +119,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) } // try to subscribe - if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil { + if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil { r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err)) return false, err } diff --git a/contrib/natss/pkg/dispatcher/dispatcher.go b/contrib/natss/pkg/dispatcher/dispatcher.go index bc2b2263178..d0655050f10 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher.go +++ b/contrib/natss/pkg/dispatcher/dispatcher.go @@ -25,12 +25,14 @@ import ( "time" "github.com/knative/eventing/contrib/natss/pkg/stanutil" + "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners" stan "github.com/nats-io/go-nats-streaming" "go.uber.org/zap" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + corev1 "k8s.io/api/core/v1" ) const ( @@ -182,25 +184,28 @@ func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) { } } -func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error { +// UpdateSubscriptions creates/deletes the natss subscriptions based on channel.Spec.Subscribable.Subscribers +// Return type:map[eventingduck.SubscriberSpec]error --> Returns a map of subscriberSpec that failed with the value=error encountered. +// Ignore the value in case error != nil +func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (map[eventingduck.SubscriberSpec]error, error) { s.subscriptionsMux.Lock() defer s.subscriptionsMux.Unlock() + failedToSubscribe := make(map[eventingduck.SubscriberSpec]error) cRef := provisioners.ChannelReference{Namespace: channel.Namespace, Name: channel.Name} - if channel.Spec.Subscribable == nil || isFinalizer { s.logger.Sugar().Infof("Empty subscriptions for channel Ref: %v; unsubscribe all active subscriptions, if any", cRef) chMap, ok := s.subscriptions[cRef] if !ok { // nothing to do s.logger.Sugar().Infof("No channel Ref %v found in subscriptions map", cRef) - return nil + return failedToSubscribe, nil } for sub := range chMap { s.unsubscribe(cRef, sub) } delete(s.subscriptions, cRef) - return nil + return failedToSubscribe, nil } subscriptions := channel.Spec.Subscribable.Subscribers @@ -211,6 +216,7 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. chMap = make(map[subscriptionReference]*stan.Subscription) s.subscriptions[cRef] = chMap } + var errStrings []string for _, sub := range subscriptions { // check if the subscription already exist and do nothing in this case subRef := newSubscriptionReference(sub) @@ -219,10 +225,13 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef) continue } - // subscribe + // subscribe and update failedSubscription if subscribe fails natssSub, err := s.subscribe(cRef, subRef) if err != nil { - return err + errStrings = append(errStrings, err.Error()) + s.logger.Sugar().Errorf("failed to subscribe (subscription:%q) to channel: %v. Error:%s", sub, cRef, err.Error()) + failedToSubscribe[sub] = err + continue } chMap[subRef] = natssSub activeSubs[subRef] = true @@ -237,7 +246,19 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1. if len(s.subscriptions[cRef]) == 0 { delete(s.subscriptions, cRef) } - return nil + return failedToSubscribe, nil +} + +func toSubscriberStatus(subSpec *v1alpha1.SubscriberSpec, condition corev1.ConditionStatus, msg string) *v1alpha1.SubscriberStatus { + if subSpec == nil { + return nil + } + return &v1alpha1.SubscriberStatus{ + UID: subSpec.UID, + ObservedGeneration: subSpec.Generation, + Message: msg, + Ready: condition, + } } func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReference, subscription subscriptionReference) (*stan.Subscription, error) { diff --git a/contrib/natss/pkg/dispatcher/dispatcher_test.go b/contrib/natss/pkg/dispatcher/dispatcher_test.go index 3dbf7524b4f..cdc80b4b20a 100644 --- a/contrib/natss/pkg/dispatcher/dispatcher_test.go +++ b/contrib/natss/pkg/dispatcher/dispatcher_test.go @@ -177,7 +177,7 @@ func TestUpdateSubscriptions(t *testing.T) { logger.Info("TestUpdateSubscriptions()") c := makeChannelWithSubscribers() - if err := s.UpdateSubscriptions(c, false); err != nil { + if _, err := s.UpdateSubscriptions(c, false); err != nil { t.Errorf("UpdateSubscriptions failed: %v", err) } diff --git a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go index 9a938e5b597..6853add26a8 100644 --- a/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go +++ b/contrib/natss/pkg/reconciler/dispatcher/natsschannel.go @@ -19,24 +19,28 @@ package controller import ( "context" "encoding/json" - - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "go.uber.org/zap" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/sets" + "fmt" + "reflect" + "strings" "github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1" messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/contrib/natss/pkg/dispatcher" "github.com/knative/eventing/contrib/natss/pkg/reconciler" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners/fanout" "github.com/knative/eventing/pkg/provisioners/multichannelfanout" "github.com/knative/pkg/controller" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" ) @@ -107,15 +111,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return err } + if !original.Status.IsReady() { + return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status") + } + // Don't modify the informers copy. natssChannel := original.DeepCopy() + reconcileErr := r.reconcile(ctx, natssChannel) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling NatssChannel", zap.Error(reconcileErr)) + } else { + logging.FromContext(ctx).Debug("NatssChannel reconciled") + } + + // TODO: Should this check for subscribable status rather than entire status? + if _, updateStatusErr := r.updateStatus(ctx, natssChannel); updateStatusErr != nil { + logging.FromContext(ctx).Error("Failed to update NatssChannel status", zap.Error(updateStatusErr)) + return updateStatusErr + } + return nil +} + +func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.NatssChannel) error { // TODO update dispatcher API and use Channelable or NatssChannel. c := toChannel(natssChannel) // See if the channel has been deleted. if natssChannel.DeletionTimestamp != nil { - if err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil { + if _, err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil { logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err)) return err } @@ -131,10 +155,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { } // Try to subscribe. - if err := r.natssDispatcher.UpdateSubscriptions(c, false); err != nil { + failedSubscriptions, err := r.natssDispatcher.UpdateSubscriptions(c, false) + if err != nil { logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err)) return err } + natssChannel.Status.SubscribableStatus = r.createSubscribableStatus(natssChannel.Spec.Subscribable, failedSubscriptions) + if len(failedSubscriptions) > 0 { + var b strings.Builder + for _, subError := range failedSubscriptions { + b.WriteString("\n") + b.WriteString(subError.Error()) + } + errMsg := b.String() + logging.FromContext(ctx).Error(errMsg) + return fmt.Errorf(errMsg) + } natssChannels, err := r.natsschannelLister.List(labels.Everything()) if err != nil { @@ -156,6 +192,44 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { return nil } +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable, failedSubscriptions map[eventingduck.SubscriberSpec]error) *eventingduck.SubscribableStatus { + if subscribable == nil { + return nil + } + subscriberStatus := make([]eventingduck.SubscriberStatus, 0) + for _, sub := range subscribable.Subscribers { + status := eventingduck.SubscriberStatus{ + UID: sub.UID, + ObservedGeneration: sub.Generation, + Ready: corev1.ConditionTrue, + } + if err, ok := failedSubscriptions[sub]; ok { + status.Ready = corev1.ConditionFalse + status.Message = err.Error() + } + subscriberStatus = append(subscriberStatus, status) + } + return &eventingduck.SubscribableStatus{ + Subscribers: subscriberStatus, + } +} + +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssChannel) (*v1alpha1.NatssChannel, error) { + nc, err := r.natsschannelLister.NatssChannels(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + if reflect.DeepEqual(nc.Status, desired.Status) { + return nc, nil + } + + // Don't modify the informers copy. + existing := nc.DeepCopy() + existing.Status = desired.Status + + return r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing) +} // newConfigFromNatssChannels creates a new Config from the list of natss channels. func (r *Reconciler) newConfigFromNatssChannels(channels []*v1alpha1.NatssChannel) *multichannelfanout.Config { @@ -208,7 +282,7 @@ func removeFinalizer(channel *v1alpha1.NatssChannel) { } func toChannel(natssChannel *v1alpha1.NatssChannel) *eventingv1alpha1.Channel { - return &eventingv1alpha1.Channel{ + channel := &eventingv1alpha1.Channel{ ObjectMeta: v1.ObjectMeta{ Name: natssChannel.Name, Namespace: natssChannel.Namespace, @@ -216,8 +290,11 @@ func toChannel(natssChannel *v1alpha1.NatssChannel) *eventingv1alpha1.Channel { Spec: eventingv1alpha1.ChannelSpec{ Subscribable: natssChannel.Spec.Subscribable, }, - Status: eventingv1alpha1.ChannelStatus{ - Address: natssChannel.Status.Address, - }, } + if natssChannel.Status.Address != nil { + channel.Status = eventingv1alpha1.ChannelStatus{ + Address: *natssChannel.Status.Address, + } + } + return channel }