diff --git a/config/300-channel.yaml b/config/300-channel.yaml index 3244f00fb57..8eabb171d89 100644 --- a/config/300-channel.yaml +++ b/config/300-channel.yaml @@ -95,6 +95,8 @@ spec: uid: type: string minLength: 1 + generation: + type: integer uid: type: string minLength: 1 diff --git a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go index be70f5680cb..ab2072e540f 100644 --- a/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go +++ b/contrib/gcppubsub/pkg/controller/channel/reconcile_test.go @@ -485,7 +485,7 @@ func TestReconcile(t *testing.T) { WantPresent: []runtime.Object{ makeChannelWithFinalizerAndSubscriberWithoutUID(), }, - WantErrMsg: "empty reference UID: {nil http://foo/ }", + WantErrMsg: "empty reference UID: {nil 0 http://foo/ }", WantEvent: []corev1.Event{ events[gcpResourcesPlanFailed], }, diff --git a/contrib/kafka/cmd/channel_controller/main.go b/contrib/kafka/cmd/channel_controller/main.go index d2554687b5e..6e13a9c2be1 100644 --- a/contrib/kafka/cmd/channel_controller/main.go +++ b/contrib/kafka/cmd/channel_controller/main.go @@ -20,6 +20,9 @@ import ( "flag" "log" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned" eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions" @@ -63,7 +66,6 @@ func main() { logger.Fatalw("Error building kubeconfig", zap.Error(err)) } - // TODO the underlying config map needs to be watched and the config should be reloaded if there is a change. kafkaConfig, err := utils.GetKafkaConfig("/etc/config-kafka") if err != nil { logger.Fatalw("Error loading kafka config", zap.Error(err)) diff --git a/contrib/kafka/cmd/channel_dispatcher/main.go b/contrib/kafka/cmd/channel_dispatcher/main.go index 732446f33b6..6a2c31706cc 100644 --- a/contrib/kafka/cmd/channel_dispatcher/main.go +++ b/contrib/kafka/cmd/channel_dispatcher/main.go @@ -20,6 +20,9 @@ import ( "flag" "log" + // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). + // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned" eventingScheme "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned/scheme" informers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions" diff --git a/contrib/kafka/config/200-dispatcher-clusterrole.yaml b/contrib/kafka/config/200-dispatcher-clusterrole.yaml index b3766ef80d2..c5335aad3b1 100644 --- a/contrib/kafka/config/200-dispatcher-clusterrole.yaml +++ b/contrib/kafka/config/200-dispatcher-clusterrole.yaml @@ -21,11 +21,17 @@ rules: - messaging.knative.dev resources: - kafkachannels - - kafkachannels/status verbs: - get - list - watch + - apiGroups: + - messaging.knative.dev + resources: + - kafkachannels/status + verbs: + - get + - update - apiGroups: - "" # Core API group. resources: diff --git a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle.go b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle.go index 820303ad3f5..f4fc6c2ded7 100644 --- a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle.go +++ b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( "github.com/knative/pkg/apis" + "github.com/knative/pkg/apis/duck/v1alpha1" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" ) @@ -75,7 +76,11 @@ func (cs *KafkaChannelStatus) InitializeConditions() { kc.Manage(cs).InitializeConditions() } +// SetAddress sets the address (as part of Addressable contract) and marks the correct condition. func (cs *KafkaChannelStatus) SetAddress(url *apis.URL) { + if cs.Address == nil { + cs.Address = &v1alpha1.Addressable{} + } if url != nil { cs.Address.Hostname = url.Host cs.Address.URL = url diff --git a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle_test.go b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle_test.go index 4c78e7bfa04..8d763ab136a 100644 --- a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle_test.go +++ b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_lifecycle_test.go @@ -376,20 +376,22 @@ func TestKafkaChannelStatus_SetAddressable(t *testing.T) { }, }, }, + AddressStatus: duckv1alpha1.AddressStatus{Address: &duckv1alpha1.Addressable{}}, }, }, "has domain": { url: &apis.URL{Scheme: "http", Host: "test-domain"}, want: &KafkaChannelStatus{ - Address: duckv1alpha1.Addressable{ - Addressable: duckv1beta1.Addressable{ - URL: &apis.URL{ - Scheme: "http", - Host: "test-domain", + AddressStatus: duckv1alpha1.AddressStatus{ + Address: &duckv1alpha1.Addressable{ + Addressable: duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, }, - }, - Hostname: "test-domain", - }, + Hostname: "test-domain", + }}, Status: duckv1beta1.Status{ Conditions: []apis.Condition{ { diff --git a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_types.go b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_types.go index 952023e9291..370ed057bd2 100644 --- a/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_types.go +++ b/contrib/kafka/pkg/apis/messaging/v1alpha1/kafka_channel_types.go @@ -75,7 +75,10 @@ type KafkaChannelStatus struct { // provided targets from inside the cluster. // // It generally has the form {channel}.{namespace}.svc.{cluster domain name} - Address duckv1alpha1.Addressable `json:"address,omitempty"` + duckv1alpha1.AddressStatus `json:",inline"` + + // Subscribers is populated with the statuses of each of the Channelable's subscribers. + eventingduck.SubscribableTypeStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/contrib/kafka/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go b/contrib/kafka/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go index bc991679170..40774c5ddf2 100644 --- a/contrib/kafka/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go +++ b/contrib/kafka/pkg/apis/messaging/v1alpha1/zz_generated.deepcopy.go @@ -111,7 +111,8 @@ func (in *KafkaChannelSpec) DeepCopy() *KafkaChannelSpec { func (in *KafkaChannelStatus) DeepCopyInto(out *KafkaChannelStatus) { *out = *in in.Status.DeepCopyInto(&out.Status) - in.Address.DeepCopyInto(&out.Address) + in.AddressStatus.DeepCopyInto(&out.AddressStatus) + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } diff --git a/contrib/kafka/pkg/dispatcher/dispatcher.go b/contrib/kafka/pkg/dispatcher/dispatcher.go index 1cfac268480..b5a812ae3cd 100644 --- a/contrib/kafka/pkg/dispatcher/dispatcher.go +++ b/contrib/kafka/pkg/dispatcher/dispatcher.go @@ -38,13 +38,16 @@ type KafkaDispatcher struct { // TODO: config doesn't have to be atomic as it is read and updated using updateLock. config atomic.Value hostToChannelMap atomic.Value - updateLock sync.Mutex + // hostToChannelMapLock is used to update hostToChannelMap + hostToChannelMapLock sync.Mutex receiver *provisioners.MessageReceiver dispatcher *provisioners.MessageDispatcher kafkaAsyncProducer sarama.AsyncProducer kafkaConsumers map[provisioners.ChannelReference]map[subscription]KafkaConsumer + // consumerUpdateLock must be used to update kafkaConsumers + consumerUpdateLock sync.Mutex kafkaCluster KafkaCluster topicFunc TopicFunc @@ -104,61 +107,78 @@ func (d *KafkaDispatcher) configDiff(updated *multichannelfanout.Config) string return cmp.Diff(d.getConfig(), updated) } -func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error { +// UpdateKafkaConsumers will be called by new CRD based kafka channel dispatcher controller, instead of UpdateConfig. +func (d *KafkaDispatcher) UpdateKafkaConsumers(config *multichannelfanout.Config) (map[eventingduck.SubscriberSpec]error, error) { if config == nil { - return errors.New("nil config") + return nil, fmt.Errorf("nil config") } - d.updateLock.Lock() - defer d.updateLock.Unlock() + d.consumerUpdateLock.Lock() + defer d.consumerUpdateLock.Unlock() - if diff := d.configDiff(config); diff != "" { - d.logger.Info("Updating config (-old +new)", zap.String("diff", diff)) - - // Create hostToChannelMap before updating kafkaConsumers. - // But update the map only after updating kafkaConsumers. - hcMap, err := createHostToChannelMap(config) - if err != nil { - return err + newSubs := make(map[subscription]bool) + failedToSubscribe := make(map[eventingduck.SubscriberSpec]error) + for _, cc := range config.ChannelConfigs { + channelRef := provisioners.ChannelReference{ + Name: cc.Name, + Namespace: cc.Namespace, } - - newSubs := make(map[subscription]bool) - - // Subscribe to new subscriptions. - // TODO: Error returned by subscribe/unsubscribe must be handled. - // https://github.com/knative/eventing/issues/1072. - for _, cc := range config.ChannelConfigs { - channelRef := provisioners.ChannelReference{ - Name: cc.Name, - Namespace: cc.Namespace, - } - for _, subSpec := range cc.FanoutConfig.Subscriptions { - sub := newSubscription(subSpec) - if _, ok := d.kafkaConsumers[channelRef][sub]; !ok { - // only subscribe when not exists in channel-subscriptions map - // do not need to resubscribe every time channel fanout config is updated - d.subscribe(channelRef, sub) + for _, subSpec := range cc.FanoutConfig.Subscriptions { + sub := newSubscription(subSpec) + if _, ok := d.kafkaConsumers[channelRef][sub]; !ok { + // only subscribe when not exists in channel-subscriptions map + // do not need to resubscribe every time channel fanout config is updated + if err := d.subscribe(channelRef, sub); err != nil { + failedToSubscribe[subSpec] = err } - - newSubs[sub] = true } + newSubs[sub] = true } + } - // Unsubscribe and close consumer for any deleted subscriptions - for channelRef, subMap := range d.kafkaConsumers { - for sub := range subMap { - if ok := newSubs[sub]; !ok { - d.unsubscribe(channelRef, sub) - } + // Unsubscribe and close consumer for any deleted subscriptions + for channelRef, subMap := range d.kafkaConsumers { + for sub := range subMap { + if ok := newSubs[sub]; !ok { + d.unsubscribe(channelRef, sub) } } - // At this point all updates are done and hostToChannelMap is created successfully. - // Update the atomic value. - d.setHostToChannelMap(hcMap) + } + return failedToSubscribe, nil +} + +// UpdateHostToChannelMap will be called by new CRD based kafka channel dispatcher controller, instead of UpdateConfig. +func (d *KafkaDispatcher) UpdateHostToChannelMap(config *multichannelfanout.Config) error { + if config == nil { + return errors.New("nil config") + } + + d.hostToChannelMapLock.Lock() + defer d.hostToChannelMapLock.Unlock() - // Update the config so that it can be used for comparison during next sync - d.setConfig(config) + hcMap, err := createHostToChannelMap(config) + if err != nil { + return err + } + + d.setHostToChannelMap(hcMap) + return nil +} + +// UpdateConfig is used by older kafka channel dispatcher controller that is based on ClusterChannelProvisioners model +// Remove this function when the older channel code is deleted +func (d *KafkaDispatcher) UpdateConfig(config *multichannelfanout.Config) error { + if config == nil { + return errors.New("nil config") + } + + if _, err := d.UpdateKafkaConsumers(config); err != nil { + return err + } + if err := d.UpdateHostToChannelMap(config); err != nil { + return err } + return nil } diff --git a/contrib/kafka/pkg/dispatcher/dispatcher_test.go b/contrib/kafka/pkg/dispatcher/dispatcher_test.go index be78ea42b38..e28205942a7 100644 --- a/contrib/kafka/pkg/dispatcher/dispatcher_test.go +++ b/contrib/kafka/pkg/dispatcher/dispatcher_test.go @@ -417,9 +417,6 @@ func TestDispatcher_UpdateConfig(t *testing.T) { if diff := sets.NewString(tc.unsubscribes...).Difference(oldSubscribers); diff.Len() != 0 { t.Errorf("subscriptions %+v were never subscribed", diff) } - if diff := cmp.Diff(tc.oldConfig, d.getConfig()); diff != "" { - t.Errorf("unexpected config (-want, +got) = %v", diff) - } if diff := cmp.Diff(tc.oldHostToChanMap, d.getHostToChannelMap()); diff != "" { t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff) } @@ -450,9 +447,6 @@ func TestDispatcher_UpdateConfig(t *testing.T) { if diff := cmp.Diff(tc.newHostToChanMap, d.getHostToChannelMap()); diff != "" { t.Errorf("unexpected hostToChannelMap (-want, +got) = %v", diff) } - if diff := cmp.Diff(tc.newConfig, d.getConfig()); diff != "" { - t.Errorf("unexpected config (-want, +got) = %v", diff) - } }) } diff --git a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go index 59dec3e7d5d..5c9dc9d686b 100644 --- a/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go +++ b/contrib/kafka/pkg/reconciler/dispatcher/kafkachannel.go @@ -18,17 +18,23 @@ package controller import ( "context" + "fmt" + "reflect" "github.com/knative/eventing/contrib/kafka/pkg/apis/messaging/v1alpha1" clientset "github.com/knative/eventing/contrib/kafka/pkg/client/clientset/versioned" messaginginformers "github.com/knative/eventing/contrib/kafka/pkg/client/informers/externalversions/messaging/v1alpha1" listers "github.com/knative/eventing/contrib/kafka/pkg/client/listers/messaging/v1alpha1" "github.com/knative/eventing/contrib/kafka/pkg/dispatcher" + eventingduck "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/provisioners/fanout" "github.com/knative/eventing/pkg/provisioners/multichannelfanout" "github.com/knative/eventing/pkg/reconciler" "github.com/knative/pkg/controller" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrs "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/tools/cache" ) @@ -40,6 +46,11 @@ const ( // controllerAgentName is the string used by this controller to identify // itself when creating events. controllerAgentName = "kafka-ch-dispatcher" + + // Name of the corev1.Events emitted from the reconciliation process. + channelReconciled = "ChannelReconciled" + channelReconcileFailed = "ChannelReconcileFailed" + channelUpdateStatusFailed = "ChannelUpdateStatusFailed" ) // Reconciler reconciles Kafka Channels. @@ -85,58 +96,145 @@ func NewController( func (r *Reconciler) Reconcile(ctx context.Context, key string) error { // Convert the namespace/name string into a distinct namespace and name. - _, _, err := cache.SplitMetaNamespaceKey(key) + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { logging.FromContext(ctx).Error("invalid resource key") return nil } - // This is a special Reconciler that does the following: - // 1. Lists the kafka channels. - // 2. Creates a multi-channel-fanout-config. - // 3. Calls the kafka dispatcher's updateConfig func with the new multi-channel-fanout-config. + // Get the KafkaChannel resource with this namespace/name. + original, err := r.kafkachannelLister.KafkaChannels(namespace).Get(name) + if apierrs.IsNotFound(err) { + // The resource may no longer exist, in which case we stop processing. + logging.FromContext(ctx).Error("KafkaChannel key in work queue no longer exists") + return nil + } else if err != nil { + return err + } + + if !original.Status.IsReady() { + return fmt.Errorf("Channel is not ready. Cannot configure and update subscriber status") + } + + // Don't modify the informers copy. + channel := original.DeepCopy() + + // Reconcile this copy of the KafkaChannel and then write back any status updates regardless of + // whether the reconcile error out. + reconcileErr := r.reconcile(ctx, channel) + if reconcileErr != nil { + logging.FromContext(ctx).Error("Error reconciling KafkaChannel", zap.Error(reconcileErr)) + r.Recorder.Eventf(channel, corev1.EventTypeWarning, channelReconcileFailed, "KafkaChannel reconciliation failed: %v", reconcileErr) + } else { + logging.FromContext(ctx).Debug("KafkaChannel reconciled") + r.Recorder.Event(channel, corev1.EventTypeNormal, channelReconciled, "KafkaChannel reconciled") + } + + // todo: Should this check for subscribable status rather than entire status? + if _, updateStatusErr := r.updateStatus(ctx, channel); updateStatusErr != nil { + logging.FromContext(ctx).Error("Failed to update KafkaChannel status", zap.Error(updateStatusErr)) + r.Recorder.Eventf(channel, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update KafkaChannel's status: %v", updateStatusErr) + return updateStatusErr + } + + // Requeue if the resource is not ready + return reconcileErr +} +func (r *Reconciler) reconcile(ctx context.Context, kc *v1alpha1.KafkaChannel) error { channels, err := r.kafkachannelLister.List(labels.Everything()) if err != nil { logging.FromContext(ctx).Error("Error listing kafka channels") return err } + // TODO: revisit this code. Instead of reading all channels and updating consumers and hostToChannel map for all + // why not just reconcile the current channel. With this the UpdateKafkaConsumers can now return SubscribableStatus + // for the subscriptions on the channel that is being reconciled. kafkaChannels := make([]*v1alpha1.KafkaChannel, 0) for _, channel := range channels { if channel.Status.IsReady() { kafkaChannels = append(kafkaChannels, channel) } } - config := r.newConfigFromKafkaChannels(kafkaChannels) - err = r.kafkaDispatcher.UpdateConfig(config) - if err != nil { - logging.FromContext(ctx).Error("Error updating kafka dispatcher config") + if err := r.kafkaDispatcher.UpdateHostToChannelMap(config); err != nil { + logging.FromContext(ctx).Error("Error updating host to channel map in dispatcher") return err } + failedSubscriptions, err := r.kafkaDispatcher.UpdateKafkaConsumers(config) + if err != nil { + logging.FromContext(ctx).Error("Error updating kafka consumers in dispatcher") + return err + } + kc.Status.SubscribableTypeStatus.SubscribableStatus = r.createSubscribableStatus(kc.Spec.Subscribable, failedSubscriptions) return nil } +func (r *Reconciler) createSubscribableStatus(subscribable *eventingduck.Subscribable, failedSubscriptions map[eventingduck.SubscriberSpec]error) *eventingduck.SubscribableStatus { + if subscribable == nil { + return nil + } + subscriberStatus := make([]eventingduck.SubscriberStatus, 0) + for _, sub := range subscribable.Subscribers { + status := eventingduck.SubscriberStatus{ + UID: sub.UID, + ObservedGeneration: sub.Generation, + Ready: corev1.ConditionTrue, + } + if err, ok := failedSubscriptions[sub]; ok { + status.Ready = corev1.ConditionFalse + status.Message = err.Error() + } + subscriberStatus = append(subscriberStatus, status) + } + return &eventingduck.SubscribableStatus{ + Subscribers: subscriberStatus, + } +} + +// newConfigFromKafkaChannels creates a new Config from the list of kafka channels. +func (r *Reconciler) newChannelConfigFromKafkaChannel(c *v1alpha1.KafkaChannel) *multichannelfanout.ChannelConfig { + channelConfig := multichannelfanout.ChannelConfig{ + Namespace: c.Namespace, + Name: c.Name, + HostName: c.Status.Address.Hostname, + } + if c.Spec.Subscribable != nil { + channelConfig.FanoutConfig = fanout.Config{ + AsyncHandler: true, + Subscriptions: c.Spec.Subscribable.Subscribers, + } + } + return &channelConfig +} + // newConfigFromKafkaChannels creates a new Config from the list of kafka channels. func (r *Reconciler) newConfigFromKafkaChannels(channels []*v1alpha1.KafkaChannel) *multichannelfanout.Config { cc := make([]multichannelfanout.ChannelConfig, 0) for _, c := range channels { - channelConfig := multichannelfanout.ChannelConfig{ - Namespace: c.Namespace, - Name: c.Name, - HostName: c.Status.Address.Hostname, - } - if c.Spec.Subscribable != nil { - channelConfig.FanoutConfig = fanout.Config{ - AsyncHandler: true, - Subscriptions: c.Spec.Subscribable.Subscribers, - } - } - cc = append(cc, channelConfig) + channelConfig := r.newChannelConfigFromKafkaChannel(c) + cc = append(cc, *channelConfig) } return &multichannelfanout.Config{ ChannelConfigs: cc, } } +func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.KafkaChannel) (*v1alpha1.KafkaChannel, error) { + kc, err := r.kafkachannelLister.KafkaChannels(desired.Namespace).Get(desired.Name) + if err != nil { + return nil, err + } + + if reflect.DeepEqual(kc.Status, desired.Status) { + return kc, nil + } + + // Don't modify the informers copy. + existing := kc.DeepCopy() + existing.Status = desired.Status + + new, err := r.eventingClientSet.MessagingV1alpha1().KafkaChannels(desired.Namespace).UpdateStatus(existing) + return new, err +} diff --git a/pkg/apis/duck/v1alpha1/channelable_types.go b/pkg/apis/duck/v1alpha1/channelable_types.go index e8baa8367e6..ec046d5cecc 100644 --- a/pkg/apis/duck/v1alpha1/channelable_types.go +++ b/pkg/apis/duck/v1alpha1/channelable_types.go @@ -20,10 +20,10 @@ import ( "github.com/knative/pkg/apis" "github.com/knative/pkg/apis/duck" "github.com/knative/pkg/apis/duck/v1alpha1" - v1 "k8s.io/api/core/v1" + duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" ) // +genclient @@ -37,9 +37,14 @@ type Channelable struct { metav1.ObjectMeta `json:"metadata,omitempty"` // Spec is the part where the Channelable fulfills the Subscribable contract. - Spec SubscribableSpec `json:"spec"` + Spec ChannelableSpec `json:"spec,omitempty"` - Status ChannelableStatus `json:"status"` + Status ChannelableStatus `json:"status,omitempty"` +} + +// ChannelableSpec contains Spec of the Channelable object +type ChannelableSpec struct { + SubscribableTypeSpec `json:",inline"` } // ChannelableStatus contains the Status of a Channelable object. @@ -47,14 +52,7 @@ type ChannelableStatus struct { // AddressStatus is the part where the Channelable fulfills the Addressable contract. v1alpha1.AddressStatus `json:",inline"` // Subscribers is populated with the statuses of each of the Channelable's subscribers. - Subscribers []Subscriber `json:"subscribers,omitempty"` -} - -// Subscriber contains the status of a Channelable's Subscriber. -type Subscriber struct { - UID types.UID `json:"uid,omitempty"` - Ready v1.ConditionStatus `json:"ready,omitempty"` - Message string `json:"message,omitempty"` + SubscribableTypeStatus `json:",inline"` } var ( @@ -69,10 +67,12 @@ func (c *Channelable) Populate() { // Populate ALL fields Subscribers: []SubscriberSpec{{ UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, SubscriberURI: "call1", ReplyURI: "sink2", }, { UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 2, SubscriberURI: "call2", ReplyURI: "sink2", }}, @@ -81,18 +81,30 @@ func (c *Channelable) Populate() { AddressStatus: v1alpha1.AddressStatus{ Address: &v1alpha1.Addressable{ // Populate ALL fields - Hostname: "this is not empty", + Addressable: duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, + }, + Hostname: "test-domain", + }, + }, + SubscribableTypeStatus: SubscribableTypeStatus{ + SubscribableStatus: &SubscribableStatus{ + Subscribers: []SubscriberStatus{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: corev1.ConditionTrue, + Message: "Some message", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 2, + Ready: corev1.ConditionFalse, + Message: "Some message", + }}, }, }, - Subscribers: []Subscriber{{ - UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", - Ready: "True", - Message: "ready", - }, { - UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", - Ready: "False", - Message: "not ready", - }}, } } diff --git a/pkg/apis/duck/v1alpha1/channelable_types_test.go b/pkg/apis/duck/v1alpha1/channelable_types_test.go index c251a0c3e06..3141b1e74a7 100644 --- a/pkg/apis/duck/v1alpha1/channelable_types_test.go +++ b/pkg/apis/duck/v1alpha1/channelable_types_test.go @@ -19,7 +19,10 @@ package v1alpha1 import ( "testing" + "github.com/knative/pkg/apis" "github.com/knative/pkg/apis/duck/v1alpha1" + duckv1beta1 "github.com/knative/pkg/apis/duck/v1beta1" + corev1 "k8s.io/api/core/v1" "github.com/google/go-cmp/cmp" ) @@ -38,35 +41,51 @@ func TestChannelablePopulate(t *testing.T) { got := &Channelable{} want := &Channelable{ - Spec: SubscribableSpec{ - Subscribable: &Subscribable{ - Subscribers: []SubscriberSpec{{ - UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", - SubscriberURI: "call1", - ReplyURI: "sink2", - }, { - UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", - SubscriberURI: "call2", - ReplyURI: "sink2", - }}, + Spec: ChannelableSpec{ + SubscribableTypeSpec: SubscribableTypeSpec{ + Subscribable: &Subscribable{ + Subscribers: []SubscriberSpec{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, + SubscriberURI: "call1", + ReplyURI: "sink2", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 2, + SubscriberURI: "call2", + ReplyURI: "sink2", + }}, + }, }, }, Status: ChannelableStatus{ AddressStatus: v1alpha1.AddressStatus{ Address: &v1alpha1.Addressable{ // Populate ALL fields - Hostname: "this is not empty", + Addressable: duckv1beta1.Addressable{ + URL: &apis.URL{ + Scheme: "http", + Host: "test-domain", + }, + }, + Hostname: "test-domain", + }, + }, + SubscribableTypeStatus: SubscribableTypeStatus{ + SubscribableStatus: &SubscribableStatus{ + Subscribers: []SubscriberStatus{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: corev1.ConditionTrue, + Message: "Some message", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 2, + Ready: corev1.ConditionFalse, + Message: "Some message", + }}, }, }, - Subscribers: []Subscriber{{ - UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", - Ready: "True", - Message: "ready", - }, { - UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", - Ready: "False", - Message: "not ready", - }}, }, } diff --git a/pkg/apis/duck/v1alpha1/subscribable_types.go b/pkg/apis/duck/v1alpha1/subscribable_types.go index 0cebce702ad..9c45972ea62 100644 --- a/pkg/apis/duck/v1alpha1/subscribable_types.go +++ b/pkg/apis/duck/v1alpha1/subscribable_types.go @@ -49,12 +49,40 @@ type SubscriberSpec struct { // UID is used to understand the origin of the subscriber. // +optional UID types.UID `json:"uid,omitempty"` + // Generation of the origin of the subscriber with uid:UID. + // +optional + Generation int64 `json:"generation,omitempty"` // +optional SubscriberURI string `json:"subscriberURI,omitempty"` // +optional ReplyURI string `json:"replyURI,omitempty"` } +// SubscribableStatus is the schema for the subscribable's status portion of the status +// section of the resource. +type SubscribableStatus struct { + // This is the list of subscription's statuses for this channel. + // +patchMergeKey=uid + // +patchStrategy=merge + Subscribers []SubscriberStatus `json:"subscribers,omitempty" patchStrategy:"merge" patchMergeKey:"uid"` +} + +// SubscriberStatus defines the status of a single subscriber to a Channel. +type SubscriberStatus struct { + // UID is used to understand the origin of the subscriber. + // +optional + UID types.UID `json:"uid,omitempty"` + // Generation of the origin of the subscriber with uid:UID. + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + // Status of the subscriber. + // +optional + Ready corev1.ConditionStatus `json:"ready,omitempty"` + // A human readable message indicating details of Ready status. + // +optional + Message string `json:"message,omitempty"` +} + // +genclient // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object @@ -65,16 +93,25 @@ type SubscribableType struct { metav1.TypeMeta `json:",inline"` metav1.ObjectMeta `json:"metadata,omitempty"` - // SubscribableSpec is the part where Subscribable object is + // SubscribableTypeSpec is the part where Subscribable object is // configured as to be compatible with Subscribable contract. - Spec SubscribableSpec `json:"spec"` + Spec SubscribableTypeSpec `json:"spec"` + + // SubscribableTypeStatus is the part where SubscribableStatus object is + // configured as to be compatible with Subscribable contract. + Status SubscribableTypeStatus `json:"status"` } -// SubscribableSpec shows how we expect folks to embed Subscribable in their Spec field. -type SubscribableSpec struct { +// SubscribableTypeSpec shows how we expect folks to embed Subscribable in their Spec field. +type SubscribableTypeSpec struct { Subscribable *Subscribable `json:"subscribable,omitempty"` } +// SubscribableTypeStatus shows how we expect folks to embed Subscribable in their Status field. +type SubscribableTypeStatus struct { + SubscribableStatus *SubscribableStatus `json:"subscribablestatus,omitempty"` +} + var ( // Verify SubscribableType resources meet duck contracts. _ duck.Populatable = (*SubscribableType)(nil) @@ -92,14 +129,30 @@ func (c *SubscribableType) Populate() { // Populate ALL fields Subscribers: []SubscriberSpec{{ UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, SubscriberURI: "call1", ReplyURI: "sink2", }, { UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 2, SubscriberURI: "call2", ReplyURI: "sink2", }}, } + c.Status.SubscribableStatus = &SubscribableStatus{ + // Populate ALL fields + Subscribers: []SubscriberStatus{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: corev1.ConditionTrue, + Message: "Some message", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 2, + Ready: corev1.ConditionFalse, + Message: "Some message", + }}, + } } // GetListType implements apis.Listable diff --git a/pkg/apis/duck/v1alpha1/subscribable_types_test.go b/pkg/apis/duck/v1alpha1/subscribable_types_test.go index 4c6ec39a8f8..edbfc2fb012 100644 --- a/pkg/apis/duck/v1alpha1/subscribable_types_test.go +++ b/pkg/apis/duck/v1alpha1/subscribable_types_test.go @@ -20,6 +20,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" ) func TestSubscribableGetFullType(t *testing.T) { @@ -46,19 +47,37 @@ func TestSubscribablePopulate(t *testing.T) { got := &SubscribableType{} want := &SubscribableType{ - Spec: SubscribableSpec{ + Spec: SubscribableTypeSpec{ Subscribable: &Subscribable{ Subscribers: []SubscriberSpec{{ UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 1, SubscriberURI: "call1", ReplyURI: "sink2", }, { UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + Generation: 2, SubscriberURI: "call2", ReplyURI: "sink2", }}, }, }, + Status: SubscribableTypeStatus{ + SubscribableStatus: &SubscribableStatus{ + // Populate ALL fields + Subscribers: []SubscriberStatus{{ + UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 1, + Ready: corev1.ConditionTrue, + Message: "Some message", + }, { + UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1", + ObservedGeneration: 2, + Ready: corev1.ConditionFalse, + Message: "Some message", + }}, + }, + }, } got.Populate() diff --git a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go index c8b78275b22..120fb29dbe5 100644 --- a/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/duck/v1alpha1/zz_generated.deepcopy.go @@ -86,15 +86,28 @@ func (in *ChannelableList) DeepCopyObject() runtime.Object { return nil } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ChannelableSpec) DeepCopyInto(out *ChannelableSpec) { + *out = *in + in.SubscribableTypeSpec.DeepCopyInto(&out.SubscribableTypeSpec) + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ChannelableSpec. +func (in *ChannelableSpec) DeepCopy() *ChannelableSpec { + if in == nil { + return nil + } + out := new(ChannelableSpec) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ChannelableStatus) DeepCopyInto(out *ChannelableStatus) { *out = *in in.AddressStatus.DeepCopyInto(&out.AddressStatus) - if in.Subscribers != nil { - in, out := &in.Subscribers, &out.Subscribers - *out = make([]Subscriber, len(*in)) - copy(*out, *in) - } + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } @@ -132,22 +145,22 @@ func (in *Subscribable) DeepCopy() *Subscribable { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *SubscribableSpec) DeepCopyInto(out *SubscribableSpec) { +func (in *SubscribableStatus) DeepCopyInto(out *SubscribableStatus) { *out = *in - if in.Subscribable != nil { - in, out := &in.Subscribable, &out.Subscribable - *out = new(Subscribable) - (*in).DeepCopyInto(*out) + if in.Subscribers != nil { + in, out := &in.Subscribers, &out.Subscribers + *out = make([]SubscriberStatus, len(*in)) + copy(*out, *in) } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscribableSpec. -func (in *SubscribableSpec) DeepCopy() *SubscribableSpec { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscribableStatus. +func (in *SubscribableStatus) DeepCopy() *SubscribableStatus { if in == nil { return nil } - out := new(SubscribableSpec) + out := new(SubscribableStatus) in.DeepCopyInto(out) return out } @@ -158,6 +171,7 @@ func (in *SubscribableType) DeepCopyInto(out *SubscribableType) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) return } @@ -213,17 +227,43 @@ func (in *SubscribableTypeList) DeepCopyObject() runtime.Object { } // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *Subscriber) DeepCopyInto(out *Subscriber) { +func (in *SubscribableTypeSpec) DeepCopyInto(out *SubscribableTypeSpec) { *out = *in + if in.Subscribable != nil { + in, out := &in.Subscribable, &out.Subscribable + *out = new(Subscribable) + (*in).DeepCopyInto(*out) + } return } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Subscriber. -func (in *Subscriber) DeepCopy() *Subscriber { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscribableTypeSpec. +func (in *SubscribableTypeSpec) DeepCopy() *SubscribableTypeSpec { if in == nil { return nil } - out := new(Subscriber) + out := new(SubscribableTypeSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubscribableTypeStatus) DeepCopyInto(out *SubscribableTypeStatus) { + *out = *in + if in.SubscribableStatus != nil { + in, out := &in.SubscribableStatus, &out.SubscribableStatus + *out = new(SubscribableStatus) + (*in).DeepCopyInto(*out) + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscribableTypeStatus. +func (in *SubscribableTypeStatus) DeepCopy() *SubscribableTypeStatus { + if in == nil { + return nil + } + out := new(SubscribableTypeStatus) in.DeepCopyInto(out) return out } @@ -248,3 +288,19 @@ func (in *SubscriberSpec) DeepCopy() *SubscriberSpec { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *SubscriberStatus) DeepCopyInto(out *SubscriberStatus) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SubscriberStatus. +func (in *SubscriberStatus) DeepCopy() *SubscriberStatus { + if in == nil { + return nil + } + out := new(SubscriberStatus) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/apis/eventing/v1alpha1/channel_types.go b/pkg/apis/eventing/v1alpha1/channel_types.go index 218d87f3ac4..515a186f39a 100644 --- a/pkg/apis/eventing/v1alpha1/channel_types.go +++ b/pkg/apis/eventing/v1alpha1/channel_types.go @@ -102,6 +102,8 @@ type ChannelStatus struct { // Internal is status unique to each ClusterChannelProvisioner. // +optional Internal *runtime.RawExtension `json:"internal,omitempty"` + + eventingduck.SubscribableTypeStatus `json:",inline"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/eventing/v1alpha1/subscription_lifecycle.go b/pkg/apis/eventing/v1alpha1/subscription_lifecycle.go index 0ac8dff2c6d..ed1d5f0abad 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_lifecycle.go +++ b/pkg/apis/eventing/v1alpha1/subscription_lifecycle.go @@ -22,7 +22,7 @@ import ( // subCondSet is a condition set with Ready as the happy condition and // ReferencesResolved and ChannelReady as the dependent conditions. -var subCondSet = apis.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionChannelReady) +var subCondSet = apis.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionAddedToChannel, SubscriptionConditionChannelReady) const ( // SubscriptionConditionReady has status True when all subconditions below have been set to True. @@ -31,8 +31,11 @@ const ( // resolved. SubscriptionConditionReferencesResolved apis.ConditionType = "Resolved" - // SubscriptionConditionChannelReady has status True when controller has successfully added a + // SubscriptionConditionAddedToChannel has status True when controller has successfully added a // subscription to the spec.channel resource. + SubscriptionConditionAddedToChannel apis.ConditionType = "AddedToChannel" + + // SubscriptionConditionChannelReady has status True when the channel has marked the subscriber as 'ready' SubscriptionConditionChannelReady apis.ConditionType = "ChannelReady" ) @@ -46,6 +49,16 @@ func (ss *SubscriptionStatus) IsReady() bool { return subCondSet.Manage(ss).IsHappy() } +// IsAddedToChannel returns true if SubscriptionConditionAddedToChannel is true +func (ss *SubscriptionStatus) IsAddedToChannel() bool { + return ss.GetCondition(SubscriptionConditionAddedToChannel).IsTrue() +} + +// AreReferencesResolved returns true if SubscriptionConditionReferencesResolved is true +func (ss *SubscriptionStatus) AreReferencesResolved() bool { + return ss.GetCondition(SubscriptionConditionReferencesResolved).IsTrue() +} + // InitializeConditions sets relevant unset conditions to Unknown state. func (ss *SubscriptionStatus) InitializeConditions() { subCondSet.Manage(ss).InitializeConditions() @@ -61,6 +74,11 @@ func (ss *SubscriptionStatus) MarkChannelReady() { subCondSet.Manage(ss).MarkTrue(SubscriptionConditionChannelReady) } +// MarkAddedToChannel sets the AddedToChannel condition to True state. +func (ss *SubscriptionStatus) MarkAddedToChannel() { + subCondSet.Manage(ss).MarkTrue(SubscriptionConditionAddedToChannel) +} + // MarkReferencesNotResolved sets the ReferencesResolved condition to False state. func (ss *SubscriptionStatus) MarkReferencesNotResolved(reason, messageFormat string, messageA ...interface{}) { subCondSet.Manage(ss).MarkFalse(SubscriptionConditionReferencesResolved, reason, messageFormat, messageA...) @@ -70,3 +88,8 @@ func (ss *SubscriptionStatus) MarkReferencesNotResolved(reason, messageFormat st func (ss *SubscriptionStatus) MarkChannelNotReady(reason, messageFormat string, messageA ...interface{}) { subCondSet.Manage(ss).MarkFalse(SubscriptionConditionChannelReady, reason, messageFormat, messageA) } + +// MarkNotAddedToChannel sets the AddedToChannel condition to False state. +func (ss *SubscriptionStatus) MarkNotAddedToChannel(reason, messageFormat string, messageA ...interface{}) { + subCondSet.Manage(ss).MarkFalse(SubscriptionConditionAddedToChannel, reason, messageFormat, messageA) +} diff --git a/pkg/apis/eventing/v1alpha1/subscription_lifecycle_test.go b/pkg/apis/eventing/v1alpha1/subscription_lifecycle_test.go index 7da9f8d9d2c..4de2dad88e4 100644 --- a/pkg/apis/eventing/v1alpha1/subscription_lifecycle_test.go +++ b/pkg/apis/eventing/v1alpha1/subscription_lifecycle_test.go @@ -120,6 +120,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) { want: &SubscriptionStatus{ Status: duckv1beta1.Status{ Conditions: []apis.Condition{{ + Type: SubscriptionConditionAddedToChannel, + Status: corev1.ConditionUnknown, + }, { Type: SubscriptionConditionChannelReady, Status: corev1.ConditionUnknown, }, { @@ -144,6 +147,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) { want: &SubscriptionStatus{ Status: duckv1beta1.Status{ Conditions: []apis.Condition{{ + Type: SubscriptionConditionAddedToChannel, + Status: corev1.ConditionUnknown, + }, { Type: SubscriptionConditionChannelReady, Status: corev1.ConditionFalse, }, { @@ -168,6 +174,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) { want: &SubscriptionStatus{ Status: duckv1beta1.Status{ Conditions: []apis.Condition{{ + Type: SubscriptionConditionAddedToChannel, + Status: corev1.ConditionUnknown, + }, { Type: SubscriptionConditionChannelReady, Status: corev1.ConditionUnknown, }, { @@ -193,30 +202,46 @@ func TestSubscriptionInitializeConditions(t *testing.T) { func TestSubscriptionIsReady(t *testing.T) { tests := []struct { - name string - markResolved bool - markChannelReady bool - wantReady bool + name string + markResolved bool + markChannelReady bool + wantReady bool + markAddedToChannel bool }{{ - name: "all happy", - markResolved: true, - markChannelReady: true, - wantReady: true, + name: "all happy", + markResolved: true, + markChannelReady: true, + markAddedToChannel: true, + wantReady: true, }, { - name: "one sad", - markResolved: false, - markChannelReady: true, - wantReady: false, + name: "one sad - markResolved", + markResolved: false, + markChannelReady: true, + markAddedToChannel: true, + wantReady: false, + }, { + name: "one sad - markChannelReady", + markResolved: true, + markChannelReady: false, + markAddedToChannel: true, + wantReady: false, + }, { + name: "one sad - markAddedToChannel", + markResolved: true, + markChannelReady: true, + markAddedToChannel: false, + wantReady: false, }, { name: "other sad", markResolved: true, markChannelReady: false, wantReady: false, }, { - name: "both sad", - markResolved: false, - markChannelReady: false, - wantReady: false, + name: "all sad", + markResolved: false, + markChannelReady: false, + markAddedToChannel: false, + wantReady: false, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -227,6 +252,9 @@ func TestSubscriptionIsReady(t *testing.T) { if test.markChannelReady { ss.MarkChannelReady() } + if test.markAddedToChannel { + ss.MarkAddedToChannel() + } got := ss.IsReady() if test.wantReady != got { t.Errorf("unexpected readiness: want %v, got %v", test.wantReady, got) diff --git a/pkg/apis/eventing/v1alpha1/test_helper.go b/pkg/apis/eventing/v1alpha1/test_helper.go index 6681cb45cc2..8a708979213 100644 --- a/pkg/apis/eventing/v1alpha1/test_helper.go +++ b/pkg/apis/eventing/v1alpha1/test_helper.go @@ -45,6 +45,7 @@ func (testHelper) ReadySubscriptionStatus() *SubscriptionStatus { ss := &SubscriptionStatus{} ss.MarkChannelReady() ss.MarkReferencesResolved() + ss.MarkAddedToChannel() return ss } diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index f325a152853..fe6dbb9bd2a 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -228,6 +228,7 @@ func (in *ChannelStatus) DeepCopyInto(out *ChannelStatus) { *out = new(runtime.RawExtension) (*in).DeepCopyInto(*out) } + in.SubscribableTypeStatus.DeepCopyInto(&out.SubscribableTypeStatus) return } diff --git a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go index af1e9d6224e..6d6638c996b 100644 --- a/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go +++ b/pkg/apis/messaging/v1alpha1/in_memory_channel_types.go @@ -76,7 +76,7 @@ type InMemoryChannelStatus struct { // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object -// ChannelList is a collection of Channels. +// InMemoryChannelList is a collection of in-memory channels. type InMemoryChannelList struct { metav1.TypeMeta `json:",inline"` // +optional diff --git a/pkg/duck/subscriber.go b/pkg/duck/subscriber.go index 7f0acbeeb7f..8be3fb8ccbb 100644 --- a/pkg/duck/subscriber.go +++ b/pkg/duck/subscriber.go @@ -22,6 +22,8 @@ import ( "fmt" "net/url" + "k8s.io/apimachinery/pkg/runtime/schema" + "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/logging" "github.com/knative/eventing/pkg/reconciler/names" @@ -46,8 +48,8 @@ func DomainToURL(domain string) string { } // ResourceInterface creates a resource interface for the given ObjectReference. -func ResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (dynamic.ResourceInterface, error) { - rc := dynamicClient.Resource(duckapis.KindToResource(ref.GroupVersionKind())) +func ResourceInterface(dynamicClient dynamic.Interface, namespace string, gvk schema.GroupVersionKind) (dynamic.ResourceInterface, error) { + rc := dynamicClient.Resource(duckapis.KindToResource(gvk)) if rc == nil { return nil, fmt.Errorf("failed to create dynamic client resource") @@ -57,7 +59,7 @@ func ResourceInterface(dynamicClient dynamic.Interface, namespace string, ref *c // ObjectReference resolves an object based on an ObjectReference. func ObjectReference(ctx context.Context, dynamicClient dynamic.Interface, namespace string, ref *corev1.ObjectReference) (duck.Marshalable, error) { - resourceClient, err := ResourceInterface(dynamicClient, namespace, ref) + resourceClient, err := ResourceInterface(dynamicClient, namespace, ref.GroupVersionKind()) if err != nil { logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return nil, err diff --git a/pkg/duck/subscriber_test.go b/pkg/duck/subscriber_test.go index 5bbd136f03e..1b6f37c2a22 100644 --- a/pkg/duck/subscriber_test.go +++ b/pkg/duck/subscriber_test.go @@ -60,7 +60,7 @@ func TestDomainToURL(t *testing.T) { } func TestResourceInterface_BadDynamicInterface(t *testing.T) { - actual, err := ResourceInterface(&badDynamicInterface{}, testNS, &corev1.ObjectReference{}) + actual, err := ResourceInterface(&badDynamicInterface{}, testNS, schema.GroupVersionKind{}) if err.Error() != "failed to create dynamic client resource" { t.Fatalf("Unexpected error '%v'", err) } diff --git a/pkg/reconciler/subscription/subscription.go b/pkg/reconciler/subscription/subscription.go index 0529aa4be9a..da66ad796ad 100644 --- a/pkg/reconciler/subscription/subscription.go +++ b/pkg/reconciler/subscription/subscription.go @@ -23,8 +23,6 @@ import ( "reflect" "time" - "k8s.io/apimachinery/pkg/api/meta" - eventingduckv1alpha1 "github.com/knative/eventing/pkg/apis/duck/v1alpha1" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" listers "github.com/knative/eventing/pkg/client/listers/eventing/v1alpha1" @@ -41,7 +39,7 @@ import ( apiextensionslisters "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - apierrs "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -53,13 +51,14 @@ const ( finalizerName = controllerAgentName // Name of the corev1.Events emitted from the reconciliation process - subscriptionReconciled = "SubscriptionReconciled" - subscriptionReadinessChanged = "SubscriptionReadinessChanged" - subscriptionUpdateStatusFailed = "SubscriptionUpdateStatusFailed" - physicalChannelSyncFailed = "PhysicalChannelSyncFailed" - channelReferenceFailed = "ChannelReferenceFailed" - subscriberResolveFailed = "SubscriberResolveFailed" - replyResolveFailed = "ReplyResolveFailed" + subscriptionReconciled = "SubscriptionReconciled" + subscriptionReadinessChanged = "SubscriptionReadinessChanged" + subscriptionUpdateStatusFailed = "SubscriptionUpdateStatusFailed" + physicalChannelSyncFailed = "PhysicalChannelSyncFailed" + subscriptionNotMarkedReadyByChannel = "SubscriptionNotMarkedReadyByChannel" + channelReferenceFailed = "ChannelReferenceFailed" + subscriberResolveFailed = "SubscriberResolveFailed" + replyResolveFailed = "ReplyResolveFailed" // Label to specify valid subscribable channel CRDs. channelCrdLabelKey = "messaging.knative.dev/subscribable" @@ -94,7 +93,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, key string) error { // Get the Subscription resource with this namespace/name original, err := r.subscriptionLister.Subscriptions(namespace).Get(name) - if apierrs.IsNotFound(err) { + if errors.IsNotFound(err) { // The resource may no longer exist, in which case we stop processing. logging.FromContext(ctx).Error("subscription key in work queue no longer exists") return nil @@ -132,12 +131,30 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc if err := subscription.Validate(ctx); err != nil { return err } - // See if the subscription has been deleted + + // Track the channel using the addressableInformer. + // We don't need to explicitly set a channelInformer, as this will dynamically generate one for us. + // This code needs to be called before checking the existence of the `channel`, in order to make sure the + // subscription controller will reconcile upon a `channel` change. + track := r.addressableInformer.TrackInNamespace(r.tracker, subscription) + if err := track(subscription.Spec.Channel); err != nil { + logging.FromContext(ctx).Error("Unable to track changes to spec.channel", zap.Error(err)) + return err + } if subscription.DeletionTimestamp != nil { + // If the subscription is Ready, then we have to remove it // from the channel's subscriber list. - if subscription.Status.IsReady() { - err := r.syncPhysicalChannel(ctx, subscription, true) + if channel, err := r.getChannelable(ctx, subscription.Namespace, &subscription.Spec.Channel); !errors.IsNotFound(err) && subscription.Status.IsAddedToChannel() { + if err != nil { + logging.FromContext(ctx).Warn("Failed to get Spec.Channel as Channelable duck type", + zap.Error(err), + zap.Any("channel", subscription.Spec.Channel)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) + subscription.Status.MarkReferencesNotResolved(channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) + return err + } + err := r.syncPhysicalChannel(ctx, subscription, channel, true) if err != nil { logging.FromContext(ctx).Warn("Failed to sync physical from Channel", zap.Error(err)) r.Recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) @@ -148,18 +165,16 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc _, err := r.EventingClientSet.EventingV1alpha1().Subscriptions(subscription.Namespace).Update(subscription) return err } - - // Track the channel using the addressableInformer. - // We don't need the explicitly set a channelInformer, as this will dynamically generate one for us. - // This code needs to be called before checking the existence of the `channel`, in order to make sure the - // subscription controller will reconcile upon a `channel` change. - track := r.addressableInformer.TrackInNamespace(r.tracker, subscription) - if err := track(subscription.Spec.Channel); err != nil { - logging.FromContext(ctx).Error("Unable to track changes to spec.channel", zap.Error(err)) + channel, err := r.getChannelable(ctx, subscription.Namespace, &subscription.Spec.Channel) + if err != nil { + logging.FromContext(ctx).Warn("Failed to get Spec.Channel as Channelable duck type", + zap.Error(err), + zap.Any("channel", subscription.Spec.Channel)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) + subscription.Status.MarkReferencesNotResolved(channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) return err } - - if err := r.validateChannel(ctx, subscription); err != nil { + if err := r.validateChannel(ctx, channel); err != nil { logging.FromContext(ctx).Warn("Failed to validate Channel", zap.Error(err), zap.Any("channel", subscription.Spec.Channel)) @@ -168,8 +183,6 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return err } - // Verify that `channel` exists. - subscriberURI, err := eventingduck.SubscriberSpec(ctx, r.DynamicClientSet, subscription.Namespace, subscription.Spec.Subscriber, track) if err != nil { logging.FromContext(ctx).Warn("Failed to resolve Subscriber", @@ -203,27 +216,95 @@ func (r *Reconciler) reconcile(ctx context.Context, subscription *v1alpha1.Subsc return err } - // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile - // the Channel with this information. - if err := r.syncPhysicalChannel(ctx, subscription, false); err != nil { - logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err)) - r.Recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) - subscription.Status.MarkChannelNotReady(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) - return err + // Check if the subscription needs to be added to the channel + if !r.subPresentInChannelSpec(subscription, channel) { + // Ok, now that we have the Channel and at least one of the Call/Result, let's reconcile + // the Channel with this information. + if err := r.syncPhysicalChannel(ctx, subscription, channel, false); err != nil { + logging.FromContext(ctx).Warn("Failed to sync physical Channel", zap.Error(err)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) + subscription.Status.MarkNotAddedToChannel(physicalChannelSyncFailed, "Failed to sync physical Channel: %v", err) + return err + } + } + subscription.Status.MarkAddedToChannel() + + // Check if the subscription is marked as ready by channel. + // Refresh subscribableChan to avoid another reconile loop. + // If it doesn't get refreshed, then next reconcile loop will get the updated channel + // Skip this if older channel of kind:channel and apiversion:eventing.knative.dev/v1alpha1 + if !deprecatedChannel(channel) { + channel, err = r.getChannelable(ctx, subscription.Namespace, &subscription.Spec.Channel) + if err != nil { + logging.FromContext(ctx).Warn("Failed to get Spec.Channel as Channelable duck type", + zap.Error(err), + zap.Any("channel", subscription.Spec.Channel)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) + subscription.Status.MarkChannelNotReady(channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. %s", err) + return err + } + if err := r.subMarkedReadyByChannel(subscription, channel); err != nil { + logging.FromContext(ctx).Warn("Subscription not marked by Channel as Ready.", zap.Error(err)) + r.Recorder.Eventf(subscription, corev1.EventTypeWarning, subscriptionNotMarkedReadyByChannel, err.Error()) + subscription.Status.MarkChannelNotReady(subscriptionNotMarkedReadyByChannel, "Subscription not marked by Channel as Ready: %s", err) + return err + } } - // Everything went well, set the fact that subscriptions have been modified subscription.Status.MarkChannelReady() return nil } -func (r *Reconciler) validateChannel(ctx context.Context, subscription *v1alpha1.Subscription) error { - // Verify the channel exists. - channel := &subscription.Spec.Channel - if _, err := eventingduck.ObjectReference(ctx, r.DynamicClientSet, subscription.Namespace, channel); err != nil { - return err +func (r Reconciler) subMarkedReadyByChannel(subscription *v1alpha1.Subscription, channel *eventingduckv1alpha1.Channelable) error { + if channel.Status.SubscribableStatus == nil { + return fmt.Errorf("channel.Status.SubscribableStatus is nil") } - // Special case for backwards compatibility, channel COs are valid channels. + for _, sub := range channel.Status.SubscribableStatus.Subscribers { + if sub.UID == subscription.GetUID() && + sub.ObservedGeneration == subscription.GetGeneration() { + if sub.Ready == corev1.ConditionTrue { + return nil + } + return fmt.Errorf(sub.Message) + } + } + return fmt.Errorf("subscription %q not present in channel %q subscriber's list", subscription.Name, channel.Name) +} +func (r Reconciler) subPresentInChannelSpec(subscription *v1alpha1.Subscription, channel *eventingduckv1alpha1.Channelable) bool { + if channel.Spec.Subscribable == nil { + return false + } + for _, sub := range channel.Spec.Subscribable.Subscribers { + if sub.UID == subscription.GetUID() && sub.Generation == subscription.GetGeneration() { + return true + } + } + return false +} + +// Todo: this needs to be changed to use cache rather than a API call each time +func (r *Reconciler) getChannelable(ctx context.Context, namespace string, chanReference *corev1.ObjectReference) (*eventingduckv1alpha1.Channelable, error) { + s, err := eventingduck.ObjectReference(ctx, r.DynamicClientSet, namespace, chanReference) + if err != nil { + return nil, err + } + channel := eventingduckv1alpha1.Channelable{} + err = duck.FromUnstructured(s, &channel) + if err != nil { + return nil, err + } + return &channel, nil +} + +func deprecatedChannel(channel *eventingduckv1alpha1.Channelable) bool { if channel.Kind == "Channel" && channel.APIVersion == "eventing.knative.dev/v1alpha1" { + return true + } + return false +} + +func (r *Reconciler) validateChannel(ctx context.Context, channel *eventingduckv1alpha1.Channelable) error { + // Special case for backwards compatibility, channel COs are valid channels. + if deprecatedChannel(channel) { return nil } @@ -285,7 +366,7 @@ func (r *Reconciler) updateStatus(ctx context.Context, desired *v1alpha1.Subscri return sub, err } -func (c *Reconciler) ensureFinalizer(sub *v1alpha1.Subscription) error { +func (r *Reconciler) ensureFinalizer(sub *v1alpha1.Subscription) error { finalizers := sets.NewString(sub.Finalizers...) if finalizers.Has(finalizerName) { return nil @@ -303,7 +384,7 @@ func (c *Reconciler) ensureFinalizer(sub *v1alpha1.Subscription) error { return err } - _, err = c.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Patch(sub.Name, types.MergePatchType, patch) + _, err = r.EventingClientSet.EventingV1alpha1().Subscriptions(sub.Namespace).Patch(sub.Name, types.MergePatchType, patch) return err } @@ -340,7 +421,7 @@ func (r *Reconciler) resolveResult(ctx context.Context, namespace string, replyS return "", fmt.Errorf("reply.status does not contain address") } -func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, isDeleted bool) error { +func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subscription, channel *eventingduckv1alpha1.Channelable, isDeleted bool) error { logging.FromContext(ctx).Debug("Reconciling physical from Channel", zap.Any("sub", sub)) subs, err := r.listAllSubscriptionsWithPhysicalChannel(ctx, sub) @@ -357,7 +438,7 @@ func (r *Reconciler) syncPhysicalChannel(ctx context.Context, sub *v1alpha1.Subs } subscribable := r.createSubscribable(subs) - if patchErr := r.patchPhysicalFrom(ctx, sub.Namespace, sub.Spec.Channel, subscribable); patchErr != nil { + if patchErr := r.patchPhysicalFrom(ctx, sub.Namespace, channel, subscribable); patchErr != nil { if isDeleted && errors.IsNotFound(patchErr) { logging.FromContext(ctx).Warn("Could not find Channel", zap.Any("channel", sub.Spec.Channel)) return nil @@ -389,7 +470,7 @@ func (r *Reconciler) listAllSubscriptionsWithPhysicalChannel(ctx context.Context func (r *Reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingduckv1alpha1.Subscribable { rv := &eventingduckv1alpha1.Subscribable{} for _, sub := range subs { - if sub.Status.PhysicalSubscription.SubscriberURI != "" || sub.Status.PhysicalSubscription.ReplyURI != "" { + if sub.Status.AreReferencesResolved() && sub.DeletionTimestamp == nil { rv.Subscribers = append(rv.Subscribers, eventingduckv1alpha1.SubscriberSpec{ DeprecatedRef: &corev1.ObjectReference{ APIVersion: sub.APIVersion, @@ -399,6 +480,7 @@ func (r *Reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd UID: sub.UID, }, UID: sub.UID, + Generation: sub.Generation, SubscriberURI: sub.Status.PhysicalSubscription.SubscriberURI, ReplyURI: sub.Status.PhysicalSubscription.ReplyURI, }) @@ -407,32 +489,22 @@ func (r *Reconciler) createSubscribable(subs []v1alpha1.Subscription) *eventingd return rv } -func (r *Reconciler) patchPhysicalFrom(ctx context.Context, namespace string, physicalFrom corev1.ObjectReference, subs *eventingduckv1alpha1.Subscribable) error { - // First get the original object and convert it to only the bits we care about - s, err := eventingduck.ObjectReference(ctx, r.DynamicClientSet, namespace, &physicalFrom) - if err != nil { - return err - } - original := eventingduckv1alpha1.SubscribableType{} - err = duck.FromUnstructured(s, &original) - if err != nil { - return err - } - - after := original.DeepCopy() +func (r *Reconciler) patchPhysicalFrom(ctx context.Context, namespace string, origChannel *eventingduckv1alpha1.Channelable, subs *eventingduckv1alpha1.Subscribable) error { + after := origChannel.DeepCopy() after.Spec.Subscribable = subs - patch, err := duck.CreateMergePatch(original, after) + patch, err := duck.CreateMergePatch(origChannel, after) + if err != nil { return err } - resourceClient, err := eventingduck.ResourceInterface(r.DynamicClientSet, namespace, &physicalFrom) + resourceClient, err := eventingduck.ResourceInterface(r.DynamicClientSet, namespace, origChannel.GroupVersionKind()) if err != nil { logging.FromContext(ctx).Warn("Failed to create dynamic resource client", zap.Error(err)) return err } - patched, err := resourceClient.Patch(original.Name, types.MergePatchType, patch, metav1.UpdateOptions{}) + patched, err := resourceClient.Patch(origChannel.GetName(), types.MergePatchType, patch, metav1.UpdateOptions{}) if err != nil { logging.FromContext(ctx).Warn("Failed to patch the Channel", zap.Error(err), zap.Any("patch", patch)) return err diff --git a/pkg/reconciler/subscription/subscription_test.go b/pkg/reconciler/subscription/subscription_test.go index 42344349dac..d780255c1be 100644 --- a/pkg/reconciler/subscription/subscription_test.go +++ b/pkg/reconciler/subscription/subscription_test.go @@ -47,9 +47,10 @@ const ( channelName = "origin" serviceName = "service" - subscriptionUID = subscriptionName + "-abc-123" - subscriptionName = "testsubscription" - testNS = "testnamespace" + subscriptionUID = subscriptionName + "-abc-123" + subscriptionName = "testsubscription" + testNS = "testnamespace" + subscriptionGeneration = 1 ) // subscriptions have: channel -> SUB -> subscriber -viaSub-> reply @@ -139,7 +140,7 @@ func TestAllCases(t *testing.T) { Key: testNS + "/" + subscriptionName, WantErr: true, WantEvents: []string{ - Eventf(corev1.EventTypeWarning, channelReferenceFailed, "Failed to validate spec.channel: channels.eventing.knative.dev %q not found", channelName), + Eventf(corev1.EventTypeWarning, channelReferenceFailed, "Failed to get Spec.Channel as Channelable duck type. channels.eventing.knative.dev %q not found", channelName), }, WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, @@ -148,7 +149,7 @@ func TestAllCases(t *testing.T) { WithSubscriptionSubscriberRef(subscriberGVK, subscriberName), // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, - WithSubscriptionReferencesNotResolved(channelReferenceFailed, fmt.Sprintf("Failed to validate spec.channel: channels.eventing.knative.dev %q not found", channelName)), + WithSubscriptionReferencesNotResolved(channelReferenceFailed, fmt.Sprintf("Failed to get Spec.Channel as Channelable duck type. channels.eventing.knative.dev %q not found", channelName)), ), }}, }, { @@ -399,6 +400,7 @@ func TestAllCases(t *testing.T) { // The first reconciliation will initialize the status conditions. WithInitSubscriptionConditions, MarkSubscriptionReady, + WithSubscriptionPhysicalSubscriptionSubscriber(subscriberURI), ), }}, @@ -499,6 +501,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), + WithSubscriptionGeneration(subscriptionGeneration), WithSubscriptionChannel(channelGVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName), WithInitSubscriptionConditions, @@ -525,6 +528,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), + WithSubscriptionGeneration(subscriptionGeneration), WithSubscriptionChannel(channelGVK, channelName), WithSubscriptionSubscriberRef(subscriberGVK, subscriberName), WithInitSubscriptionConditions, @@ -534,7 +538,7 @@ func TestAllCases(t *testing.T) { }}, WantPatches: []clientgotesting.PatchActionImpl{ patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, SubscriberURI: subscriberURI, DeprecatedRef: &corev1.ObjectReference{Name: subscriptionName, Namespace: testNS, UID: subscriptionUID}}, + {UID: subscriptionUID, Generation: subscriptionGeneration, SubscriberURI: subscriberURI, DeprecatedRef: &corev1.ObjectReference{Name: subscriptionName, Namespace: testNS, UID: subscriptionUID}}, }), patchFinalizers(testNS, subscriptionName), }, @@ -543,6 +547,7 @@ func TestAllCases(t *testing.T) { Objects: []runtime.Object{ NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), + WithSubscriptionGeneration(subscriptionGeneration), WithSubscriptionChannel(channelGVK, channelName), WithInitSubscriptionConditions, WithSubscriptionReply(channelGVK, replyName), @@ -570,6 +575,7 @@ func TestAllCases(t *testing.T) { WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ Object: NewSubscription(subscriptionName, testNS, WithSubscriptionUID(subscriptionUID), + WithSubscriptionGeneration(subscriptionGeneration), WithSubscriptionChannel(channelGVK, channelName), WithSubscriptionReply(channelGVK, replyName), WithInitSubscriptionConditions, @@ -579,7 +585,7 @@ func TestAllCases(t *testing.T) { }}, WantPatches: []clientgotesting.PatchActionImpl{ patchSubscribers(testNS, channelName, []eventingduck.SubscriberSpec{ - {UID: subscriptionUID, ReplyURI: replyURI, DeprecatedRef: &corev1.ObjectReference{Name: subscriptionName, Namespace: testNS, UID: subscriptionUID}}, + {UID: subscriptionUID, Generation: subscriptionGeneration, ReplyURI: replyURI, DeprecatedRef: &corev1.ObjectReference{Name: subscriptionName, Namespace: testNS, UID: subscriptionUID}}, }), patchFinalizers(testNS, subscriptionName), }, diff --git a/pkg/reconciler/testing/subscription.go b/pkg/reconciler/testing/subscription.go index 4857e8dfc44..a7be40a5e89 100644 --- a/pkg/reconciler/testing/subscription.go +++ b/pkg/reconciler/testing/subscription.go @@ -65,6 +65,12 @@ func WithSubscriptionUID(uid types.UID) SubscriptionOption { } } +func WithSubscriptionGeneration(gen int64) SubscriptionOption { + return func(s *v1alpha1.Subscription) { + s.Generation = gen + } +} + func WithSubscriptionGenerateName(generateName string) SubscriptionOption { return func(c *v1alpha1.Subscription) { c.ObjectMeta.GenerateName = generateName @@ -141,6 +147,7 @@ func WithSubscriptionFinalizers(finalizers ...string) SubscriptionOption { func MarkSubscriptionReady(s *v1alpha1.Subscription) { s.Status.MarkChannelReady() s.Status.MarkReferencesResolved() + s.Status.MarkAddedToChannel() } func WithSubscriptionReferencesNotResolved(reason, msg string) SubscriptionOption {