Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) e
return err
}
kc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(kc.Spec.Subscribable, failedSubscriptions)
if len(failedSubscriptions) > 0 {
logging.FromContext(ctx).Error("Some kafka subscriptions failed to subscribe")
return fmt.Errorf("Some kafka subscriptions failed to subscribe")
}
return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
"github.com/knative/pkg/apis"
"github.com/knative/pkg/apis/duck/v1alpha1"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
)
Expand Down Expand Up @@ -51,7 +52,7 @@ const (
// the Addressable contract and has a non-empty hostname.
NatssChannelConditionAddressable apis.ConditionType = "Addressable"

// NatssChannelConditionServiceReady has status True when a k8s Service representing the channel is ready.
// NatssChannelConditionChannelServiceReady has status True when a k8s Service representing the channel is ready.
// Because this uses ExternalName, there are no endpoints to check.
NatssChannelConditionChannelServiceReady apis.ConditionType = "ChannelServiceReady"
)
Expand All @@ -71,7 +72,11 @@ func (cs *NatssChannelStatus) InitializeConditions() {
nc.Manage(cs).InitializeConditions()
}

// SetAddress sets the address (as part of Addressable contract) and marks the correct condition.
func (cs *NatssChannelStatus) SetAddress(url *apis.URL) {
if cs.Address == nil {
cs.Address = &v1alpha1.Addressable{}
}
if url != nil {
cs.Address.Hostname = url.Host
cs.Address.URL = url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package v1alpha1
import (
"testing"

"github.com/knative/pkg/apis"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
"github.com/knative/pkg/apis"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1"
appsv1 "k8s.io/api/apps/v1"
Expand Down Expand Up @@ -341,19 +341,23 @@ func TestNatssChannelStatus_SetAddressable(t *testing.T) {
},
},
},
AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}},
SubscribableTypeStatus: eventingduck.SubscribableTypeStatus{},
},
},
"has domain": {
url: &apis.URL{Scheme: "http", Host: "test-domain"},
want: &NatssChannelStatus{
Address: duckv1alpha1.Addressable{
Addressable: duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
AddressStatus: duckv1alpha1.AddressStatus{
Address: &duckv1alpha1.Addressable{
Addressable: duckv1beta1.Addressable{
URL: &apis.URL{
Scheme: "http",
Host: "test-domain",
},
},
Hostname: "test-domain",
},
Hostname: "test-domain",
},
Status: duckv1beta1.Status{
Conditions: []apis.Condition{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ type NatssChannelStatus struct {
// provided targets from inside the cluster.
//
// It generally has the form {channel}.{namespace}.svc.{cluster domain name}
Address duckv1alpha1.Addressable `json:"address,omitempty"`
duckv1alpha1.AddressStatus `json:",inline"`

// Subscribers is populated with the statuses of each of the Channelable's subscribers.
eventingduck.SubscribableTypeStatus `json:",inline"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions contrib/natss/pkg/dispatcher/channel/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (r *reconciler) shouldReconcile(c *eventingv1alpha1.Channel) bool {

func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel) (bool, error) {
if !c.DeletionTimestamp.IsZero() {
if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil {
if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, true); err != nil {
r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err))
return false, err
}
Expand All @@ -119,7 +119,7 @@ func (r *reconciler) reconcile(ctx context.Context, c *eventingv1alpha1.Channel)
}

// try to subscribe
if err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil {
if _, err := r.subscriptionsSupervisor.UpdateSubscriptions(c, false); err != nil {
r.logger.Error("UpdateSubscriptions() failed: ", zap.Error(err))
return false, err
}
Expand Down
39 changes: 30 additions & 9 deletions contrib/natss/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
"time"

"github.com/knative/eventing/contrib/natss/pkg/stanutil"
"github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/provisioners"
stan "github.com/nats-io/go-nats-streaming"
"go.uber.org/zap"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
corev1 "k8s.io/api/core/v1"
)

const (
Expand Down Expand Up @@ -182,25 +184,28 @@ func (s *SubscriptionsSupervisor) Connect(stopCh <-chan struct{}) {
}
}

func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) error {
// UpdateSubscriptions creates/deletes the natss subscriptions based on channel.Spec.Subscribable.Subscribers
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to describe what it returns as well, now that it returns a map of failed subscriptions?

// Return type:map[eventingduck.SubscriberSpec]error --> Returns a map of subscriberSpec that failed with the value=error encountered.
// Ignore the value in case error != nil
func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.Channel, isFinalizer bool) (map[eventingduck.SubscriberSpec]error, error) {
s.subscriptionsMux.Lock()
defer s.subscriptionsMux.Unlock()

failedToSubscribe := make(map[eventingduck.SubscriberSpec]error)
cRef := provisioners.ChannelReference{Namespace: channel.Namespace, Name: channel.Name}

if channel.Spec.Subscribable == nil || isFinalizer {
s.logger.Sugar().Infof("Empty subscriptions for channel Ref: %v; unsubscribe all active subscriptions, if any", cRef)
chMap, ok := s.subscriptions[cRef]
if !ok {
// nothing to do
s.logger.Sugar().Infof("No channel Ref %v found in subscriptions map", cRef)
return nil
return failedToSubscribe, nil
}
for sub := range chMap {
s.unsubscribe(cRef, sub)
}
delete(s.subscriptions, cRef)
return nil
return failedToSubscribe, nil
}

subscriptions := channel.Spec.Subscribable.Subscribers
Expand All @@ -211,6 +216,7 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.
chMap = make(map[subscriptionReference]*stan.Subscription)
s.subscriptions[cRef] = chMap
}
var errStrings []string
for _, sub := range subscriptions {
// check if the subscription already exist and do nothing in this case
subRef := newSubscriptionReference(sub)
Expand All @@ -219,10 +225,13 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.
s.logger.Sugar().Infof("Subscription: %v already active for channel: %v", sub, cRef)
continue
}
// subscribe
// subscribe and update failedSubscription if subscribe fails
natssSub, err := s.subscribe(cRef, subRef)
if err != nil {
return err
errStrings = append(errStrings, err.Error())
s.logger.Sugar().Errorf("failed to subscribe (subscription:%q) to channel: %v. Error:%s", sub, cRef, err.Error())
failedToSubscribe[sub] = err
continue
}
chMap[subRef] = natssSub
Comment thread
akashrv marked this conversation as resolved.
activeSubs[subRef] = true
Expand All @@ -237,7 +246,19 @@ func (s *SubscriptionsSupervisor) UpdateSubscriptions(channel *eventingv1alpha1.
if len(s.subscriptions[cRef]) == 0 {
delete(s.subscriptions, cRef)
}
return nil
return failedToSubscribe, nil
}

func toSubscriberStatus(subSpec *v1alpha1.SubscriberSpec, condition corev1.ConditionStatus, msg string) *v1alpha1.SubscriberStatus {
if subSpec == nil {
return nil
}
return &v1alpha1.SubscriberStatus{
UID: subSpec.UID,
ObservedGeneration: subSpec.Generation,
Message: msg,
Ready: condition,
}
}

func (s *SubscriptionsSupervisor) subscribe(channel provisioners.ChannelReference, subscription subscriptionReference) (*stan.Subscription, error) {
Expand Down
2 changes: 1 addition & 1 deletion contrib/natss/pkg/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func TestUpdateSubscriptions(t *testing.T) {
logger.Info("TestUpdateSubscriptions()")

c := makeChannelWithSubscribers()
if err := s.UpdateSubscriptions(c, false); err != nil {
if _, err := s.UpdateSubscriptions(c, false); err != nil {
t.Errorf("UpdateSubscriptions failed: %v", err)
}

Expand Down
99 changes: 88 additions & 11 deletions contrib/natss/pkg/reconciler/dispatcher/natsschannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,28 @@ package controller
import (
"context"
"encoding/json"

eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"go.uber.org/zap"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"fmt"
"reflect"
"strings"

"github.com/knative/eventing/contrib/natss/pkg/apis/messaging/v1alpha1"
messaginginformers "github.com/knative/eventing/contrib/natss/pkg/client/informers/externalversions/messaging/v1alpha1"
listers "github.com/knative/eventing/contrib/natss/pkg/client/listers/messaging/v1alpha1"
"github.com/knative/eventing/contrib/natss/pkg/dispatcher"
"github.com/knative/eventing/contrib/natss/pkg/reconciler"
eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/logging"
"github.com/knative/eventing/pkg/provisioners/fanout"
"github.com/knative/eventing/pkg/provisioners/multichannelfanout"
"github.com/knative/pkg/controller"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
)

Expand Down Expand Up @@ -107,15 +111,35 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
return err
}

if !original.Status.IsReady() {
return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status")
Comment thread
akashrv marked this conversation as resolved.
}

// Don't modify the informers copy.
natssChannel := original.DeepCopy()

reconcileErr := r.reconcile(ctx, natssChannel)
if reconcileErr != nil {
logging.FromContext(ctx).Error("Error reconciling NatssChannel", zap.Error(reconcileErr))
} else {
logging.FromContext(ctx).Debug("NatssChannel reconciled")
}

// TODO: Should this check for subscribable status rather than entire status?
if _, updateStatusErr := r.updateStatus(ctx, natssChannel); updateStatusErr != nil {
logging.FromContext(ctx).Error("Failed to update NatssChannel status", zap.Error(updateStatusErr))
return updateStatusErr
}
return nil
}

func (r *Reconciler) reconcile(ctx context.Context, natssChannel *v1alpha1.NatssChannel) error {
// TODO update dispatcher API and use Channelable or NatssChannel.
c := toChannel(natssChannel)

// See if the channel has been deleted.
if natssChannel.DeletionTimestamp != nil {
if err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil {
if _, err := r.natssDispatcher.UpdateSubscriptions(c, true); err != nil {
logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err))
return err
}
Expand All @@ -131,10 +155,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {
}

// Try to subscribe.
if err := r.natssDispatcher.UpdateSubscriptions(c, false); err != nil {
failedSubscriptions, err := r.natssDispatcher.UpdateSubscriptions(c, false)
if err != nil {
logging.FromContext(ctx).Error("Error updating subscriptions", zap.Any("channel", c), zap.Error(err))
return err
}
natssChannel.Status.SubscribableStatus = r.createSubscribableStatus(natssChannel.Spec.Subscribable, failedSubscriptions)
if len(failedSubscriptions) > 0 {
var b strings.Builder
for _, subError := range failedSubscriptions {
b.WriteString("\n")
b.WriteString(subError.Error())
}
errMsg := b.String()
logging.FromContext(ctx).Error(errMsg)
return fmt.Errorf(errMsg)
}

natssChannels, err := r.natsschannelLister.List(labels.Everything())
if err != nil {
Expand All @@ -156,6 +192,44 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error {

return nil
}
func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable, failedSubscriptions map[eventingduck.SubscriberSpec]error) *eventingduck.SubscribableStatus {
if subscribable == nil {
return nil
}
subscriberStatus := make([]eventingduck.SubscriberStatus, 0)
for _, sub := range subscribable.Subscribers {
status := eventingduck.SubscriberStatus{
UID: sub.UID,
ObservedGeneration: sub.Generation,
Ready: corev1.ConditionTrue,
}
if err, ok := failedSubscriptions[sub]; ok {
status.Ready = corev1.ConditionFalse
status.Message = err.Error()
}
subscriberStatus = append(subscriberStatus, status)
}
return &eventingduck.SubscribableStatus{
Subscribers: subscriberStatus,
}
}

func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.NatssChannel) (*v1alpha1.NatssChannel, error) {
nc, err := r.natsschannelLister.NatssChannels(desired.Namespace).Get(desired.Name)
if err != nil {
return nil, err
}

if reflect.DeepEqual(nc.Status, desired.Status) {
return nc, nil
}

// Don't modify the informers copy.
existing := nc.DeepCopy()
existing.Status = desired.Status

return r.NatssClientSet.MessagingV1alpha1().NatssChannels(desired.Namespace).UpdateStatus(existing)
}

// newConfigFromNatssChannels creates a new Config from the list of natss channels.
func (r *Reconciler) newConfigFromNatssChannels(channels []*v1alpha1.NatssChannel) *multichannelfanout.Config {
Expand Down Expand Up @@ -208,16 +282,19 @@ func removeFinalizer(channel *v1alpha1.NatssChannel) {
}

func toChannel(natssChannel *v1alpha1.NatssChannel) *eventingv1alpha1.Channel {
return &eventingv1alpha1.Channel{
channel := &eventingv1alpha1.Channel{
ObjectMeta: v1.ObjectMeta{
Name: natssChannel.Name,
Namespace: natssChannel.Namespace,
},
Spec: eventingv1alpha1.ChannelSpec{
Subscribable: natssChannel.Spec.Subscribable,
},
Status: eventingv1alpha1.ChannelStatus{
Address: natssChannel.Status.Address,
},
}
if natssChannel.Status.Address != nil {
channel.Status = eventingv1alpha1.ChannelStatus{
Address: *natssChannel.Status.Address,
}
}
return channel
}