diff --git a/cmd/webhook/main.go b/cmd/webhook/main.go index 90289bf3f67..7718d836be7 100644 --- a/cmd/webhook/main.go +++ b/cmd/webhook/main.go @@ -26,9 +26,9 @@ import ( configsv1alpha1 "knative.dev/eventing/pkg/apis/configs/v1alpha1" eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" "knative.dev/eventing/pkg/apis/eventing" - "knative.dev/eventing/pkg/apis/eventing/v1alpha1" + baseeventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" - "knative.dev/eventing/pkg/apis/eventing/v1beta1" + baseeventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" flowsv1alpha1 "knative.dev/eventing/pkg/apis/flows/v1alpha1" legacysourcesv1alpha1 "knative.dev/eventing/pkg/apis/legacysources/v1alpha1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" @@ -201,8 +201,10 @@ func NewLegacySinkBindingWebhook(ctx context.Context, cmw configmap.Watcher) *co func NewConversionController(ctx context.Context, cmw configmap.Watcher) *controller.Impl { var ( - v1alpha1_ = v1alpha1.SchemeGroupVersion.Version - v1beta1_ = v1beta1.SchemeGroupVersion.Version + eventingv1alpha1_ = baseeventingv1alpha1.SchemeGroupVersion.Version + eventingv1beta1_ = baseeventingv1beta1.SchemeGroupVersion.Version + // messagingv1alpha1_ = basemessagingv1alpha1.SchemeGroupVersion.Version + // messagingv1beta1_ = basemessagingv1beta1.SchemeGroupVersion.Version ) return conversion.NewConversionController(ctx, @@ -211,22 +213,34 @@ func NewConversionController(ctx context.Context, cmw configmap.Watcher) *contro // Specify the types of custom resource definitions that should be converted map[schema.GroupKind]conversion.GroupKindConversion{ - v1beta1.Kind("Trigger"): { + // eventing + baseeventingv1beta1.Kind("Trigger"): { DefinitionName: eventing.TriggersResource.String(), - HubVersion: v1alpha1_, + HubVersion: eventingv1alpha1_, Zygotes: map[string]conversion.ConvertibleObject{ - v1alpha1_: &v1alpha1.Trigger{}, - v1beta1_: &v1beta1.Trigger{}, + eventingv1alpha1_: &baseeventingv1alpha1.Trigger{}, + eventingv1beta1_: &baseeventingv1beta1.Trigger{}, }, }, - v1beta1.Kind("Broker"): { + baseeventingv1beta1.Kind("Broker"): { DefinitionName: eventing.BrokersResource.String(), - HubVersion: v1alpha1_, + HubVersion: eventingv1alpha1_, Zygotes: map[string]conversion.ConvertibleObject{ - v1alpha1_: &v1alpha1.Broker{}, - v1beta1_: &v1beta1.Broker{}, + eventingv1alpha1_: &baseeventingv1alpha1.Broker{}, + eventingv1beta1_: &baseeventingv1beta1.Broker{}, }, }, + // messaging + /* + basemessagingv1beta1.Kind("Subscription"): { + DefinitionName: messaging.TriggersResource.String(), + HubVersion: messagingv1alpha1_, + Zygotes: map[string]conversion.ConvertibleObject{ + messagingv1alpha1_: &basemessagingv1alpha1.Subscription{}, + messagingv1beta1_: &basemessagingv1beta1.Subscription{}, + }, + }, + */ }, // A function that infuses the context passed to ConvertUp/ConvertDown/SetDefaults with custom metadata. diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index 77798b28523..02cbe755ea3 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -20,6 +20,7 @@ metadata: knative.dev/crd-install: "true" spec: group: messaging.knative.dev + preserveUnknownFields: false names: kind: Subscription plural: subscriptions @@ -33,6 +34,13 @@ spec: scope: Namespaced subresources: status: {} + conversion: + strategy: None +# strategy: Webhook +# webhookClientConfig: +# service: +# name: eventing-webhook +# namespace: knative-eventing additionalPrinterColumns: - name: Ready type: string @@ -43,85 +51,98 @@ spec: - name: Age type: date JSONPath: .metadata.creationTimestamp + validation: + openAPIV3Schema: + type: object + properties: + spec: + required: + - channel + type: object + properties: + channel: + type: object + description: "Channel that forwards incoming events to the subscription." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + name: + type: string + minLength: 1 + subscriber: + type: object + description: "the subscriber that (optionally) processes events." + properties: + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + minLength: 1 + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + namespace: + type: string + minLength: 1 + reply: + type: object + description: "the destination that (optionally) receive events." + properties: + uri: + type: string + description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." + minLength: 1 + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + namespace: + type: string + minLength: 1 + delivery: + description: "Subscription delivery options. More information: https://knative.dev/docs/eventing/event-delivery." + type: object + x-kubernetes-preserve-unknown-fields: true + status: + type: object + x-kubernetes-preserve-unknown-fields: true versions: - name: v1alpha1 served: true storage: true - schema: - openAPIV3Schema: - type: object - properties: - spec: - required: - - channel - type: object - properties: - channel: - type: object - description: "Channel that forwards incoming events to the subscription." - required: - - apiVersion - - kind - - name - properties: - apiVersion: - type: string - minLength: 1 - kind: - type: string - name: - type: string - minLength: 1 - subscriber: - type: object - description: "the subscriber that (optionally) processes events." - properties: - uri: - type: string - description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." - minLength: 1 - ref: - type: object - description: "a reference to a Kubernetes object from which to retrieve the target URI." - required: - - apiVersion - - kind - - name - properties: - apiVersion: - type: string - minLength: 1 - kind: - type: string - minLength: 1 - name: - type: string - minLength: 1 - reply: - type: object - description: "the destination that (optionally) receive events." - properties: - uri: - type: string - description: "the target URI or, if ref is provided, a relative URI reference that will be combined with ref to produce a target URI." - minLength: 1 - ref: - type: object - description: "a reference to a Kubernetes object from which to retrieve the target URI." - required: - - apiVersion - - kind - - name - properties: - apiVersion: - type: string - minLength: 1 - kind: - type: string - minLength: 1 - name: - type: string - minLength: 1 - name: v1beta1 - served: false + served: true storage: false diff --git a/pkg/apis/messaging/register.go b/pkg/apis/messaging/register.go index 8f678adcd23..97137dcb503 100644 --- a/pkg/apis/messaging/register.go +++ b/pkg/apis/messaging/register.go @@ -16,6 +16,16 @@ limitations under the License. package messaging +import "k8s.io/apimachinery/pkg/runtime/schema" + const ( GroupName = "messaging.knative.dev" ) + +var ( + // SubscriptionssResource respresents a Knative Subscription + TriggersResource = schema.GroupResource{ + Group: GroupName, + Resource: "subscriptions", + } +) diff --git a/pkg/apis/messaging/v1alpha1/subscription_conversion.go b/pkg/apis/messaging/v1alpha1/subscription_conversion.go new file mode 100644 index 00000000000..aaf606c9652 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/subscription_conversion.go @@ -0,0 +1,81 @@ +/* +Copyright 2020 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "fmt" + + duckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1" + duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" + "knative.dev/eventing/pkg/apis/messaging/v1beta1" + "knative.dev/pkg/apis" +) + +// ConvertUp implements apis.Convertible. +// Converts source (from v1alpha1.Subscription) into v1beta1.Subscription +func (source *Subscription) ConvertUp(ctx context.Context, obj apis.Convertible) error { + switch sink := obj.(type) { + case *v1beta1.Subscription: + sink.ObjectMeta = source.ObjectMeta + sink.Spec.Channel = source.Spec.Channel + if source.Spec.Delivery != nil { + sink.Spec.Delivery = &duckv1beta1.DeliverySpec{} + if err := source.Spec.Delivery.ConvertUp(ctx, sink.Spec.Delivery); err != nil { + return err + } + } + sink.Spec.Subscriber = source.Spec.Subscriber + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.PhysicalSubscription.SubscriberURI = source.Status.PhysicalSubscription.SubscriberURI + sink.Status.PhysicalSubscription.ReplyURI = source.Status.PhysicalSubscription.ReplyURI + sink.Status.PhysicalSubscription.DeadLetterSinkURI = source.Status.PhysicalSubscription.DeadLetterSinkURI + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", sink) + + } +} + +// ConvertDown implements apis.Convertible. +// Converts obj from v1beta1.Subscription into v1alpha1.Subscription +func (sink *Subscription) ConvertDown(ctx context.Context, obj apis.Convertible) error { + switch source := obj.(type) { + case *v1beta1.Subscription: + sink.ObjectMeta = source.ObjectMeta + sink.Spec.Channel = source.Spec.Channel + if source.Spec.Delivery != nil { + sink.Spec.Delivery = &duckv1alpha1.DeliverySpec{} + if err := sink.Spec.Delivery.ConvertDown(ctx, source.Spec.Delivery); err != nil { + return err + } + } + sink.Spec.Subscriber = source.Spec.Subscriber + sink.Spec.Reply = source.Spec.Reply + + sink.Status.Status = source.Status.Status + sink.Status.PhysicalSubscription.SubscriberURI = source.Status.PhysicalSubscription.SubscriberURI + sink.Status.PhysicalSubscription.ReplyURI = source.Status.PhysicalSubscription.ReplyURI + sink.Status.PhysicalSubscription.DeadLetterSinkURI = source.Status.PhysicalSubscription.DeadLetterSinkURI + + return nil + default: + return fmt.Errorf("Unknown conversion, got: %T", source) + } +} diff --git a/pkg/apis/messaging/v1alpha1/subscription_conversion_test.go b/pkg/apis/messaging/v1alpha1/subscription_conversion_test.go new file mode 100644 index 00000000000..2bc5d620f28 --- /dev/null +++ b/pkg/apis/messaging/v1alpha1/subscription_conversion_test.go @@ -0,0 +1,140 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1alpha1 + +import ( + "context" + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/pointer" + "knative.dev/eventing/pkg/apis/duck/v1alpha1" + "knative.dev/eventing/pkg/apis/messaging/v1beta1" + "knative.dev/pkg/apis" + duckv1 "knative.dev/pkg/apis/duck/v1" +) + +// TODO: Replace dummy some other messaging object once they +// implement apis.Convertible +type dummy struct{} + +func (*dummy) ConvertUp(ctx context.Context, obj apis.Convertible) error { + return errors.New("Won't go") +} + +func (*dummy) ConvertDown(ctx context.Context, obj apis.Convertible) error { + return errors.New("Won't go") +} + +func TestSubscriptionConversionBadType(t *testing.T) { + good, bad := &Subscription{}, &dummy{} + + if err := good.ConvertUp(context.Background(), bad); err == nil { + t.Errorf("ConvertUp() = %#v, wanted error", bad) + } + + if err := good.ConvertDown(context.Background(), bad); err == nil { + t.Errorf("ConvertDown() = %#v, wanted error", good) + } +} + +func TestSubscriptionConversion(t *testing.T) { + // Just one for now, just adding the for loop for ease of future changes. + versions := []apis.Convertible{&v1beta1.Subscription{}} + + linear := v1alpha1.BackoffPolicyLinear + + tests := []struct { + name string + in *Subscription + }{{ + name: "min configuration", + in: &Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: "broker-name", + Namespace: "broker-ns", + Generation: 17, + }, + Spec: SubscriptionSpec{}, + }, + }, { + name: "full configuration", + in: &Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: "broker-name", + Namespace: "broker-ns", + Generation: 17, + }, + Spec: SubscriptionSpec{ + Channel: corev1.ObjectReference{ + Kind: "channelKind", + Namespace: "channelNamespace", + Name: "channelName", + APIVersion: "channelAPIVersion", + }, + Delivery: &v1alpha1.DeliverySpec{ + DeadLetterSink: &duckv1.Destination{ + Ref: &duckv1.KReference{ + Kind: "dlKind", + Namespace: "dlNamespace", + Name: "dlName", + APIVersion: "dlAPIVersion", + }, + URI: apis.HTTP("dls"), + }, + Retry: pointer.Int32Ptr(5), + BackoffPolicy: &linear, + BackoffDelay: pointer.StringPtr("5s"), + }, + }, + Status: SubscriptionStatus{ + Status: duckv1.Status{ + ObservedGeneration: 1, + Conditions: duckv1.Conditions{{ + Type: "Ready", + Status: "True", + }}, + }, + PhysicalSubscription: SubscriptionStatusPhysicalSubscription{ + SubscriberURI: apis.HTTP("subscriber.example.com"), + ReplyURI: apis.HTTP("reply.example.com"), + DeadLetterSinkURI: apis.HTTP("dlc.example.com"), + }, + }, + }, + }} + for _, test := range tests { + for _, version := range versions { + t.Run(test.name, func(t *testing.T) { + ver := version + if err := test.in.ConvertUp(context.Background(), ver); err != nil { + t.Errorf("ConvertUp() = %v", err) + } + got := &Subscription{} + if err := got.ConvertDown(context.Background(), ver); err != nil { + t.Errorf("ConvertDown() = %v", err) + } + if diff := cmp.Diff(test.in, got); diff != "" { + t.Errorf("roundtrip (-want, +got) = %v", diff) + } + }) + } + } +} diff --git a/pkg/apis/messaging/v1beta1/subscription_conversion.go b/pkg/apis/messaging/v1beta1/subscription_conversion.go new file mode 100644 index 00000000000..915d3ee111e --- /dev/null +++ b/pkg/apis/messaging/v1beta1/subscription_conversion.go @@ -0,0 +1,34 @@ +/* +Copyright 2020 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "context" + "fmt" + + "knative.dev/pkg/apis" +) + +// ConvertUp implements apis.Convertible +func (source *Subscription) ConvertUp(ctx context.Context, sink apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", sink) +} + +// ConvertDown implements apis.Convertible +func (sink *Subscription) ConvertDown(ctx context.Context, source apis.Convertible) error { + return fmt.Errorf("v1beta1 is the highest known version, got: %T", source) +} diff --git a/pkg/apis/messaging/v1beta1/subscription_conversion_test.go b/pkg/apis/messaging/v1beta1/subscription_conversion_test.go new file mode 100644 index 00000000000..d733ff2a527 --- /dev/null +++ b/pkg/apis/messaging/v1beta1/subscription_conversion_test.go @@ -0,0 +1,34 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1beta1 + +import ( + "context" + "testing" +) + +func TestSubscriptionConversionBadType(t *testing.T) { + good, bad := &Subscription{}, &Subscription{} + + if err := good.ConvertUp(context.Background(), bad); err == nil { + t.Errorf("ConvertUp() = %#v, wanted error", bad) + } + + if err := good.ConvertDown(context.Background(), bad); err == nil { + t.Errorf("ConvertDown() = %#v, wanted error", good) + } +} diff --git a/pkg/reconciler/broker/trigger.go b/pkg/reconciler/broker/trigger.go index bbca1d80b98..8d8701ff592 100644 --- a/pkg/reconciler/broker/trigger.go +++ b/pkg/reconciler/broker/trigger.go @@ -148,6 +148,7 @@ func (r *Reconciler) reconcileSubscription(ctx context.Context, t *v1alpha1.Trig if equality.Semantic.DeepDerivative(expected.Spec, actual.Spec) { return actual, nil } + logging.FromContext(ctx).Info("Differing Subscription", zap.Any("expected", expected.Spec), zap.Any("actual", actual.Spec)) // Given that spec.channel is immutable, we cannot just update the Subscription. We delete // it and re-create it instead. diff --git a/test/e2e/channel_single_event_test.go b/test/e2e/channel_single_event_test.go index 32ee9a4a085..b4e32c5491d 100644 --- a/test/e2e/channel_single_event_test.go +++ b/test/e2e/channel_single_event_test.go @@ -37,6 +37,7 @@ func TestSingleBinaryEventForChannel(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, cloudevents.Binary, + "v1alpha1", channelTestRunner, ) } @@ -45,6 +46,25 @@ func TestSingleStructuredEventForChannel(t *testing.T) { helpers.SingleEventForChannelTestHelper( t, cloudevents.Structured, + "v1alpha1", + channelTestRunner, + ) +} + +func TestSingleBinaryEventForChannelV1Beta1(t *testing.T) { + helpers.SingleEventForChannelTestHelper( + t, + cloudevents.Binary, + "v1beta1", + channelTestRunner, + ) +} + +func TestSingleStructuredEventForChannelV1Beta1(t *testing.T) { + helpers.SingleEventForChannelTestHelper( + t, + cloudevents.Structured, + "v1beta1", channelTestRunner, ) } diff --git a/test/e2e/helpers/channel_single_event_helper.go b/test/e2e/helpers/channel_single_event_helper.go index 551181aebd7..1f5a13b80cc 100644 --- a/test/e2e/helpers/channel_single_event_helper.go +++ b/test/e2e/helpers/channel_single_event_helper.go @@ -28,8 +28,16 @@ import ( "knative.dev/eventing/test/lib/resources" ) +type subscriptionVersion string + +const ( + subscriptionV1alpha1 subscriptionVersion = "v1alpha1" + subscriptionV1beta1 subscriptionVersion = "v1beta1" +) + // SingleEventForChannelTestHelper is the helper function for channel_single_event_test func SingleEventForChannelTestHelper(t *testing.T, encoding string, + subscriptionVersion subscriptionVersion, channelTestRunner lib.ChannelTestRunner, options ...lib.SetupClientOption) { channelName := "e2e-singleevent-channel-" + encoding @@ -50,12 +58,22 @@ func SingleEventForChannelTestHelper(t *testing.T, encoding string, client.CreatePodOrFail(pod, lib.WithService(loggerPodName)) // create subscription to subscribe the channel, and forward the received events to the logger service - client.CreateSubscriptionOrFail( - subscriptionName, - channelName, - &channel, - resources.WithSubscriberForSubscription(loggerPodName), - ) + switch subscriptionVersion { + case subscriptionV1alpha1: + client.CreateSubscriptionOrFail( + subscriptionName, + channelName, + &channel, + resources.WithSubscriberForSubscription(loggerPodName), + ) + case subscriptionV1beta1: + client.CreateSubscriptionOrFailV1Beta1( + subscriptionName, + channelName, + &channel, + resources.WithSubscriberForSubscriptionV1Beta1(loggerPodName), + ) + } // wait for all test resources to be ready, so that we can start sending events client.WaitForAllTestResourcesReadyOrFail() diff --git a/test/lib/creation.go b/test/lib/creation.go index ffc345d3986..79f18ceb96b 100644 --- a/test/lib/creation.go +++ b/test/lib/creation.go @@ -94,6 +94,24 @@ func (client *Client) CreateSubscriptionOrFail( client.Tracker.AddObj(subscription) } +// CreateSubscriptionOrFailV1Beta1 will create a Subscription or fail the test if there is an error. +func (client *Client) CreateSubscriptionOrFailV1Beta1( + name, channelName string, + channelTypeMeta *metav1.TypeMeta, + options ...resources.SubscriptionOptionV1Beta1, +) { + namespace := client.Namespace + subscription := resources.SubscriptionV1Beta1(name, channelName, channelTypeMeta, options...) + subscriptions := client.Eventing.MessagingV1beta1().Subscriptions(namespace) + client.T.Logf("Creating v1beta1 subscription %s for channel %+v-%s", name, channelTypeMeta, channelName) + // update subscription with the new reference + subscription, err := subscriptions.Create(subscription) + if err != nil { + client.T.Fatalf("Failed to create subscription %q: %v", name, err) + } + client.Tracker.AddObj(subscription) +} + // CreateSubscriptionsOrFail will create a list of Subscriptions with the same configuration except the name. func (client *Client) CreateSubscriptionsOrFail( names []string, diff --git a/test/lib/resources/eventing.go b/test/lib/resources/eventing.go index 539ae1d8315..e854ae15d12 100644 --- a/test/lib/resources/eventing.go +++ b/test/lib/resources/eventing.go @@ -33,6 +33,7 @@ import ( eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" messagingv1alpha1 "knative.dev/eventing/pkg/apis/messaging/v1alpha1" + messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" "knative.dev/eventing/pkg/reconciler/namespace/resources" ) @@ -48,6 +49,9 @@ type TriggerOptionV1Beta1 func(*eventingv1beta1.Trigger) // SubscriptionOption enables further configuration of a Subscription. type SubscriptionOption func(*messagingv1alpha1.Subscription) +// SubscriptionOptionV1Beta1 enables further configuration of a Subscription. +type SubscriptionOptionV1Beta1 func(*messagingv1beta1.Subscription) + // DeliveryOption enables further configuration of DeliverySpec. type DeliveryOption func(*eventingduckv1alpha1.DeliverySpec) @@ -76,6 +80,18 @@ func WithSubscriberForSubscription(name string) SubscriptionOption { } } +// WithSubscriberForSubscriptionV1Beta1 returns an option that adds a Subscriber for the given +// v1beta1 Subscription. +func WithSubscriberForSubscriptionV1Beta1(name string) SubscriptionOptionV1Beta1 { + return func(s *messagingv1beta1.Subscription) { + if name != "" { + s.Spec.Subscriber = &duckv1.Destination{ + Ref: KnativeRefForService(name, ""), + } + } + } +} + // WithReplyForSubscription returns an options that adds a ReplyStrategy for the given Subscription. func WithReplyForSubscription(name string, typemeta *metav1.TypeMeta) SubscriptionOption { return func(s *messagingv1alpha1.Subscription) { @@ -129,6 +145,26 @@ func Subscription( return subscription } +// SubscriptionV1Beta1 returns a v1beta1 Subscription. +func SubscriptionV1Beta1( + name, channelName string, + channelTypeMeta *metav1.TypeMeta, + options ...SubscriptionOptionV1Beta1, +) *messagingv1beta1.Subscription { + subscription := &messagingv1beta1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: messagingv1beta1.SubscriptionSpec{ + Channel: *channelRef(channelName, channelTypeMeta), + }, + } + for _, option := range options { + option(subscription) + } + return subscription +} + // WithChannelTemplateForBroker returns a function that adds a ChannelTemplate for the given Broker. func WithChannelTemplateForBroker(channelTypeMeta *metav1.TypeMeta) BrokerOption { return func(b *eventingv1alpha1.Broker) {