Skip to content
Merged
10 changes: 7 additions & 3 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ required = [
[[constraint]]
name = "github.com/knative/pkg"
# HEAD as of 2018-09-20
revision = "a133825579436877df315e7cb6d8aeba49b9f575"
revision = "51f6214feeea1485dc8691a661d953fea1409402"

[[constraint]]
name = "github.com/knative/serving"
Expand Down
4 changes: 4 additions & 0 deletions cmd/controller/controller-runtime-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package main

import (
channelsv1alpha1 "github.com/knative/eventing/pkg/apis/channels/v1alpha1"
subscriptionsv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
feedsv1alpha1 "github.com/knative/eventing/pkg/apis/feeds/v1alpha1"
flowsv1alpha1 "github.com/knative/eventing/pkg/apis/flows/v1alpha1"
"github.com/knative/eventing/pkg/controller/eventing/subscription"
"github.com/knative/eventing/pkg/controller/feed"
"github.com/knative/eventing/pkg/controller/flow"

Expand Down Expand Up @@ -58,6 +60,7 @@ func controllerRuntimeStart() error {
feedsv1alpha1.AddToScheme,
flowsv1alpha1.AddToScheme,
istiov1alpha3.AddToScheme,
subscriptionsv1alpha1.AddToScheme,
}
for _, schemeFunc := range schemeFuncs {
schemeFunc(mrg.GetScheme())
Expand All @@ -69,6 +72,7 @@ func controllerRuntimeStart() error {
eventtype.ProvideController,
feed.ProvideController,
flow.ProvideController,
subscription.ProvideController,
}

for _, provider := range providers {
Expand Down
31 changes: 31 additions & 0 deletions config/300-subscriptioneventing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2018 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: subscriptions.eventing.knative.dev
spec:
group: eventing.knative.dev
version: v1alpha1
names:
kind: Subscription
plural: subscriptions
singular: subscription
categories:
- all
- knative
- eventing
shortNames:
- sub
scope: Namespaced
37 changes: 12 additions & 25 deletions pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ var _ duckv1alpha1.ConditionsAccessor = (*ChannelStatus)(nil)

// Check that Channel implements the Conditions duck type.
var _ = duck.VerifyType(&Channel{}, &duckv1alpha1.Conditions{})
var _ = duck.VerifyType(&Channel{}, &duckv1alpha1.Channelable{})
var _ = duck.VerifyType(&Channel{}, &duckv1alpha1.Subscribable{})
var _ = duck.VerifyType(&Channel{}, &duckv1alpha1.Sinkable{})

// ChannelSpec specifies the Provisioner backing a channel and the configuration
// arguments for a Channel.
Expand All @@ -80,24 +83,8 @@ type ChannelSpec struct {
// +optional
Arguments *runtime.RawExtension `json:"arguments,omitempty"`

// Subscribers is a list of the Subscribers to this channel. This is filled in
// by the Subscriptions controller. Users should not mutate this field.
Subscribers []ChannelSubscriberSpec `json:"subscribers,omitempty"`
}

// ChannelSubscriberSpec defines a single subscriber to a Channel. At least one
// of Call or Result must be present.
type ChannelSubscriberSpec struct {
// Call is an optional reference to a function for processing events.
// Events from the From channel will be delivered here and replies
// are optionally handled by Result.
// +optional
Call *Callable `json:"call,omitempty"`

// Result optionally specifies how to handle events received from the Call
// target.
// +optional
Result *ResultStrategy `json:"result,omitempty"`
// Channel conforms to Duck type Channelable.
Channelable *duckv1alpha1.Channelable `json:"channelable,omitempty"`
}

var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned)
Expand All @@ -112,13 +99,13 @@ type ChannelStatus struct {
// +optional
ObservedGeneration int64 `json:"observedGeneration,omitempty"`

// DomainInternal holds the top-level domain that will distribute traffic
// over the provided targets from inside the cluster. It generally has the
// form {channel}.{namespace}.svc.cluster.local
// TODO: move this to a struct that can be embedded similar to ObjectMeta and
// TypeMeta.
// +optional
DomainInternal string `json:"domainInternal,omitempty"`
// Channel is Sinkable. It currently exposes the endpoint as top-level domain
// that will distribute traffic over the provided targets from inside the cluster.
// It generally has the form {channel}.{namespace}.svc.cluster.local
Sinkable duckv1alpha1.Sinkable `json:"sinkable,omitempty"`

// Channel is Subscribable. It just points to itself
Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"`

// Represents the latest available observations of a channel's current state.
// +optional
Expand Down
12 changes: 8 additions & 4 deletions pkg/apis/eventing/v1alpha1/channel_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,13 @@ func (cs *ChannelSpec) Validate() *apis.FieldError {
errs = errs.Also(apis.ErrMissingField("provisioner"))
}

for i, subscriber := range cs.Subscribers {
if subscriber.Call == nil && subscriber.Result == nil {
errs = errs.Also(apis.ErrMissingField("call", "result").ViaField(fmt.Sprintf("subscriber[%d]", i)))
if cs.Channelable != nil {
for i, subscriber := range cs.Channelable.Subscribers {
if subscriber.SinkableDomain == "" && subscriber.CallableDomain == "" {
fe := apis.ErrMissingField("sinkableDomain", "callableDomain")
fe.Details = "expected at least one of, got none"
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc @n3wscott I think there's a helper for this?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't find. I thought I found it but the name was a bit misleading (IMHO). I created one but didn't want to block this on it and instead would prefer to clean it up later.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about this:

apis.ErrMissingField("sinkableDomain", "callableDomain").ViaField(fmt.Sprintf("subscribers[%d]", i))

Copy link
Copy Markdown
Contributor

@n3wscott n3wscott Sep 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no helper for adding details to a Err* type method, there is a helper for setting the index.

it should be:

errs = errs.Also(apis.ErrMissingField("sinkableDomain", "callableDomain").ViaFieldIndex("subscribers", i))

errs = errs.Also(fe.ViaField(fmt.Sprintf("subscriber[%d]", i)).ViaField("channelable"))
}
}
}

Expand All @@ -51,7 +55,7 @@ func (current *Channel) CheckImmutableFields(og apis.Immutable) *apis.FieldError
if !ok {
return &apis.FieldError{Message: "The provided resource was not a Channel"}
}
ignoreArguments := cmpopts.IgnoreFields(ChannelSpec{}, "Arguments")
ignoreArguments := cmpopts.IgnoreFields(ChannelSpec{}, "Arguments", "Channelable")
if diff := cmp.Diff(original.Spec, current.Spec, ignoreArguments); diff != "" {
return &apis.FieldError{
Message: "Immutable fields changed",
Expand Down
66 changes: 31 additions & 35 deletions pkg/apis/eventing/v1alpha1/channel_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/google/go-cmp/cmp"
"github.com/knative/pkg/apis"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
Expand Down Expand Up @@ -59,50 +60,35 @@ func TestChannelValidation(t *testing.T) {
Name: "foo",
},
},
Subscribers: []ChannelSubscriberSpec{{
Call: &Callable{
TargetURI: &targetURI,
},
}, {
Result: &ResultStrategy{
Target: &corev1.ObjectReference{
APIVersion: "eventing.knative.dev/v1alpha1",
Kind: "Channel",
Name: "to-chan",
},
},
}, {
Call: &Callable{
TargetURI: &targetURI,
},
Result: &ResultStrategy{
Target: &corev1.ObjectReference{
APIVersion: "eventing.knative.dev/v1alpha1",
Kind: "Channel",
Name: "to-chan",
},
},
Channelable: &duckv1alpha1.Channelable{
Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{
CallableDomain: "callableendpoint",
SinkableDomain: "resultendpoint",
}},
}},
},
},
want: nil,
}, {
name: "empty subscriber",
name: "empty subscriber at index 1",
c: &Channel{
Spec: ChannelSpec{
Provisioner: &ProvisionerReference{
Ref: &corev1.ObjectReference{
Name: "foo",
},
},
Subscribers: []ChannelSubscriberSpec{{
Call: &Callable{
TargetURI: &targetURI,
},
}, {}},
},
Channelable: &duckv1alpha1.Channelable{
Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{
CallableDomain: "callableendpoint",
SinkableDomain: "callableendpoint",
}, {}},
}},
},
want: apis.ErrMissingField("spec.subscriber[1].call", "spec.subscriber[1].result"),
want: func() *apis.FieldError {
fe := apis.ErrMissingField("spec.channelable.subscriber[1].sinkableDomain", "spec.channelable.subscriber[1].callableDomain")
fe.Details = "expected at least one of, got none"
return fe
}(),
}, {
name: "2 empty subscribers",
c: &Channel{
Expand All @@ -112,11 +98,21 @@ func TestChannelValidation(t *testing.T) {
Name: "foo",
},
},
Subscribers: []ChannelSubscriberSpec{{}, {}},
Channelable: &duckv1alpha1.Channelable{
Subscribers: []duckv1alpha1.ChannelSubscriberSpec{{}, {}},
},
},
},
want: apis.ErrMissingField("spec.subscriber[0].call", "spec.subscriber[0].result").
Also(apis.ErrMissingField("spec.subscriber[1].call", "spec.subscriber[1].result")),
want: func() *apis.FieldError {
var errs *apis.FieldError
fe := apis.ErrMissingField("spec.channelable.subscriber[0].sinkableDomain", "spec.channelable.subscriber[0].callableDomain")
fe.Details = "expected at least one of, got none"
errs = errs.Also(fe)
fe = apis.ErrMissingField("spec.channelable.subscriber[1].sinkableDomain", "spec.channelable.subscriber[1].callableDomain")
fe.Details = "expected at least one of, got none"
errs = errs.Also(fe)
return errs
}(),
}}

