Skip to content
Merged
14 changes: 1 addition & 13 deletions pkg/apis/eventing/v1alpha1/trigger_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,13 @@ package v1alpha1

import duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"

var triggerCondSet = duckv1alpha1.NewLivingConditionSet(TriggerConditionBrokerExists, TriggerConditionKubernetesService, TriggerConditionVirtualService, TriggerConditionSubscribed)
var triggerCondSet = duckv1alpha1.NewLivingConditionSet(TriggerConditionBrokerExists, TriggerConditionSubscribed)

const (
// TriggerConditionReady has status True when all subconditions below have been set to True.
TriggerConditionReady = duckv1alpha1.ConditionReady
TriggerConditionBrokerExists duckv1alpha1.ConditionType = "BrokerExists"

TriggerConditionKubernetesService duckv1alpha1.ConditionType = "KubernetesServiceReady"

TriggerConditionVirtualService duckv1alpha1.ConditionType = "VirtualServiceReady"

TriggerConditionSubscribed duckv1alpha1.ConditionType = "Subscribed"

// TriggerAnyFilter Constant to represent that we should allow anything.
Expand Down Expand Up @@ -58,14 +54,6 @@ func (ts *TriggerStatus) MarkBrokerDoesNotExist() {
triggerCondSet.Manage(ts).MarkFalse(TriggerConditionBrokerExists, "doesNotExist", "Broker does not exist")
}

func (ts *TriggerStatus) MarkKubernetesServiceExists() {
triggerCondSet.Manage(ts).MarkTrue(TriggerConditionKubernetesService)
}

func (ts *TriggerStatus) MarkVirtualServiceExists() {
triggerCondSet.Manage(ts).MarkTrue(TriggerConditionVirtualService)
}

func (ts *TriggerStatus) MarkSubscribed() {
triggerCondSet.Manage(ts).MarkTrue(TriggerConditionSubscribed)
}
Expand Down
60 changes: 5 additions & 55 deletions pkg/apis/eventing/v1alpha1/trigger_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,6 @@ var (
Status: corev1.ConditionTrue,
}

triggerConditionKubernetesService = duckv1alpha1.Condition{
Type: TriggerConditionKubernetesService,
Status: corev1.ConditionTrue,
}

triggerConditionVirtualService = duckv1alpha1.Condition{
Type: TriggerConditionVirtualService,
Status: corev1.ConditionTrue,
}

triggerConditionSubscribed = duckv1alpha1.Condition{
Type: TriggerConditionSubscribed,
Status: corev1.ConditionFalse,
Expand Down Expand Up @@ -74,19 +64,18 @@ func TestTriggerGetCondition(t *testing.T) {
Status: duckv1alpha1.Status{
Conditions: []duckv1alpha1.Condition{
triggerConditionBrokerExists,
triggerConditionKubernetesService,
triggerConditionSubscribed,
},
},
},
condQuery: TriggerConditionKubernetesService,
want: &triggerConditionKubernetesService,
condQuery: TriggerConditionSubscribed,
want: &triggerConditionSubscribed,
}, {
name: "multiple conditions, condition false",
ts: &TriggerStatus{
Status: duckv1alpha1.Status{
Conditions: []duckv1alpha1.Condition{
triggerConditionBrokerExists,
triggerConditionKubernetesService,
triggerConditionSubscribed,
},
},
Expand All @@ -98,7 +87,6 @@ func TestTriggerGetCondition(t *testing.T) {
ts: &TriggerStatus{
Status: duckv1alpha1.Status{
Conditions: []duckv1alpha1.Condition{
triggerConditionVirtualService,
triggerConditionSubscribed,
},
},
Expand Down Expand Up @@ -130,18 +118,12 @@ func TestTriggerInitializeConditions(t *testing.T) {
Conditions: []duckv1alpha1.Condition{{
Type: TriggerConditionBrokerExists,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionKubernetesService,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionReady,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionSubscribed,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionVirtualService,
Status: corev1.ConditionUnknown,
}},
},
},
Expand All @@ -150,7 +132,7 @@ func TestTriggerInitializeConditions(t *testing.T) {
ts: &TriggerStatus{
Status: duckv1alpha1.Status{
Conditions: []duckv1alpha1.Condition{{
Type: TriggerConditionVirtualService,
Type: TriggerConditionBrokerExists,
Status: corev1.ConditionFalse,
}},
},
Expand All @@ -159,19 +141,13 @@ func TestTriggerInitializeConditions(t *testing.T) {
Status: duckv1alpha1.Status{
Conditions: []duckv1alpha1.Condition{{
Type: TriggerConditionBrokerExists,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionKubernetesService,
Status: corev1.ConditionUnknown,
Status: corev1.ConditionFalse,
}, {
Type: TriggerConditionReady,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionSubscribed,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionVirtualService,
Status: corev1.ConditionFalse,
}},
},
},
Expand All @@ -190,18 +166,12 @@ func TestTriggerInitializeConditions(t *testing.T) {
Conditions: []duckv1alpha1.Condition{{
Type: TriggerConditionBrokerExists,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionKubernetesService,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionReady,
Status: corev1.ConditionUnknown,
}, {
Type: TriggerConditionSubscribed,
Status: corev1.ConditionTrue,
}, {
Type: TriggerConditionVirtualService,
Status: corev1.ConditionUnknown,
}},
},
},
Expand Down Expand Up @@ -239,20 +209,6 @@ func TestTriggerIsReady(t *testing.T) {
markVirtualServiceExists: true,
markSubscribed: true,
wantReady: false,
}, {
name: "k8s service sad",
markBrokerExists: true,
markKubernetesServiceExists: false,
markVirtualServiceExists: true,
markSubscribed: true,
wantReady: false,
}, {
name: "virtual service sad",
markBrokerExists: true,
markKubernetesServiceExists: true,
markVirtualServiceExists: false,
markSubscribed: true,
wantReady: false,
}, {
name: "subscribed sad",
markBrokerExists: true,
Expand All @@ -274,12 +230,6 @@ func TestTriggerIsReady(t *testing.T) {
if test.markBrokerExists {
ts.MarkBrokerExists()
}
if test.markKubernetesServiceExists {
ts.MarkKubernetesServiceExists()
}
if test.markVirtualServiceExists {
ts.MarkVirtualServiceExists()
}
if test.markSubscribed {
ts.MarkSubscribed()
}
Expand Down
24 changes: 7 additions & 17 deletions pkg/broker/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
ceclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client"
cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http"
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/trigger/path"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -120,15 +120,10 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp
}

// tctx.URI is actually the path...
if tctx.URI != "/" {
resp.Status = http.StatusNotFound
return nil
}

triggerRef, err := provisioners.ParseChannel(tctx.Host)
triggerRef, err := path.Parse(tctx.URI)
if err != nil {
r.logger.Error("Unable to parse host as a trigger", zap.Error(err), zap.String("host", tctx.Host))
return errors.New("unable to parse host as a Trigger")
r.logger.Info("Unable to parse path as a trigger", zap.Error(err), zap.String("path", tctx.URI))
return errors.New("unable to parse path as a Trigger")
}

// Remove the TTL attribute that is used by the Broker.
Expand Down Expand Up @@ -170,7 +165,7 @@ func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp
}

// sendEvent sends an event to a subscriber if the trigger filter passes.
func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) {
func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext, trigger types.NamespacedName, event *cloudevents.Event) (*cloudevents.Event, error) {
t, err := r.getTrigger(ctx, trigger)
if err != nil {
r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger))
Expand Down Expand Up @@ -199,14 +194,9 @@ func (r *Receiver) sendEvent(ctx context.Context, tctx cehttp.TransportContext,
return r.ceClient.Send(sendingCTX, *event)
}

func (r *Receiver) getTrigger(ctx context.Context, ref provisioners.ChannelReference) (*eventingv1alpha1.Trigger, error) {
func (r *Receiver) getTrigger(ctx context.Context, ref types.NamespacedName) (*eventingv1alpha1.Trigger, error) {
t := &eventingv1alpha1.Trigger{}
err := r.client.Get(ctx,
types.NamespacedName{
Namespace: ref.Namespace,
Name: ref.Name,
},
t)
err := r.client.Get(ctx, ref, t)
return t, err
}

Expand Down
33 changes: 25 additions & 8 deletions pkg/broker/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ const (
)

var (
host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName())
host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName())
validPath = fmt.Sprintf("/triggers/%s/%s", testNS, triggerName)
)

func init() {
Expand Down Expand Up @@ -87,23 +88,39 @@ func TestReceiver(t *testing.T) {
tctx: &cehttp.TransportContext{
Method: "GET",
Host: host,
URI: "/",
URI: validPath,
},
expectedStatus: http.StatusMethodNotAllowed,
},
"Other path": {
"Path too short": {
tctx: &cehttp.TransportContext{
Method: "POST",
Host: host,
URI: "/someotherEndpoint",
URI: "/test-namespace/test-trigger",
},
expectedStatus: http.StatusNotFound,
expectedErr: true,
},
"Path too long": {
tctx: &cehttp.TransportContext{
Method: "POST",
Host: host,
URI: "/triggers/test-namespace/test-trigger/extra",
},
expectedErr: true,
},
"Path without prefix": {
tctx: &cehttp.TransportContext{
Method: "POST",
Host: host,
URI: "/something/test-namespace/test-trigger",
},
expectedErr: true,
},
"Bad host": {
tctx: &cehttp.TransportContext{
Method: "POST",
Host: "badhost-cant-be-parsed-as-a-trigger-name-plus-namespace",
URI: "/",
URI: validPath,
},
expectedErr: true,
},
Expand Down Expand Up @@ -178,7 +195,7 @@ func TestReceiver(t *testing.T) {
tctx: &cehttp.TransportContext{
Method: "POST",
Host: host,
URI: "/",
URI: validPath,
Header: http.Header{
// foo won't pass filtering.
"foo": []string{"bar"},
Expand Down Expand Up @@ -248,7 +265,7 @@ func TestReceiver(t *testing.T) {
tctx = &cehttp.TransportContext{
Method: http.MethodPost,
Host: host,
URI: "/",
URI: validPath,
}
}
ctx := cehttp.WithTransportContext(context.Background(), *tctx)
Expand Down
14 changes: 8 additions & 6 deletions pkg/reconciler/v1alpha1/broker/resources/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ func MakeFilterDeployment(args *FilterArgs) *appsv1.Deployment {
Kind: "Broker",
}),
},
Labels: filterLabels(args.Broker),
Labels: FilterLabels(args.Broker),
},
Spec: appsv1.DeploymentSpec{
Selector: &metav1.LabelSelector{
MatchLabels: filterLabels(args.Broker),
MatchLabels: FilterLabels(args.Broker),
},
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: filterLabels(args.Broker),
Labels: FilterLabels(args.Broker),
Annotations: map[string]string{
"sidecar.istio.io/inject": "true",
},
Expand Down Expand Up @@ -91,7 +91,7 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service {
ObjectMeta: metav1.ObjectMeta{
Namespace: b.Namespace,
Name: fmt.Sprintf("%s-broker-filter", b.Name),
Labels: filterLabels(b),
Labels: FilterLabels(b),
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(b, schema.GroupVersionKind{
Group: eventingv1alpha1.SchemeGroupVersion.Group,
Expand All @@ -101,7 +101,7 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service {
},
},
Spec: corev1.ServiceSpec{
Selector: filterLabels(b),
Selector: FilterLabels(b),
Ports: []corev1.ServicePort{
{
Name: "http",
Expand All @@ -113,7 +113,9 @@ func MakeFilterService(b *eventingv1alpha1.Broker) *corev1.Service {
}
}

func filterLabels(b *eventingv1alpha1.Broker) map[string]string {
// FilterLabels generates the labels present on all resources representing the filter of the given
// Broker.
func FilterLabels(b *eventingv1alpha1.Broker) map[string]string {
return map[string]string{
"eventing.knative.dev/broker": b.Name,
"eventing.knative.dev/brokerRole": "filter",
Expand Down
2 changes: 1 addition & 1 deletion pkg/reconciler/v1alpha1/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err
err := r.client.Get(ctx, request.NamespacedName, subscription)

if errors.IsNotFound(err) {
logging.FromContext(ctx).Error("Could not find Subscription")
logging.FromContext(ctx).Info("Could not find Subscription")
return reconcile.Result{}, nil
}

Expand Down
Loading