Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 76 additions & 36 deletions pkg/apis/duck/v1alpha1/channelable_combined_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"
Expand All @@ -31,8 +32,8 @@ import (
// +genduck
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

// ChannelableCombined is a skeleton type wrapping Subscribable and Addressable of both
// v1alpha1 and v1beta1 duck types. This is not to be used by resource writers and is
// ChannelableCombined is a skeleton type wrapping Subscribable and Addressable of
// v1alpha1, v1beta1 and v1 duck types. This is not to be used by resource writers and is
// only used by Subscription Controller to synthesize patches and read the Status
// of the Channelable Resources.
// This is not a real resource.
Expand All @@ -55,8 +56,17 @@ type ChannelableCombinedSpec struct {
eventingduckv1beta1.SubscribableSpec `json:",inline"`

// DeliverySpec contains options controlling the event delivery
// for the v1beta1 spec compatibility.
// +optional
Delivery *eventingduckv1beta1.DeliverySpec `json:"delivery,omitempty"`

// SubscribableSpecv1 is for the v1 spec compatibility.
SubscribableSpecv1 eventingduckv1.SubscribableSpec `json:",inline"`

// DeliverySpecv1 contains options controlling the event delivery
// for the v1 spec compatibility.
// +optional
Deliveryv1 *eventingduckv1.DeliverySpec `json:"deliveryv1,omitempty"`
}

// ChannelableStatus contains the Status of a Channelable object.
Expand All @@ -71,6 +81,8 @@ type ChannelableCombinedStatus struct {
SubscribableTypeStatus `json:",inline"`
// SubscribableStatus is the v1beta1 part of the Subscribers status.
eventingduckv1beta1.SubscribableStatus `json:",inline"`
// SubscribableStatusv1 is the v1 part of the Subscribers status.
SubscribableStatusv1 eventingduckv1.SubscribableStatus `json:",inline"`
// ErrorChannel is set by the channel when it supports native error handling via a channel
// +optional
ErrorChannel *corev1.ObjectReference `json:"errorChannel,omitempty"`
Expand Down Expand Up @@ -113,23 +125,67 @@ func (c *ChannelableCombined) Populate() {
ReplyURI: apis.HTTP("sink2"),
}},
}
c.Spec.SubscribableSpecv1 = eventingduckv1.SubscribableSpec{
// Populate ALL fields
Subscribers: []eventingduckv1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call1"),
ReplyURI: apis.HTTP("sink2"),
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 2,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}},
}
retry := int32(5)
linear := eventingduckv1beta1.BackoffPolicyLinear
linearv1 := eventingduckv1.BackoffPolicyLinear
delay := "5s"
c.Spec.Delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "aname",
},
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
deadLetterSink := duckv1.Destination{
Ref: &duckv1.KReference{
Name: "aname",
},
Retry: &retry,
BackoffPolicy: &linear,
BackoffDelay: &delay,
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
}
c.Spec.Delivery = &eventingduckv1beta1.DeliverySpec{
DeadLetterSink: &deadLetterSink,
Retry: &retry,
BackoffPolicy: &linear,
BackoffDelay: &delay,
}
c.Spec.Deliveryv1 = &eventingduckv1.DeliverySpec{
DeadLetterSink: &deadLetterSink,
Retry: &retry,
BackoffPolicy: &linearv1,
BackoffDelay: &delay,
}
subscribers := []eventingduckv1beta1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}}
subscribersv1 := []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}}
c.Status = ChannelableCombinedStatus{
AddressStatus: v1alpha1.AddressStatus{
Address: &v1alpha1.Addressable{
Expand All @@ -144,31 +200,15 @@ func (c *ChannelableCombined) Populate() {
},
},
SubscribableStatus: eventingduckv1beta1.SubscribableStatus{
Subscribers: []eventingduckv1beta1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribers: subscribers,
},
SubscribableStatusv1: eventingduckv1.SubscribableStatus{
Subscribers: subscribersv1,
},
SubscribableTypeStatus: SubscribableTypeStatus{
SubscribableStatus: &SubscribableStatus{
Subscribers: []eventingduckv1beta1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribers: subscribers,
Subscribersv1: subscribersv1,
},
},
}
Expand Down
54 changes: 54 additions & 0 deletions pkg/apis/duck/v1alpha1/channelable_combined_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
Expand All @@ -44,6 +45,7 @@ func TestChannelableCombinedPopulate(t *testing.T) {

retry := int32(5)
linear := eventingduckv1beta1.BackoffPolicyLinear
linearv1 := eventingduckv1.BackoffPolicyLinear
delay := "5s"
want := &ChannelableCombined{
Spec: ChannelableCombinedSpec{
Expand All @@ -61,6 +63,20 @@ func TestChannelableCombinedPopulate(t *testing.T) {
ReplyURI: apis.HTTP("sink2"),
}},
},
SubscribableSpecv1: eventingduckv1.SubscribableSpec{
// Populate ALL fields
Subscribers: []eventingduckv1.SubscriberSpec{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 1,
SubscriberURI: apis.HTTP("call1"),
ReplyURI: apis.HTTP("sink2"),
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
Generation: 2,
SubscriberURI: apis.HTTP("call2"),
ReplyURI: apis.HTTP("sink2"),
}},
},
SubscribableTypeSpec: SubscribableTypeSpec{
Subscribable: &Subscribable{
Subscribers: []SubscriberSpec{{
Expand Down Expand Up @@ -90,6 +106,20 @@ func TestChannelableCombinedPopulate(t *testing.T) {
BackoffPolicy: &linear,
BackoffDelay: &delay,
},
Deliveryv1: &eventingduckv1.DeliverySpec{
DeadLetterSink: &duckv1.Destination{
Ref: &duckv1.KReference{
Name: "aname",
},
URI: &apis.URL{
Scheme: "http",
Host: "test-error-domain",
},
},
Retry: &retry,
BackoffPolicy: &linearv1,
BackoffDelay: &delay,
},
},

Status: ChannelableCombinedStatus{
Expand Down Expand Up @@ -118,6 +148,19 @@ func TestChannelableCombinedPopulate(t *testing.T) {
Message: "Some message",
}},
},
SubscribableStatusv1: eventingduckv1.SubscribableStatus{
Subscribers: []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
},
SubscribableTypeStatus: SubscribableTypeStatus{
SubscribableStatus: &SubscribableStatus{
Subscribers: []eventingduckv1beta1.SubscriberStatus{{
Expand All @@ -131,6 +174,17 @@ func TestChannelableCombinedPopulate(t *testing.T) {
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribersv1: []eventingduckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
},
},
},
Expand Down
32 changes: 29 additions & 3 deletions pkg/apis/duck/v1alpha1/subscribable_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
"knative.dev/pkg/apis"
"knative.dev/pkg/apis/duck"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingduckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
)

// +genduck
Expand Down Expand Up @@ -62,6 +62,8 @@ type SubscriberSpec struct {
DeadLetterSinkURI *apis.URL `json:"deadLetterSink,omitempty"`
// +optional
Delivery *duckv1beta1.DeliverySpec `json:"delivery,omitempty"`
// +optional
Deliveryv1 *duckv1.DeliverySpec `json:"deliveryv1,omitempty"`
}

// SubscribableStatus is the schema for the subscribable's status portion of the status
Expand All @@ -71,6 +73,11 @@ type SubscribableStatus struct {
// +patchMergeKey=uid
// +patchStrategy=merge
Subscribers []duckv1beta1.SubscriberStatus `json:"subscribers,omitempty" patchStrategy:"merge" patchMergeKey:"uid"`

// This is the list of subscription's statuses for this channel.
// +patchMergeKey=uid
// +patchStrategy=merge
Subscribersv1 []duckv1.SubscriberStatus `json:"subscribersv1,omitempty" patchStrategy:"merge" patchMergeKey:"uid"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down Expand Up @@ -132,11 +139,19 @@ func (s *SubscribableTypeStatus) SetSubscribableTypeStatus(subscriberStatus Subs
// AddSubscriberToSubscribableStatus method is a Helper method for type SubscribableTypeStatus, if Subscribable Status needs to be appended
// with Subscribers, use this function, so that the value is reflected in both the duplicate fields residing
// in SubscribableTypeStatus
func (s *SubscribableTypeStatus) AddSubscriberToSubscribableStatus(subscriberStatus eventingduckv1beta1.SubscriberStatus) {
func (s *SubscribableTypeStatus) AddSubscriberToSubscribableStatus(subscriberStatus duckv1beta1.SubscriberStatus) {
subscribers := append(s.GetSubscribableTypeStatus().Subscribers, subscriberStatus)
s.SubscribableStatus.Subscribers = subscribers
}

// AddSubscriberToSubscribableStatus method is a Helper method for type SubscribableTypeStatus, if Subscribable Status needs to be appended
// with Subscribers, use this function, so that the value is reflected in both the duplicate fields residing
// in SubscribableTypeStatus
func (s *SubscribableTypeStatus) AddSubscriberV1ToSubscribableStatus(subscriberStatus duckv1.SubscriberStatus) {
subscribersv1 := append(s.GetSubscribableTypeStatus().Subscribersv1, subscriberStatus)
s.SubscribableStatus.Subscribersv1 = subscribersv1
}

// GetFullType implements duck.Implementable
func (s *Subscribable) GetFullType() duck.Populatable {
return &SubscribableType{}
Expand All @@ -160,7 +175,18 @@ func (c *SubscribableType) Populate() {
}
c.Status.SetSubscribableTypeStatus(SubscribableStatus{
// Populate ALL fields
Subscribers: []eventingduckv1beta1.SubscriberStatus{{
Subscribers: []duckv1beta1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Message: "Some message",
}, {
UID: "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 2,
Ready: corev1.ConditionFalse,
Message: "Some message",
}},
Subscribersv1: []duckv1.SubscriberStatus{{
UID: "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1",
ObservedGeneration: 1,
Ready: corev1.ConditionTrue,
Expand Down
Loading