for _, test := range tests {
Expand Down
25 changes: 25 additions & 0 deletions pkg/apis/eventing/v1alpha1/subscription_defaults_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
Copyright 2018 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import "testing"

// No-op test because method does nothing.
func TestSubscriptionDefaults(t *testing.T) {
s := Subscription{}
s.SetDefaults()
}
20 changes: 20 additions & 0 deletions pkg/apis/eventing/v1alpha1/subscription_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ var _ duckv1alpha1.ConditionsAccessor = (*SubscriptionStatus)(nil)
// Check that Subscription implements the Conditions duck type.
var _ = duck.VerifyType(&Subscription{}, &duckv1alpha1.Conditions{})

// And it's Subscribable
var _ = duck.VerifyType(&Subscription{}, &duckv1alpha1.Subscribable{})

// SubscriptionSpec specifies the Channel for incoming events, a Call target for
// processing those events and where to put the result of the processing. Only
// From (where the events are coming from) is always required. You can optionally
Expand Down Expand Up @@ -175,8 +178,25 @@ type SubscriptionStatus struct {
// +patchMergeKey=type
// +patchStrategy=merge
Conditions duckv1alpha1.Conditions `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type"`

// Subscription might be Subscribable. This depends if there's a Result channel
// In that case, this points to that resource.
Subscribable duckv1alpha1.Subscribable `json:"subscribable,omitempty"`
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, if a Subscription has a Result Channel, then chained Subscriptions are allowed. It would be possible to create another Subscription that looks like this:

from:
  ref:
    kind: Subscription
    name: some-sub
  call:
    // ...
  result:
    // ...

Is this the intent?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES! :) That's exactly right.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YES! That's exactly right, and only if a Subscription has a Result. Just need to fill the status up for that and Result.Status would then point to the resolved ResultChannel

}

const (
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to update line 173 with:

var subCondSet = duckv1alpha1.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionFromReady)

// SubscriptionConditionReady has status True when all subconditions below have been set to True.
SubscriptionConditionReady = duckv1alpha1.ConditionReady

// SubscriptionReferencesResolved has status True when all the specified references have been successfully
// resolved.
SubscriptionConditionReferencesResolved duckv1alpha1.ConditionType = "Resolved"

// SubscriptionConditionFromReady has status True when controller has successfully added a subscription to From
// resource.
SubscriptionConditionFromReady duckv1alpha1.ConditionType = "FromReady"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After you add these, it is helpful to expose an IsReady method using the Conditions helper to be the judge.

https://github.com/knative/eventing/pull/457/files#diff-67a9c730f0d592911dfa317323fea9e5R131

Also a InitializeConditions

https://github.com/knative/eventing/pull/457/files#diff-67a9c730f0d592911dfa317323fea9e5R137

And then Mark* style methods as you need them.

)

// GetSpecJSON returns spec as json
func (s *Subscription) GetSpecJSON() ([]byte, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can delete GetSpecJSON now that the duck work has gone in.

return json.Marshal(s.Spec)
Expand Down
Loading