diff --git a/Gopkg.lock b/Gopkg.lock index 22e74269e7e..5f095636bc3 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1258,6 +1258,7 @@ "github.com/bsm/sarama-cluster", "github.com/cloudevents/sdk-go/pkg/cloudevents", "github.com/cloudevents/sdk-go/pkg/cloudevents/client", + "github.com/cloudevents/sdk-go/pkg/cloudevents/context", "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http", "github.com/cloudevents/sdk-go/pkg/cloudevents/types", "github.com/fsnotify/fsnotify", diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index f60b33340e0..9f03cc9367a 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -58,10 +58,13 @@ func main() { // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - _, runnable := broker.New(logger, mgr.GetClient()) - err = mgr.Add(runnable) + receiver, err := broker.New(logger, mgr.GetClient()) if err != nil { - logger.Fatal("Unable to start the receivers runnable", zap.Error(err), zap.Any("runnable", runnable)) + logger.Fatal("Error creating Receiver", zap.Error(err)) + } + err = mgr.Add(receiver) + if err != nil { + logger.Fatal("Unable to start the receiver", zap.Error(err), zap.Any("receiver", receiver)) } // Set up signals so we handle the first shutdown signal gracefully. diff --git a/pkg/apis/eventing/v1alpha1/broker_types.go b/pkg/apis/eventing/v1alpha1/broker_types.go index fb653cb5e21..65fc76086cf 100644 --- a/pkg/apis/eventing/v1alpha1/broker_types.go +++ b/pkg/apis/eventing/v1alpha1/broker_types.go @@ -66,7 +66,13 @@ type BrokerSpec struct { ChannelTemplate *ChannelSpec `json:"channelTemplate,omitempty"` } -var brokerCondSet = duckv1alpha1.NewLivingConditionSet(BrokerConditionIngress, BrokerConditionChannel, BrokerConditionFilter, BrokerConditionAddressable) +var brokerCondSet = duckv1alpha1.NewLivingConditionSet( + BrokerConditionIngress, + BrokerConditionTriggerChannel, + BrokerConditionIngressChannel, + BrokerConditionFilter, + BrokerConditionAddressable, + BrokerConditionIngressSubscription) // BrokerStatus represents the current state of a Broker. type BrokerStatus struct { @@ -88,7 +94,11 @@ const ( BrokerConditionIngress duckv1alpha1.ConditionType = "IngressReady" - BrokerConditionChannel duckv1alpha1.ConditionType = "ChannelReady" + BrokerConditionTriggerChannel duckv1alpha1.ConditionType = "TriggerChannelReady" + + BrokerConditionIngressChannel duckv1alpha1.ConditionType = "IngressChannelReady" + + BrokerConditionIngressSubscription duckv1alpha1.ConditionType = "IngressSubscriptionReady" BrokerConditionFilter duckv1alpha1.ConditionType = "FilterReady" @@ -118,12 +128,28 @@ func (bs *BrokerStatus) MarkIngressFailed(err error) { brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngress, "failed", "%v", err) } -func (bs *BrokerStatus) MarkChannelReady() { - brokerCondSet.Manage(bs).MarkTrue(BrokerConditionChannel) +func (bs *BrokerStatus) MarkTriggerChannelReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionTriggerChannel) +} + +func (bs *BrokerStatus) MarkTriggerChannelFailed(err error) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionTriggerChannel, "failed", "%v", err) +} + +func (bs *BrokerStatus) MarkIngressChannelReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressChannel) +} + +func (bs *BrokerStatus) MarkIngressChannelFailed(err error) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressChannel, "failed", "%v", err) +} + +func (bs *BrokerStatus) MarkIngressSubscriptionReady() { + brokerCondSet.Manage(bs).MarkTrue(BrokerConditionIngressSubscription) } -func (bs *BrokerStatus) MarkChannelFailed(err error) { - brokerCondSet.Manage(bs).MarkFalse(BrokerConditionChannel, "failed", "%v", err) +func (bs *BrokerStatus) MarkIngressSubscriptionFailed(err error) { + brokerCondSet.Manage(bs).MarkFalse(BrokerConditionIngressSubscription, "failed", "%v", err) } func (bs *BrokerStatus) MarkFilterReady() { diff --git a/pkg/apis/eventing/v1alpha1/broker_types_test.go b/pkg/apis/eventing/v1alpha1/broker_types_test.go index 0f4e695d110..73fa5663e24 100644 --- a/pkg/apis/eventing/v1alpha1/broker_types_test.go +++ b/pkg/apis/eventing/v1alpha1/broker_types_test.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "errors" "testing" "github.com/google/go-cmp/cmp" @@ -24,6 +25,13 @@ import ( corev1 "k8s.io/api/core/v1" ) +var ( + trueVal = true + falseVal = false + + err = errors.New("foobar") +) + var ( brokerConditionReady = duckv1alpha1.Condition{ Type: BrokerConditionReady, @@ -35,8 +43,8 @@ var ( Status: corev1.ConditionTrue, } - brokerConditionChannel = duckv1alpha1.Condition{ - Type: BrokerConditionChannel, + brokerConditionTriggerChannel = duckv1alpha1.Condition{ + Type: BrokerConditionTriggerChannel, Status: corev1.ConditionTrue, } @@ -74,7 +82,7 @@ func TestBrokerGetCondition(t *testing.T) { Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{ brokerConditionIngress, - brokerConditionChannel, + brokerConditionTriggerChannel, brokerConditionFilter, }, }, @@ -86,7 +94,7 @@ func TestBrokerGetCondition(t *testing.T) { bs: &BrokerStatus{ Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{ - brokerConditionChannel, + brokerConditionTriggerChannel, brokerConditionFilter, brokerConditionAddressable, }, @@ -132,17 +140,23 @@ func TestBrokerInitializeConditions(t *testing.T) { Type: BrokerConditionAddressable, Status: corev1.ConditionUnknown, }, { - Type: BrokerConditionChannel, + Type: BrokerConditionFilter, Status: corev1.ConditionUnknown, }, { - Type: BrokerConditionFilter, + Type: BrokerConditionIngressChannel, Status: corev1.ConditionUnknown, }, { Type: BrokerConditionIngress, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionIngressSubscription, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionTriggerChannel, + Status: corev1.ConditionUnknown, }}, }, }, @@ -151,7 +165,7 @@ func TestBrokerInitializeConditions(t *testing.T) { bs: &BrokerStatus{ Status: duckv1alpha1.Status{ Conditions: []duckv1alpha1.Condition{{ - Type: BrokerConditionChannel, + Type: BrokerConditionTriggerChannel, Status: corev1.ConditionFalse, }}, }, @@ -161,18 +175,24 @@ func TestBrokerInitializeConditions(t *testing.T) { Conditions: []duckv1alpha1.Condition{{ Type: BrokerConditionAddressable, Status: corev1.ConditionUnknown, - }, { - Type: BrokerConditionChannel, - Status: corev1.ConditionFalse, }, { Type: BrokerConditionFilter, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionIngressChannel, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionIngress, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionIngressSubscription, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionTriggerChannel, + Status: corev1.ConditionFalse, }}, }, }, @@ -191,22 +211,28 @@ func TestBrokerInitializeConditions(t *testing.T) { Conditions: []duckv1alpha1.Condition{{ Type: BrokerConditionAddressable, Status: corev1.ConditionUnknown, - }, { - Type: BrokerConditionChannel, - Status: corev1.ConditionUnknown, }, { Type: BrokerConditionFilter, Status: corev1.ConditionTrue, + }, { + Type: BrokerConditionIngressChannel, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionIngress, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionIngressSubscription, + Status: corev1.ConditionUnknown, }, { Type: BrokerConditionReady, Status: corev1.ConditionUnknown, + }, { + Type: BrokerConditionTriggerChannel, + Status: corev1.ConditionUnknown, }}, }, - }, - }} + }}, + } for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -220,69 +246,128 @@ func TestBrokerInitializeConditions(t *testing.T) { func TestBrokerIsReady(t *testing.T) { tests := []struct { - name string - markChannelReady bool - markFilterReady bool - markIngressReady bool - address string - wantReady bool + name string + markIngressReady *bool + markTriggerChannelReady *bool + markIngressChannelReady *bool + markFilterReady *bool + address string + markIngressSubscriptionReady *bool + wantReady bool }{{ - name: "all happy", - markChannelReady: true, - markFilterReady: true, - markIngressReady: true, - address: "hostname", - wantReady: true, + name: "all happy", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &trueVal, + markFilterReady: &trueVal, + address: "hostname", + markIngressSubscriptionReady: &trueVal, + wantReady: true, + }, { + name: "ingress sad", + markIngressReady: &falseVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &trueVal, + markFilterReady: &trueVal, + address: "hostname", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { - name: "channel sad", - markChannelReady: false, - markFilterReady: true, - markIngressReady: true, - address: "hostname", - wantReady: false, + name: "trigger channel sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &falseVal, + markIngressChannelReady: &trueVal, + markFilterReady: &trueVal, + address: "hostname", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { - name: "filter sad", - markChannelReady: true, - markFilterReady: false, - markIngressReady: true, - address: "hostname", - wantReady: false, + name: "ingress channel sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &falseVal, + markFilterReady: &trueVal, + address: "hostname", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { - name: "ingress sad", - markChannelReady: true, - markFilterReady: true, - markIngressReady: false, - address: "hostname", - wantReady: false, + name: "filter sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &trueVal, + markFilterReady: &falseVal, + address: "hostname", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { - name: "addressable sad", - markChannelReady: true, - markFilterReady: true, - markIngressReady: true, - address: "", - wantReady: false, + name: "addressable sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &trueVal, + markFilterReady: &trueVal, + address: "", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }, { - name: "all sad", - markChannelReady: false, - markFilterReady: false, - markIngressReady: false, - address: "", - wantReady: false, + name: "ingress subscription sad", + markIngressReady: &trueVal, + markTriggerChannelReady: &trueVal, + markIngressChannelReady: &trueVal, + markFilterReady: &trueVal, + address: "hostname", + markIngressSubscriptionReady: &falseVal, + wantReady: false, + }, { + name: "all sad", + markIngressReady: &falseVal, + markTriggerChannelReady: &falseVal, + markIngressChannelReady: &trueVal, + markFilterReady: &falseVal, + address: "", + markIngressSubscriptionReady: &trueVal, + wantReady: false, }} for _, test := range tests { t.Run(test.name, func(t *testing.T) { - ts := &BrokerStatus{} - if test.markChannelReady { - ts.MarkChannelReady() + bs := &BrokerStatus{} + if test.markIngressReady != nil { + if *test.markIngressReady { + bs.MarkIngressReady() + } else { + bs.MarkIngressFailed(err) + } + } + if test.markTriggerChannelReady != nil { + if *test.markTriggerChannelReady { + bs.MarkTriggerChannelReady() + } else { + bs.MarkTriggerChannelFailed(err) + } } - if test.markFilterReady { - ts.MarkFilterReady() + if test.markIngressChannelReady != nil { + if *test.markIngressChannelReady { + bs.MarkIngressChannelReady() + } else { + bs.MarkIngressChannelFailed(err) + } } - if test.markIngressReady { - ts.MarkIngressReady() + if test.markIngressSubscriptionReady != nil { + if *test.markIngressSubscriptionReady { + bs.MarkIngressSubscriptionReady() + } else { + bs.MarkIngressSubscriptionFailed(err) + } } - ts.SetAddress(test.address) - got := ts.IsReady() + if test.markFilterReady != nil { + if *test.markFilterReady { + bs.MarkFilterReady() + } else { + bs.MarkFilterFailed(err) + } + } + bs.SetAddress(test.address) + + got := bs.IsReady() if test.wantReady != got { t.Errorf("unexpected readiness: want %v, got %v", test.wantReady, got) } diff --git a/pkg/broker/receiver.go b/pkg/broker/receiver.go index c1d4b22578b..8d06df8c9d9 100644 --- a/pkg/broker/receiver.go +++ b/pkg/broker/receiver.go @@ -19,82 +19,167 @@ package broker import ( "context" "errors" + "net/http" + "net/url" + "time" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + ceclient "github.com/cloudevents/sdk-go/pkg/cloudevents/client" + cecontext "github.com/cloudevents/sdk-go/pkg/cloudevents/context" + cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/provisioners" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/manager" +) + +const ( + defaultPort = 8080 + + writeTimeout = 1 * time.Minute ) // Receiver parses Cloud Events, determines if they pass a filter, and sends them to a subscriber. type Receiver struct { - logger *zap.Logger - client client.Client - - dispatcher provisioners.Dispatcher + logger *zap.Logger + client client.Client + ceClient ceclient.Client + ceHttp *cehttp.Transport } // New creates a new Receiver and its associated MessageReceiver. The caller is responsible for // Start()ing the returned MessageReceiver. -func New(logger *zap.Logger, client client.Client) (*Receiver, manager.Runnable) { +func New(logger *zap.Logger, client client.Client) (*Receiver, error) { + ceHttp, err := cehttp.New(cehttp.WithBinaryEncoding(), cehttp.WithPort(defaultPort)) + if err != nil { + return nil, err + } + ceClient, err := ceclient.New(ceHttp) + if err != nil { + return nil, err + } + r := &Receiver{ - logger: logger, - client: client, - dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()), + logger: logger, + client: client, + ceClient: ceClient, + ceHttp: ceHttp, + } + err = r.initClient() + if err != nil { + return nil, err } - return r, r.newMessageReceiver() + + return r, nil } -func (r *Receiver) newMessageReceiver() *provisioners.MessageReceiver { - if err := r.initClient(); err != nil { - r.logger.Warn("Failed to initialize client", zap.Error(err)) +// Initialize the client. Mainly intended to load stuff in its cache. +func (r *Receiver) initClient() error { + // We list triggers so that we do not drop messages. Otherwise, on receiving an event, it + // may not find the Trigger and would return an error. + opts := &client.ListOptions{} + tl := &eventingv1alpha1.TriggerList{} + if err := r.client.List(context.TODO(), opts, tl); err != nil { + return err } - return provisioners.NewMessageReceiver(r.sendEvent, r.logger.Sugar()) + return nil } -// sendEvent sends an event to a subscriber if the trigger filter passes. -func (r *Receiver) sendEvent(trigger provisioners.ChannelReference, message *provisioners.Message) error { - r.logger.Debug("Received message", zap.Any("triggerRef", trigger)) - ctx := context.Background() +// Start begins to receive messages for the receiver. +// +// Only HTTP POST requests to the root path (/) are accepted. If other paths or +// methods are needed, use the HandleRequest method directly with another HTTP +// server. +// +// This method will block until a message is received on the stop channel. +func (r *Receiver) Start(stopCh <-chan struct{}) error { + ctx, cancel := context.WithCancel(context.Background()) + defer ctx.Done() - t, err := r.getTrigger(ctx, trigger) - if err != nil { - r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) + errCh := make(chan error, 1) + go func() { + errCh <- r.ceClient.StartReceiver(ctx, r.serveHTTP) + }() + + // Stop either if the receiver stops (sending to errCh) or if stopCh is closed. + select { + case err := <-errCh: return err + case <-stopCh: + break } - subscriberURI := t.Status.SubscriberURI - if subscriberURI == "" { - r.logger.Error("Unable to read subscriberURI") - return errors.New("unable to read subscriberURI") + // stopCh has been closed, we need to gracefully shutdown h.ceClient. cancel() will start its + // shutdown, if it hasn't finished in a reasonable amount of time, just return an error. + cancel() + select { + case err := <-errCh: + return err + case <-time.After(writeTimeout): + return errors.New("timeout shutting down ceClient") } +} - if !r.shouldSendMessage(&t.Spec, message) { - r.logger.Debug("Message did not pass filter", zap.Any("triggerRef", trigger)) +func (r *Receiver) serveHTTP(ctx context.Context, event cloudevents.Event, resp *cloudevents.EventResponse) error { + tctx := cehttp.TransportContextFrom(ctx) + if tctx.Method != http.MethodPost { + resp.Status = http.StatusMethodNotAllowed return nil } - err = r.dispatcher.DispatchMessage(message, subscriberURI, "", provisioners.DispatchDefaults{}) + // tctx.URI is actually the path... + if tctx.URI != "/" { + resp.Status = http.StatusNotFound + return nil + } + + triggerRef, err := provisioners.ParseChannel(tctx.Host) + if err != nil { + r.logger.Error("Unable to parse host as a trigger", zap.Error(err), zap.String("host", tctx.Host)) + return errors.New("unable to parse host as a Trigger") + } + + r.logger.Debug("Received message", zap.Any("triggerRef", triggerRef)) + + responseEvent, err := r.sendEvent(ctx, triggerRef, &event) if err != nil { - r.logger.Info("Failed to dispatch message", zap.Error(err), zap.Any("triggerRef", trigger)) + r.logger.Error("Error sending the event", zap.Error(err)) return err } - r.logger.Debug("Successfully sent message", zap.Any("triggerRef", trigger)) + resp.Status = http.StatusAccepted + resp.Event = responseEvent return nil } -// Initialize the client. Mainly intended to create the informer/indexer in order not to drop messages. -func (r *Receiver) initClient() error { - // We list triggers so that we do not drop messages. Otherwise, on receiving an event, it - // may not find the trigger and would return an error. - opts := &client.ListOptions{} - tl := &eventingv1alpha1.TriggerList{} - if err := r.client.List(context.TODO(), opts, tl); err != nil { - return err +// sendEvent sends an event to a subscriber if the trigger filter passes. +func (r *Receiver) sendEvent(ctx context.Context, trigger provisioners.ChannelReference, event *cloudevents.Event) (*cloudevents.Event, error) { + t, err := r.getTrigger(ctx, trigger) + if err != nil { + r.logger.Info("Unable to get the Trigger", zap.Error(err), zap.Any("triggerRef", trigger)) + return nil, err } - return nil + + subscriberURIString := t.Status.SubscriberURI + if subscriberURIString == "" { + r.logger.Error("Unable to read subscriberURI") + return nil, errors.New("unable to read subscriberURI") + } + // We could just send the request to this URI regardless, but let's just check to see if it well + // formed first, that way we can generate better error message if it isn't. + subscriberURI, err := url.Parse(subscriberURIString) + if err != nil { + r.logger.Error("Unable to parse subscriberURI", zap.Error(err), zap.String("subscriberURIString", subscriberURIString)) + return nil, err + } + + if !r.shouldSendMessage(&t.Spec, event) { + r.logger.Debug("Message did not pass filter", zap.Any("triggerRef", trigger)) + return nil, nil + } + + sendingCtx := cecontext.WithTarget(ctx, subscriberURI.String()) + return r.ceHttp.Send(sendingCtx, *event) } func (r *Receiver) getTrigger(ctx context.Context, ref provisioners.ChannelReference) (*eventingv1alpha1.Trigger, error) { @@ -110,33 +195,21 @@ func (r *Receiver) getTrigger(ctx context.Context, ref provisioners.ChannelRefer // shouldSendMessage determines whether message 'm' should be sent based on the triggerSpec 'ts'. // Currently it supports exact matching on type and/or source of events. -func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, m *provisioners.Message) bool { +func (r *Receiver) shouldSendMessage(ts *eventingv1alpha1.TriggerSpec, event *cloudevents.Event) bool { if ts.Filter == nil || ts.Filter.SourceAndType == nil { r.logger.Error("No filter specified") return false } filterType := ts.Filter.SourceAndType.Type - // TODO the inspection of Headers should be removed once we start using the cloud events SDK. - cloudEventType := "" - if et, ok := m.Headers["Ce-Eventtype"]; ok { - // cloud event spec v0.1. - cloudEventType = et - } else if et, ok := m.Headers["Ce-Type"]; ok { - // cloud event spec v0.2. - cloudEventType = et - } - if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != cloudEventType { - r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("message.type", cloudEventType)) + if filterType != eventingv1alpha1.TriggerAnyFilter && filterType != event.Type() { + r.logger.Debug("Wrong type", zap.String("trigger.spec.filter.sourceAndType.type", filterType), zap.String("event.Type()", event.Type())) return false } filterSource := ts.Filter.SourceAndType.Source - cloudEventSource := "" - // cloud event spec v0.1 and v0.2. - if es, ok := m.Headers["Ce-Source"]; ok { - cloudEventSource = es - } - if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != cloudEventSource { - r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", cloudEventSource)) + s := event.Context.AsV01().Source + actualSource := s.String() + if filterSource != eventingv1alpha1.TriggerAnyFilter && filterSource != actualSource { + r.logger.Debug("Wrong source", zap.String("trigger.spec.filter.sourceAndType.source", filterSource), zap.String("message.source", actualSource)) return false } return true diff --git a/pkg/broker/receiver_test.go b/pkg/broker/receiver_test.go index d98edb6be45..56ce6b17685 100644 --- a/pkg/broker/receiver_test.go +++ b/pkg/broker/receiver_test.go @@ -17,133 +17,268 @@ package broker import ( + "context" "errors" "fmt" "net/http" "net/http/httptest" - "strings" + "net/url" "testing" - "github.com/knative/eventing/pkg/utils" + "sigs.k8s.io/controller-runtime/pkg/client" - "github.com/knative/eventing/pkg/provisioners" + controllertesting "github.com/knative/eventing/pkg/reconciler/testing" - eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" - "k8s.io/client-go/kubernetes/scheme" + "github.com/google/go-cmp/cmp" - "go.uber.org/zap" + "github.com/cloudevents/sdk-go/pkg/cloudevents" + "github.com/cloudevents/sdk-go/pkg/cloudevents/types" + cehttp "github.com/cloudevents/sdk-go/pkg/cloudevents/transport/http" + eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" + "github.com/knative/eventing/pkg/utils" + "go.uber.org/zap" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) const ( testNS = "test-namespace" triggerName = "test-trigger" - eventType = `"com.example.someevent"` - eventSource = `"/mycontext"` + eventType = `com.example.someevent` + eventSource = `/mycontext` + + toBeReplaced = "toBeReplaced" +) + +var ( + host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName()) ) func init() { // Add types to scheme. - eventingv1alpha1.AddToScheme(scheme.Scheme) + _ = eventingv1alpha1.AddToScheme(scheme.Scheme) } func TestReceiver(t *testing.T) { testCases := map[string]struct { - initialState []runtime.Object - dispatchErr error + triggers []*eventingv1alpha1.Trigger + mocks controllertesting.Mocks + tctx *cehttp.TransportContext + requestFails bool + returnedEvent *cloudevents.Event + expectNewToFail bool expectedErr bool expectedDispatch bool + expectedStatus int }{ + "Cannot init": { + mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, _ *client.ListOptions, _ runtime.Object) (controllertesting.MockHandled, error) { + return controllertesting.Handled, errors.New("test induced error") + }, + }, + }, + expectNewToFail: true, + }, + "Not POST": { + tctx: &cehttp.TransportContext{ + Method: "GET", + Host: host, + URI: "/", + }, + expectedStatus: http.StatusMethodNotAllowed, + }, + "Other path": { + tctx: &cehttp.TransportContext{ + Method: "POST", + Host: host, + URI: "/someotherEndpoint", + }, + expectedStatus: http.StatusNotFound, + }, + "Bad host": { + tctx: &cehttp.TransportContext{ + Method: "POST", + Host: "badhost-cant-be-parsed-as-a-trigger-name-plus-namespace", + URI: "/", + }, + expectedErr: true, + }, "Trigger.Get fails": { // No trigger exists, so the Get will fail. expectedErr: true, }, "Trigger doesn't have SubscriberURI": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTriggerWithoutSubscriberURI(), }, expectedErr: true, }, + "Trigger with bad SubscriberURI": { + triggers: []*eventingv1alpha1.Trigger{ + makeTriggerWithBadSubscriberURI(), + }, + expectedErr: true, + }, "Trigger without a Filter": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTriggerWithoutFilter(), }, }, "Wrong type": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTrigger("some-other-type", "Any"), }, }, "Wrong source": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTrigger("Any", "some-other-source"), }, }, "Dispatch failed": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTrigger("Any", "Any"), }, - dispatchErr: errors.New("test error dispatching"), + requestFails: true, expectedErr: true, expectedDispatch: true, }, "Dispatch succeeded - Any": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTrigger("Any", "Any"), }, expectedDispatch: true, }, "Dispatch succeeded - Specific": { - initialState: []runtime.Object{ + triggers: []*eventingv1alpha1.Trigger{ makeTrigger(eventType, eventSource), }, expectedDispatch: true, }, + "Returned Cloud Event": { + triggers: []*eventingv1alpha1.Trigger{ + makeTrigger("Any", "Any"), + }, + expectedDispatch: true, + returnedEvent: makeDifferentEvent(), + }, } for n, tc := range testCases { t.Run(n, func(t *testing.T) { - // Support cloud spec v0.1 and v0.2 requests. - requests := [2]*http.Request{makeV01Request(), makeV02Request()} - for _, request := range requests { - mr, _ := New( - zap.NewNop(), - fake.NewFakeClient(tc.initialState...)) - fd := &fakeDispatcher{ - err: tc.dispatchErr, + + fh := fakeHandler{ + failRequest: tc.requestFails, + returnedEvent: tc.returnedEvent, + } + s := httptest.NewServer(&fh) + defer s.Client() + + // Replace the SubscriberURI to point at our fake server. + correctURI := make([]runtime.Object, 0, len(tc.triggers)) + for _, trig := range tc.triggers { + if trig.Status.SubscriberURI == toBeReplaced { + trig.Status.SubscriberURI = s.URL + } + correctURI = append(correctURI, trig) + } + + r, err := New( + zap.NewNop(), + getClient(correctURI, tc.mocks)) + if tc.expectNewToFail { + if err == nil { + t.Fatal("Expected New to fail, it didn't") } - mr.dispatcher = fd - - resp := httptest.NewRecorder() - mr.newMessageReceiver().HandleRequest(resp, request) - if tc.expectedErr { - if resp.Result().StatusCode >= 200 && resp.Result().StatusCode < 300 { - t.Errorf("Expected an error. Actual: %v", resp.Result()) - } - } else { - if resp.Result().StatusCode < 200 || resp.Result().StatusCode >= 300 { - t.Errorf("Expected success. Actual: %v", resp.Result()) - } + return + } else if err != nil { + t.Fatalf("Unable to create receiver: %v", err) + } + + tctx := tc.tctx + if tctx == nil { + tctx = &cehttp.TransportContext{ + Method: http.MethodPost, + Host: host, + URI: "/", } - if tc.expectedDispatch != fd.requestReceived { - t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fd.requestReceived) + } + ctx := cehttp.WithTransportContext(context.Background(), *tctx) + resp := &cloudevents.EventResponse{} + err = r.serveHTTP(ctx, makeEvent(), resp) + + if tc.expectedErr && err == nil { + t.Errorf("Expected an error, received nil") + } else if !tc.expectedErr && err != nil { + t.Errorf("Expected no error, received %v", err) + } + + if tc.expectedStatus != 0 && tc.expectedStatus != resp.Status { + t.Errorf("Unexpected status. Expected %v. Actual %v.", tc.expectedStatus, resp.Status) + } + if tc.expectedDispatch != fh.requestReceived { + t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived) + } + + // Compare the returned event. + if tc.returnedEvent == nil { + if resp.Event != nil { + t.Errorf("Unexpected response event: %v", resp.Event) } + return + } + if diff := cmp.Diff(tc.returnedEvent.Context.AsV02(), resp.Event.Context.AsV02()); diff != "" { + t.Errorf("Incorrect response event context (-want +got): %s", diff) + } + if diff := cmp.Diff(tc.returnedEvent.Data, resp.Event.Data); diff != "" { + t.Errorf("Incorrect response event data (-want +got): %s", diff) } }) } } -type fakeDispatcher struct { - err error +type fakeHandler struct { + failRequest bool requestReceived bool + returnedEvent *cloudevents.Event + t testing.T } -func (d *fakeDispatcher) DispatchMessage(_ *provisioners.Message, _, _ string, _ provisioners.DispatchDefaults) error { - d.requestReceived = true - return d.err +func (h *fakeHandler) ServeHTTP(resp http.ResponseWriter, _ *http.Request) { + h.requestReceived = true + if h.failRequest { + resp.WriteHeader(http.StatusBadRequest) + return + } + if h.returnedEvent == nil { + resp.WriteHeader(http.StatusAccepted) + return + } + + c := &cehttp.CodecV02{} + m, err := c.Encode(*h.returnedEvent) + if err != nil { + h.t.Fatalf("Could not encode message: %v", err) + } + msg := m.(*cehttp.Message) + for k, vs := range msg.Header { + resp.Header().Del(k) + for _, v := range vs { + resp.Header().Set(k, v) + } + } + _, err = resp.Write(msg.Body) + if err != nil { + h.t.Fatalf("Unable to write body: %v", err) + } +} + +func getClient(initial []runtime.Object, mocks controllertesting.Mocks) *controllertesting.MockClient { + innerClient := fake.NewFakeClient(initial...) + return controllertesting.NewMockClient(innerClient, mocks) } func makeTrigger(t, s string) *eventingv1alpha1.Trigger { @@ -165,7 +300,7 @@ func makeTrigger(t, s string) *eventingv1alpha1.Trigger { }, }, Status: eventingv1alpha1.TriggerStatus{ - SubscriberURI: "subscriberURI", + SubscriberURI: "toBeReplaced", }, } } @@ -182,29 +317,38 @@ func makeTriggerWithoutSubscriberURI() *eventingv1alpha1.Trigger { return t } -func makeRequest(cloudEventVersionValue, eventTypeVersionValue, eventTypeKey, eventSourceKey string) *http.Request { - req := httptest.NewRequest("POST", "/", strings.NewReader(``)) - req.Host = fmt.Sprintf("%s.%s.triggers.%s", triggerName, testNS, utils.GetClusterDomainName()) - - eventAttributes := map[string]string{ - "CE-CloudEventsVersion": cloudEventVersionValue, - eventTypeKey: eventType, - "CE-EventTypeVersion": eventTypeVersionValue, - eventSourceKey: eventSource, - "CE-EventID": `"A234-1234-1234"`, - "CE-EventTime": `"2018-04-05T17:31:00Z"`, - "contentType": "text/xml", - } - for k, v := range eventAttributes { - req.Header.Set(k, v) - } - return req +func makeTriggerWithBadSubscriberURI() *eventingv1alpha1.Trigger { + t := makeTrigger("Any", "Any") + // This should fail url.Parse(). It was taken from the unit tests for url.Parse(), it violates + // rfc3986 3.2.3, namely that the port must be digits. + t.Status.SubscriberURI = "http://[::1]:namedport" + return t } -func makeV01Request() *http.Request { - return makeRequest(`"0.1"`, `"1.0"`, "CE-EventType", "CE-Source") +func makeEvent() cloudevents.Event { + return cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: eventType, + Source: types.URLRef{ + URL: url.URL{ + Path: eventSource, + }, + }, + ContentType: cloudevents.StringOfApplicationJSON(), + }, + } } -func makeV02Request() *http.Request { - return makeRequest(`"0.2"`, `"2.0"`, "ce-type", "ce-source") +func makeDifferentEvent() *cloudevents.Event { + return &cloudevents.Event{ + Context: cloudevents.EventContextV02{ + Type: "some-other-type", + Source: types.URLRef{ + URL: url.URL{ + Path: eventSource, + }, + }, + ContentType: cloudevents.StringOfApplicationJSON(), + }, + } } diff --git a/pkg/reconciler/v1alpha1/broker/broker.go b/pkg/reconciler/v1alpha1/broker/broker.go index 343abfc699f..4f0842d0143 100644 --- a/pkg/reconciler/v1alpha1/broker/broker.go +++ b/pkg/reconciler/v1alpha1/broker/broker.go @@ -52,8 +52,10 @@ const ( controllerAgentName = "broker-controller" // Name of the corev1.Events emitted from the reconciliation process. - brokerReconciled = "BrokerReconciled" - brokerUpdateStatusFailed = "BrokerUpdateStatusFailed" + brokerReconciled = "BrokerReconciled" + brokerUpdateStatusFailed = "BrokerUpdateStatusFailed" + ingressSubscriptionDeleteFailed = "IngressSubscriptionDeleteFailed" + ingressSubscriptionCreateFailed = "IngressSubscriptionCreateFailed" ) type reconciler struct { @@ -164,27 +166,33 @@ func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, err func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconcile.Result, error) { b.Status.InitializeConditions() - // 1. Channel is created for all events. + // 1. Trigger Channel is created for all events. Triggers will Subscribe to this Channel. // 2. Filter Deployment. // 3. Ingress Deployment. // 4. K8s Services that point at the Deployments. + // 5. Ingress Channel is created to get events from Triggers back into this Broker via the + // Ingress Deployment. + // - Ideally this wouldn't exist and we would point the Trigger's reply directly to the K8s + // Service. However, Subscriptions only allow us to send replies to Channels, so we need + // this as an intermediary. + // 6. Subscription from the Ingress Channel to the Ingress Service. if b.DeletionTimestamp != nil { // Everything is cleaned up by the garbage collector. return reconcile.Result{}, nil } - c, err := r.reconcileChannel(ctx, b) + triggerChan, err := r.reconcileTriggerChannel(ctx, b) if err != nil { - logging.FromContext(ctx).Error("Problem reconciling the channel", zap.Error(err)) - b.Status.MarkChannelFailed(err) + logging.FromContext(ctx).Error("Problem reconciling the trigger channel", zap.Error(err)) + b.Status.MarkTriggerChannelFailed(err) return reconcile.Result{}, err - } else if c.Status.Address.Hostname == "" { - logging.FromContext(ctx).Info("Channel is not yet ready", zap.Any("c", c)) + } else if triggerChan.Status.Address.Hostname == "" { + logging.FromContext(ctx).Info("Trigger Channel is not yet ready", zap.Any("triggerChan", triggerChan)) // Give the Channel some time to get its address. One second was chosen arbitrarily. return reconcile.Result{RequeueAfter: time.Second}, nil } - b.Status.MarkChannelReady() + b.Status.MarkTriggerChannelReady() _, err = r.reconcileFilterDeployment(ctx, b) if err != nil { @@ -200,7 +208,7 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci } b.Status.MarkFilterReady() - _, err = r.reconcileIngressDeployment(ctx, b, c) + _, err = r.reconcileIngressDeployment(ctx, b, triggerChan) if err != nil { logging.FromContext(ctx).Error("Problem reconciling ingress Deployment", zap.Error(err)) b.Status.MarkIngressFailed(err) @@ -216,6 +224,26 @@ func (r *reconciler) reconcile(ctx context.Context, b *v1alpha1.Broker) (reconci b.Status.MarkIngressReady() b.Status.SetAddress(names.ServiceHostName(svc.Name, svc.Namespace)) + ingressChan, err := r.reconcileIngressChannel(ctx, b) + if err != nil { + logging.FromContext(ctx).Error("Problem reconciling the ingress channel", zap.Error(err)) + b.Status.MarkIngressChannelFailed(err) + return reconcile.Result{}, err + } else if ingressChan.Status.Address.Hostname == "" { + logging.FromContext(ctx).Info("Ingress Channel is not yet ready", zap.Any("ingressChan", ingressChan)) + // Give the Channel some time to get its address. One second was chosen arbitrarily. + return reconcile.Result{RequeueAfter: time.Second}, nil + } + b.Status.MarkIngressChannelReady() + + _, err = r.reconcileIngressSubscription(ctx, b, ingressChan, svc) + if err != nil { + logging.FromContext(ctx).Error("Problem reconciling the ingress subscription", zap.Error(err)) + b.Status.MarkIngressSubscriptionFailed(err) + return reconcile.Result{}, err + } + b.Status.MarkIngressSubscriptionReady() + return reconcile.Result{}, nil } @@ -275,12 +303,26 @@ func (r *reconciler) reconcileFilterService(ctx context.Context, b *v1alpha1.Bro return r.reconcileService(ctx, expected) } +func (r *reconciler) reconcileTriggerChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { + get := func() (*v1alpha1.Channel, error) { + return r.getChannel(ctx, b, labels.SelectorFromSet(TriggerChannelLabels(b))) + } + return r.reconcileChannel(ctx, get, newTriggerChannel(b)) +} + +func (r *reconciler) reconcileIngressChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { + get := func() (*v1alpha1.Channel, error) { + return r.getChannel(ctx, b, labels.SelectorFromSet(IngressChannelLabels(b))) + } + return r.reconcileChannel(ctx, get, newIngressChannel(b)) +} + // reconcileChannel reconciles Broker's 'b' underlying channel. -func (r *reconciler) reconcileChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { - c, err := r.getChannel(ctx, b) +func (r *reconciler) reconcileChannel(ctx context.Context, get func() (*v1alpha1.Channel, error), newChan *v1alpha1.Channel) (*v1alpha1.Channel, error) { + c, err := get() // If the resource doesn't exist, we'll create it if k8serrors.IsNotFound(err) { - c = newChannel(b) + c = newChan err = r.client.Create(ctx, c) if err != nil { return nil, err @@ -305,11 +347,11 @@ func (r *reconciler) reconcileChannel(ctx context.Context, b *v1alpha1.Broker) ( } // getChannel returns the Channel object for Broker 'b' if exists, otherwise it returns an error. -func (r *reconciler) getChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { +func (r *reconciler) getChannel(ctx context.Context, b *v1alpha1.Broker, ls labels.Selector) (*v1alpha1.Channel, error) { list := &v1alpha1.ChannelList{} opts := &runtimeclient.ListOptions{ Namespace: b.Namespace, - LabelSelector: labels.SelectorFromSet(ChannelLabels(b)), + LabelSelector: ls, // Set Raw because if we need to get more than one page, then we will put the continue token // into opts.Raw.Continue. Raw: &metav1.ListOptions{}, @@ -328,8 +370,16 @@ func (r *reconciler) getChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alp return nil, k8serrors.NewNotFound(schema.GroupResource{}, "") } +func newTriggerChannel(b *v1alpha1.Broker) *v1alpha1.Channel { + return newChannel(b, TriggerChannelLabels(b)) +} + +func newIngressChannel(b *v1alpha1.Broker) *v1alpha1.Channel { + return newChannel(b, IngressChannelLabels(b)) +} + // newChannel creates a new Channel for Broker 'b'. -func newChannel(b *v1alpha1.Broker) *v1alpha1.Channel { +func newChannel(b *v1alpha1.Broker, l map[string]string) *v1alpha1.Channel { var spec v1alpha1.ChannelSpec if b.Spec.ChannelTemplate != nil { spec = *b.Spec.ChannelTemplate @@ -339,7 +389,7 @@ func newChannel(b *v1alpha1.Broker) *v1alpha1.Channel { ObjectMeta: metav1.ObjectMeta{ Namespace: b.Namespace, GenerateName: fmt.Sprintf("%s-broker-", b.Name), - Labels: ChannelLabels(b), + Labels: l, OwnerReferences: []metav1.OwnerReference{ *metav1.NewControllerRef(b, schema.GroupVersionKind{ Group: v1alpha1.SchemeGroupVersion.Group, @@ -352,13 +402,20 @@ func newChannel(b *v1alpha1.Broker) *v1alpha1.Channel { } } -func ChannelLabels(b *v1alpha1.Broker) map[string]string { +func TriggerChannelLabels(b *v1alpha1.Broker) map[string]string { return map[string]string{ "eventing.knative.dev/broker": b.Name, "eventing.knative.dev/brokerEverything": "true", } } +func IngressChannelLabels(b *v1alpha1.Broker) map[string]string { + return map[string]string{ + "eventing.knative.dev/broker": b.Name, + "eventing.knative.dev/brokerIngress": "true", + } +} + // reconcileDeployment reconciles the K8s Deployment 'd'. func (r *reconciler) reconcileDeployment(ctx context.Context, d *v1.Deployment) (*v1.Deployment, error) { name := types.NamespacedName{ @@ -434,3 +491,105 @@ func (r *reconciler) reconcileIngressService(ctx context.Context, b *v1alpha1.Br expected := resources.MakeIngressService(b) return r.reconcileService(ctx, expected) } + +func (r *reconciler) reconcileIngressSubscription(ctx context.Context, b *v1alpha1.Broker, c *v1alpha1.Channel, svc *corev1.Service) (*v1alpha1.Subscription, error) { + expected := makeSubscription(b, c, svc) + + sub, err := r.getIngressSubscription(ctx, b) + // If the resource doesn't exist, we'll create it + if k8serrors.IsNotFound(err) { + sub = expected + err = r.client.Create(ctx, sub) + if err != nil { + return nil, err + } + return sub, nil + } else if err != nil { + return nil, err + } + + // Update Subscription if it has changed. Ignore the generation. + expected.Spec.DeprecatedGeneration = sub.Spec.DeprecatedGeneration + if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { + // Given that spec.channel is immutable, we cannot just update the subscription. We delete + // it instead, and re-create it. + err = r.client.Delete(ctx, sub) + if err != nil { + logging.FromContext(ctx).Info("Cannot delete subscription", zap.Error(err)) + r.recorder.Eventf(b, corev1.EventTypeWarning, ingressSubscriptionDeleteFailed, "Delete Broker Ingress' subscription failed: %v", err) + return nil, err + } + sub = expected + err = r.client.Create(ctx, sub) + if err != nil { + logging.FromContext(ctx).Info("Cannot create subscription", zap.Error(err)) + r.recorder.Eventf(b, corev1.EventTypeWarning, ingressSubscriptionCreateFailed, "Create Broker Ingress' subscription failed: %v", err) + return nil, err + } + } + return sub, nil +} + +// getSubscription returns the subscription of trigger 't' if exists, +// otherwise it returns an error. +func (r *reconciler) getIngressSubscription(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Subscription, error) { + list := &v1alpha1.SubscriptionList{} + opts := &runtimeclient.ListOptions{ + Namespace: b.Namespace, + LabelSelector: labels.SelectorFromSet(ingressSubscriptionLabels(b)), + // Set Raw because if we need to get more than one page, then we will put the continue token + // into opts.Raw.Continue. + Raw: &metav1.ListOptions{}, + } + + err := r.client.List(ctx, opts, list) + if err != nil { + return nil, err + } + for _, s := range list.Items { + if metav1.IsControlledBy(&s, b) { + return &s, nil + } + } + + return nil, k8serrors.NewNotFound(schema.GroupResource{}, "") +} + +// makeSubscription returns a placeholder subscription for trigger 't', channel 'c', and service 'svc'. +func makeSubscription(b *v1alpha1.Broker, c *v1alpha1.Channel, svc *corev1.Service) *v1alpha1.Subscription { + return &v1alpha1.Subscription{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: b.Namespace, + GenerateName: fmt.Sprintf("internal-ingress-%s-", b.Name), + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(b, schema.GroupVersionKind{ + Group: v1alpha1.SchemeGroupVersion.Group, + Version: v1alpha1.SchemeGroupVersion.Version, + Kind: "Broker", + }), + }, + Labels: ingressSubscriptionLabels(b), + }, + Spec: v1alpha1.SubscriptionSpec{ + Channel: corev1.ObjectReference{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: c.Name, + }, + Subscriber: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: svc.Name, + }, + }, + }, + } +} + +func ingressSubscriptionLabels(b *v1alpha1.Broker) map[string]string { + return map[string]string{ + "eventing.knative.dev/broker": b.Name, + "eventing.knative.dev/brokerIngress": "true", + } +} diff --git a/pkg/reconciler/v1alpha1/broker/broker_test.go b/pkg/reconciler/v1alpha1/broker/broker_test.go index 42d4fa7958f..c6e7a9799b5 100644 --- a/pkg/reconciler/v1alpha1/broker/broker_test.go +++ b/pkg/reconciler/v1alpha1/broker/broker_test.go @@ -24,16 +24,17 @@ import ( "testing" "time" - "github.com/knative/eventing/pkg/utils" - + "github.com/google/go-cmp/cmp" "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker/resources" + "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" "go.uber.org/zap" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" @@ -60,11 +61,22 @@ var ( Name: "my-provisioner", } - channelHostname = fmt.Sprintf("foo.bar.svc.%s", utils.GetClusterDomainName()) + triggerChannelHostname = fmt.Sprintf("foo.bar.svc.%s", utils.GetClusterDomainName()) + ingressChannelHostname = fmt.Sprintf("baz.qux.svc.%s", utils.GetClusterDomainName()) + + ingressChannelName = "ingress-channel" // deletionTime is used when objects are marked as deleted. Rfc3339Copy() // truncates to seconds to match the loss of precision during serialization. deletionTime = metav1.Now().Rfc3339Copy() + + // Map of events to set test cases' expectations easier. + events = map[string]corev1.Event{ + brokerReconciled: {Reason: brokerReconciled, Type: corev1.EventTypeNormal}, + brokerUpdateStatusFailed: {Reason: brokerUpdateStatusFailed, Type: corev1.EventTypeWarning}, + ingressSubscriptionDeleteFailed: {Reason: ingressSubscriptionDeleteFailed, Type: corev1.EventTypeWarning}, + ingressSubscriptionCreateFailed: {Reason: ingressSubscriptionCreateFailed, Type: corev1.EventTypeWarning}, + } ) func init() { @@ -141,25 +153,29 @@ func TestReconcile(t *testing.T) { }, }, { - Name: "Channel.List error", + Name: "Trigger Channel.List error", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), }, Mocks: controllertesting.Mocks{ MockLists: []controllertesting.MockList{ - func(_ client.Client, _ context.Context, _ *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := list.(*v1alpha1.ChannelList); ok { - return controllertesting.Handled, errors.New("test error listing channels") + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + // Only match the Trigger Channel labels. + ls := labels.FormatLabels(TriggerChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + + if _, ok := list.(*v1alpha1.ChannelList); ok && opts.LabelSelector.Matches(l) { + return controllertesting.Handled, errors.New("test error getting Trigger Channel") } return controllertesting.Unhandled, nil }, }, }, - WantErrMsg: "test error listing channels", + WantErrMsg: "test error getting Trigger Channel", }, { - Name: "Channel.Create error", + Name: "Trigger Channel.Create error", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), @@ -167,28 +183,30 @@ func TestReconcile(t *testing.T) { Mocks: controllertesting.Mocks{ MockCreates: []controllertesting.MockCreate{ func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := obj.(*v1alpha1.Channel); ok { - return controllertesting.Handled, errors.New("test error creating Channel") + if c, ok := obj.(*v1alpha1.Channel); ok { + if cmp.Equal(c.Labels, TriggerChannelLabels(makeBroker())) { + return controllertesting.Handled, errors.New("test error creating Trigger Channel") + } } return controllertesting.Unhandled, nil }, }, }, - WantErrMsg: "test error creating Channel", + WantErrMsg: "test error creating Trigger Channel", }, { - Name: "Channel is different than expected", + Name: "Trigger Channel is different than expected", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeDifferentChannel(), + makeDifferentTriggerChannel(), }, WantPresent: []runtime.Object{ // This is special because the Channel is not updated, unlike most things that // differ from expected. // TODO uncomment the following line once our test framework supports searching for // GenerateName. - // makeDifferentChannel(), + // makeDifferentTriggerChannel(), }, WantEvent: []corev1.Event{ { @@ -197,11 +215,11 @@ func TestReconcile(t *testing.T) { }, }, { - Name: "Channel is not yet Addressable", + Name: "Trigger Channel is not yet Addressable", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeNonAddressableChannel(), + makeNonAddressableTriggerChannel(), }, WantResult: reconcile.Result{RequeueAfter: time.Second}, }, @@ -210,7 +228,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ @@ -231,7 +249,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockCreates: []controllertesting.MockCreate{ @@ -252,7 +270,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), makeDifferentFilterDeployment(), }, Mocks: controllertesting.Mocks{ @@ -274,7 +292,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ @@ -295,7 +313,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockCreates: []controllertesting.MockCreate{ @@ -316,7 +334,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), makeDifferentFilterService(), }, Mocks: controllertesting.Mocks{ @@ -338,7 +356,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ @@ -359,7 +377,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockCreates: []controllertesting.MockCreate{ @@ -380,7 +398,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), makeDifferentIngressDeployment(), }, Mocks: controllertesting.Mocks{ @@ -402,7 +420,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ @@ -423,7 +441,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Mocks: controllertesting.Mocks{ MockCreates: []controllertesting.MockCreate{ @@ -444,7 +462,7 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), makeDifferentIngressService(), }, Mocks: controllertesting.Mocks{ @@ -461,12 +479,247 @@ func TestReconcile(t *testing.T) { }, WantErrMsg: "test error updating ingress Service", }, + { + Name: "Ingress Channel.List error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + + if _, ok := list.(*v1alpha1.ChannelList); ok && opts.LabelSelector.Matches(l) { + return controllertesting.Handled, errors.New("test error getting Ingress Channel") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error getting Ingress Channel", + }, + { + Name: "Ingress Channel.Create error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + // Controller Runtime's fake client totally ignores the opts.LabelSelector, so + // picks up the Trigger Channel while listing the Ingress Channel. Use a mock to + // force the correct behavior. + func(innerClient client.Client, ctx context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + if _, ok := list.(*v1alpha1.ChannelList); ok { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + if opts.LabelSelector.Matches(l) { + return controllertesting.Handled, nil + } + } + return controllertesting.Unhandled, nil + }, + }, + MockCreates: []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if c, ok := obj.(*v1alpha1.Channel); ok { + if cmp.Equal(c.Labels, IngressChannelLabels(makeBroker())) { + return controllertesting.Handled, errors.New("test error creating Ingress Channel") + } + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error creating Ingress Channel", + }, + { + Name: "Ingress Channel is different than expected", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeDifferentIngressChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + // Controller Runtime's fake client totally ignores the opts.LabelSelector, so + // picks up the Trigger Channel while listing the Ingress Channel. Use a mock to + // force the correct behavior. + func(innerClient client.Client, ctx context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + if cl, ok := list.(*v1alpha1.ChannelList); ok { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + if opts.LabelSelector.Matches(l) { + cl.Items = append(cl.Items, *makeDifferentIngressChannel()) + return controllertesting.Handled, nil + } + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantPresent: []runtime.Object{ + // This is special because the Channel is not updated, unlike most things that + // differ from expected. + // TODO uncomment the following line once our test framework supports searching for + // GenerateName. + // makeDifferentIngressChannel(), + }, + WantEvent: []corev1.Event{ + { + Reason: brokerReconciled, Type: corev1.EventTypeNormal, + }, + }, + }, + { + Name: "Ingress Channel is not yet Addressable", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeNonAddressableIngressChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + // Controller Runtime's fake client totally ignores the opts.LabelSelector, so + // picks up the Trigger Channel while listing the Ingress Channel. Use a mock to + // force the correct behavior. + func(innerClient client.Client, ctx context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + if cl, ok := list.(*v1alpha1.ChannelList); ok { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + if opts.LabelSelector.Matches(l) { + cl.Items = append(cl.Items, *makeNonAddressableIngressChannel()) + return controllertesting.Handled, nil + } + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantResult: reconcile.Result{RequeueAfter: time.Second}, + }, + { + Name: "Subscription.List error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeIngressChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := list.(*v1alpha1.SubscriptionList); ok { + return controllertesting.Handled, errors.New("test error getting Subscription") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error getting Subscription", + }, + { + Name: "Subscription.Create error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeIngressChannel(), + }, + Mocks: controllertesting.Mocks{ + MockCreates: []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*v1alpha1.Subscription); ok { + return controllertesting.Handled, errors.New("test error creating Subscription") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error creating Subscription", + }, + { + Name: "Subscription is different than expected", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeIngressChannel(), + }, + WantPresent: []runtime.Object{ + // This is special because the Channel is not updated, unlike most things that + // differ from expected. + // TODO uncomment the following line once our test framework supports searching for + // GenerateName. + // makeDifferentSubscription(), + }, + WantEvent: []corev1.Event{ + { + Reason: brokerReconciled, Type: corev1.EventTypeNormal, + }, + }, + }, + { + Name: "Subscription.Delete error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeIngressChannel(), + makeDifferentSubscription(), + }, + Mocks: controllertesting.Mocks{ + MockDeletes: []controllertesting.MockDelete{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*v1alpha1.Subscription); ok { + return controllertesting.Handled, errors.New("test error deleting Subscription") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantEvent: []corev1.Event{events[ingressSubscriptionDeleteFailed]}, + WantErrMsg: "test error deleting Subscription", + }, + { + Name: "Subscription.Create error when recreating", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeBroker(), + makeTriggerChannel(), + makeIngressChannel(), + makeDifferentSubscription(), + }, + Mocks: controllertesting.Mocks{ + MockCreates: []controllertesting.MockCreate{ + func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) { + if _, ok := obj.(*v1alpha1.Subscription); ok { + return controllertesting.Handled, errors.New("test error creating Subscription") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantEvent: []corev1.Event{events[ingressSubscriptionCreateFailed]}, + WantErrMsg: "test error creating Subscription", + }, { Name: "Broker.Get for status update fails", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), + makeIngressChannel(), }, Mocks: controllertesting.Mocks{ MockGets: []controllertesting.MockGet{ @@ -501,7 +754,8 @@ func TestReconcile(t *testing.T) { Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeBroker(), - makeChannel(), + makeTriggerChannel(), + makeIngressChannel(), }, Mocks: controllertesting.Mocks{ MockStatusUpdates: []controllertesting.MockStatusUpdate{ @@ -529,16 +783,40 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeBroker(), // The Channel needs to be addressable for the reconcile to succeed. - makeChannel(), + makeTriggerChannel(), + makeIngressChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + // Controller Runtime's fake client totally ignores the opts.LabelSelector, so + // picks up the Trigger Channel while listing the Ingress Channel. Use a mock to + // force the correct behavior. + func(innerClient client.Client, ctx context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + if cl, ok := list.(*v1alpha1.ChannelList); ok { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + if opts.LabelSelector.Matches(l) { + cl.Items = append(cl.Items, *makeIngressChannel()) + return controllertesting.Handled, nil + } + } + return controllertesting.Unhandled, nil + }, + }, }, WantPresent: []runtime.Object{ makeReadyBroker(), - // TODO Uncomment makeChannel() when our test framework handles generateName. - // makeChannel(), + // TODO Uncomment makeTriggerChannel() when our test framework handles generateName. + // makeTriggerChannel(), makeFilterDeployment(), makeFilterService(), makeIngressDeployment(), makeIngressService(), + // TODO Uncomment makeIngressChannel() when our test framework handles generateName. + // makeIngressChannel(), + // Because the + makeTestSubscription(), }, WantEvent: []corev1.Event{ { @@ -588,10 +866,12 @@ func makeBroker() *v1alpha1.Broker { func makeReadyBroker() *v1alpha1.Broker { b := makeBroker() b.Status.InitializeConditions() - b.Status.MarkChannelReady() - b.Status.SetAddress(fmt.Sprintf("%s-broker.%s.svc.%s", brokerName, testNS, utils.GetClusterDomainName())) - b.Status.MarkFilterReady() b.Status.MarkIngressReady() + b.Status.MarkTriggerChannelReady() + b.Status.MarkIngressChannelReady() + b.Status.MarkFilterReady() + b.Status.SetAddress(fmt.Sprintf("%s-broker.%s.svc.%s", brokerName, testNS, utils.GetClusterDomainName())) + b.Status.MarkIngressSubscriptionReady() return b } @@ -601,7 +881,7 @@ func makeDeletingBroker() *v1alpha1.Broker { return b } -func makeChannel() *v1alpha1.Channel { +func makeTriggerChannel() *v1alpha1.Channel { return &v1alpha1.Channel{ ObjectMeta: metav1.ObjectMeta{ Namespace: testNS, @@ -619,20 +899,59 @@ func makeChannel() *v1alpha1.Channel { }, Status: v1alpha1.ChannelStatus{ Address: duckv1alpha1.Addressable{ - Hostname: channelHostname, + Hostname: triggerChannelHostname, + }, + }, + } +} + +func makeNonAddressableTriggerChannel() *v1alpha1.Channel { + c := makeTriggerChannel() + c.Status.Address = duckv1alpha1.Addressable{} + return c +} + +func makeDifferentTriggerChannel() *v1alpha1.Channel { + c := makeTriggerChannel() + c.Spec.Provisioner.Name = "some-other-provisioner" + return c +} + +func makeIngressChannel() *v1alpha1.Channel { + return &v1alpha1.Channel{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + GenerateName: fmt.Sprintf("%s-broker-ingress-", brokerName), + // The Fake library doesn't understand GenerateName, so give this a name so it doesn't + // collide with the Trigger Channel. + Name: ingressChannelName, + Labels: map[string]string{ + "eventing.knative.dev/broker": brokerName, + "eventing.knative.dev/brokerIngress": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + getOwnerReference(), + }, + }, + Spec: v1alpha1.ChannelSpec{ + Provisioner: channelProvisioner, + }, + Status: v1alpha1.ChannelStatus{ + Address: duckv1alpha1.Addressable{ + Hostname: ingressChannelHostname, }, }, } } -func makeNonAddressableChannel() *v1alpha1.Channel { - c := makeChannel() +func makeNonAddressableIngressChannel() *v1alpha1.Channel { + c := makeIngressChannel() c.Status.Address = duckv1alpha1.Addressable{} return c } -func makeDifferentChannel() *v1alpha1.Channel { - c := makeChannel() +func makeDifferentIngressChannel() *v1alpha1.Channel { + c := makeIngressChannel() c.Spec.Provisioner.Name = "some-other-provisioner" return c } @@ -676,7 +995,7 @@ func makeIngressDeployment() *appsv1.Deployment { Broker: makeBroker(), Image: ingressImage, ServiceAccountName: ingressSA, - ChannelAddress: channelHostname, + ChannelAddress: triggerChannelHostname, }) d.TypeMeta = metav1.TypeMeta{ APIVersion: "apps/v1", @@ -706,6 +1025,48 @@ func makeDifferentIngressService() *corev1.Service { return s } +func makeTestSubscription() *v1alpha1.Subscription { + return &v1alpha1.Subscription{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "eventing.knative.dev/v1alpha1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: testNS, + GenerateName: fmt.Sprintf("internal-ingress-%s-", brokerName), + Labels: map[string]string{ + "eventing.knative.dev/broker": brokerName, + "eventing.knative.dev/brokerIngress": "true", + }, + OwnerReferences: []metav1.OwnerReference{ + getOwnerReference(), + }, + }, + Spec: v1alpha1.SubscriptionSpec{ + Channel: corev1.ObjectReference{ + APIVersion: v1alpha1.SchemeGroupVersion.String(), + Kind: "Channel", + Name: ingressChannelName, + }, + Subscriber: &v1alpha1.SubscriberSpec{ + Ref: &corev1.ObjectReference{ + APIVersion: "v1", + Kind: "Service", + Name: makeIngressService().Name, + }, + }, + }, + } +} + +func makeDifferentSubscription() *v1alpha1.Subscription { + s := makeTestSubscription() + s.Spec.Subscriber.Ref = nil + url := "http://example.com/" + s.Spec.Subscriber.DNSName = &url + return s +} + func getOwnerReference() metav1.OwnerReference { return metav1.OwnerReference{ APIVersion: v1alpha1.SchemeGroupVersion.String(), diff --git a/pkg/reconciler/v1alpha1/trigger/trigger.go b/pkg/reconciler/v1alpha1/trigger/trigger.go index 5d3f4b71012..f0365f054d2 100644 --- a/pkg/reconciler/v1alpha1/trigger/trigger.go +++ b/pkg/reconciler/v1alpha1/trigger/trigger.go @@ -226,9 +226,14 @@ func (r *reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error { } t.Status.MarkBrokerExists() - c, err := r.getBrokerChannel(ctx, b) + brokerTrigger, err := r.getBrokerTriggerChannel(ctx, b) if err != nil { - logging.FromContext(ctx).Error("Unable to get the Broker's Channel", zap.Error(err)) + logging.FromContext(ctx).Error("Unable to get the Broker's Trigger Channel", zap.Error(err)) + return err + } + brokerIngress, err := r.getBrokerIngressChannel(ctx, b) + if err != nil { + logging.FromContext(ctx).Error("Unable to get the Broker's Ingress Channel", zap.Error(err)) return err } @@ -253,7 +258,7 @@ func (r *reconciler) reconcile(ctx context.Context, t *v1alpha1.Trigger) error { } t.Status.MarkVirtualServiceExists() - _, err = r.subscribeToBrokerChannel(ctx, t, c, svc) + _, err = r.subscribeToBrokerChannel(ctx, t, brokerTrigger, brokerIngress, svc) if err != nil { logging.FromContext(ctx).Error("Unable to Subscribe", zap.Error(err)) t.Status.MarkNotSubscribed("notSubscribed", "%v", err) @@ -315,12 +320,24 @@ func (r *reconciler) getBroker(ctx context.Context, t *v1alpha1.Trigger) (*v1alp return b, err } -// getBrokerChannel returns the Broker's channel if exists, otherwise it returns an error. -func (r *reconciler) getBrokerChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { +// getBrokerTriggerChannel return the Broker's Trigger Channel if it exists, otherwise it returns an +// error. +func (r *reconciler) getBrokerTriggerChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { + return r.getChannel(ctx, b, labels.SelectorFromSet(broker.TriggerChannelLabels(b))) +} + +// getBrokerIngressChannel return the Broker's Ingress Channel if it exists, otherwise it returns an +// error. +func (r *reconciler) getBrokerIngressChannel(ctx context.Context, b *v1alpha1.Broker) (*v1alpha1.Channel, error) { + return r.getChannel(ctx, b, labels.SelectorFromSet(broker.IngressChannelLabels(b))) +} + +// getChannel returns the Broker's channel if it exists, otherwise it returns an error. +func (r *reconciler) getChannel(ctx context.Context, b *v1alpha1.Broker, ls labels.Selector) (*v1alpha1.Channel, error) { list := &v1alpha1.ChannelList{} opts := &runtimeclient.ListOptions{ Namespace: b.Namespace, - LabelSelector: labels.SelectorFromSet(broker.ChannelLabels(b)), + LabelSelector: ls, // Set Raw because if we need to get more than one page, then we will put the continue token // into opts.Raw.Continue. Raw: &metav1.ListOptions{}, @@ -521,9 +538,9 @@ func virtualServiceLabels(t *v1alpha1.Trigger) map[string]string { } } -// subscribeToBrokerChannel subscribes service 'svc' to Broker's channel 'c'. -func (r *reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.Trigger, c *v1alpha1.Channel, svc *corev1.Service) (*v1alpha1.Subscription, error) { - expected := makeSubscription(t, c, svc) +// subscribeToBrokerChannel subscribes service 'svc' to the Broker's channels. +func (r *reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.Trigger, brokerTrigger, brokerIngress *v1alpha1.Channel, svc *corev1.Service) (*v1alpha1.Subscription, error) { + expected := makeSubscription(t, brokerTrigger, brokerIngress, svc) sub, err := r.getSubscription(ctx, t) // If the resource doesn't exist, we'll create it @@ -541,8 +558,8 @@ func (r *reconciler) subscribeToBrokerChannel(ctx context.Context, t *v1alpha1.T // Update Subscription if it has changed. Ignore the generation. expected.Spec.DeprecatedGeneration = sub.Spec.DeprecatedGeneration if !equality.Semantic.DeepDerivative(expected.Spec, sub.Spec) { - // Given that the backing channel spec is immutable, we cannot just update the subscription. - // We delete it instead, and re-create it. + // Given that spec.channel is immutable, we cannot just update the Subscription. We delete + // it and re-create it instead. err = r.client.Delete(ctx, sub) if err != nil { logging.FromContext(ctx).Info("Cannot delete subscription", zap.Error(err)) @@ -585,8 +602,9 @@ func (r *reconciler) getSubscription(ctx context.Context, t *v1alpha1.Trigger) ( return nil, k8serrors.NewNotFound(schema.GroupResource{}, "") } -// makeSubscription returns a placeholder subscription for trigger 't', channel 'c', and service 'svc'. -func makeSubscription(t *v1alpha1.Trigger, c *v1alpha1.Channel, svc *corev1.Service) *v1alpha1.Subscription { +// makeSubscription returns a placeholder subscription for trigger 't', from brokerTrigger to 'svc' +// replying to brokerIngress. +func makeSubscription(t *v1alpha1.Trigger, brokerTrigger, brokerIngress *v1alpha1.Channel, svc *corev1.Service) *v1alpha1.Subscription { return &v1alpha1.Subscription{ ObjectMeta: metav1.ObjectMeta{ Namespace: t.Namespace, @@ -604,7 +622,7 @@ func makeSubscription(t *v1alpha1.Trigger, c *v1alpha1.Channel, svc *corev1.Serv Channel: corev1.ObjectReference{ APIVersion: v1alpha1.SchemeGroupVersion.String(), Kind: "Channel", - Name: c.Name, + Name: brokerTrigger.Name, }, Subscriber: &v1alpha1.SubscriberSpec{ Ref: &corev1.ObjectReference{ @@ -613,12 +631,11 @@ func makeSubscription(t *v1alpha1.Trigger, c *v1alpha1.Channel, svc *corev1.Serv Name: svc.Name, }, }, - // TODO This pushes directly into the Channel, it should probably point at the Broker ingress instead. Reply: &v1alpha1.ReplyStrategy{ Channel: &corev1.ObjectReference{ APIVersion: v1alpha1.SchemeGroupVersion.String(), Kind: "Channel", - Name: c.Name, + Name: brokerIngress.Name, }, }, }, diff --git a/pkg/reconciler/v1alpha1/trigger/trigger_test.go b/pkg/reconciler/v1alpha1/trigger/trigger_test.go index ea05130025a..a861e924486 100644 --- a/pkg/reconciler/v1alpha1/trigger/trigger_test.go +++ b/pkg/reconciler/v1alpha1/trigger/trigger_test.go @@ -25,6 +25,7 @@ import ( "github.com/knative/eventing/pkg/apis/eventing/v1alpha1" "github.com/knative/eventing/pkg/reconciler/names" controllertesting "github.com/knative/eventing/pkg/reconciler/testing" + "github.com/knative/eventing/pkg/reconciler/v1alpha1/broker" "github.com/knative/eventing/pkg/utils" duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1" istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3" @@ -32,6 +33,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" @@ -181,7 +183,7 @@ func TestReconcile(t *testing.T) { WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { - Name: "Get Broker channel error", + Name: "Get Broker Trigger channel error", Scheme: scheme.Scheme, InitialState: []runtime.Object{ makeTrigger(), @@ -189,15 +191,44 @@ func TestReconcile(t *testing.T) { }, Mocks: controllertesting.Mocks{ MockLists: []controllertesting.MockList{ - func(_ client.Client, _ context.Context, _ *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { - if _, ok := list.(*v1alpha1.ChannelList); ok { - return controllertesting.Handled, errors.New("test error getting broker's channel") + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (controllertesting.MockHandled, error) { + // Only match the Trigger Channel labels. + ls := labels.FormatLabels(broker.TriggerChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + + if _, ok := list.(*v1alpha1.ChannelList); ok && opts.LabelSelector.Matches(l) { + return controllertesting.Handled, errors.New("test error getting broker's Trigger channel") + } + return controllertesting.Unhandled, nil + }, + }, + }, + WantErrMsg: "test error getting broker's Trigger channel", + WantEvent: []corev1.Event{events[triggerReconcileFailed]}, + }, + { + Name: "Get Broker Ingress channel error", + Scheme: scheme.Scheme, + InitialState: []runtime.Object{ + makeTrigger(), + makeBroker(), + makeTriggerChannel(), + }, + Mocks: controllertesting.Mocks{ + MockLists: []controllertesting.MockList{ + func(_ client.Client, _ context.Context, opts *client.ListOptions, list runtime.Object) (handled controllertesting.MockHandled, e error) { + // Only match the Ingress Channel labels. + ls := labels.FormatLabels(broker.IngressChannelLabels(makeBroker())) + l, _ := labels.ConvertSelectorToLabelsMap(ls) + + if _, ok := list.(*v1alpha1.ChannelList); ok && opts.LabelSelector.Matches(l) { + return controllertesting.Handled, errors.New("test error getting broker's Ingress channel") } return controllertesting.Unhandled, nil }, }, }, - WantErrMsg: "test error getting broker's channel", + WantErrMsg: "test error getting broker's Ingress channel", WantEvent: []corev1.Event{events[triggerReconcileFailed]}, }, { @@ -206,7 +237,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), }, DynamicMocks: controllertesting.DynamicMocks{ MockGets: []controllertesting.MockDynamicGet{ @@ -228,7 +259,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), }, Objects: []runtime.Object{ makeSubscriberServiceAsUnstructured(), @@ -252,7 +283,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeDifferentK8sService(), }, Objects: []runtime.Object{ @@ -277,7 +308,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), }, Objects: []runtime.Object{ @@ -302,7 +333,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeDifferentVirtualService(), }, @@ -328,7 +359,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeVirtualService(), }, @@ -354,7 +385,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeVirtualService(), makeDifferentSubscription(), @@ -381,7 +412,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeVirtualService(), makeDifferentSubscription(), @@ -408,7 +439,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeVirtualService(), makeSameSubscription(), @@ -435,7 +466,7 @@ func TestReconcile(t *testing.T) { InitialState: []runtime.Object{ makeTrigger(), makeBroker(), - makeChannel(), + makeTriggerChannel(), makeK8sService(), makeVirtualService(), makeSameSubscription(), @@ -562,7 +593,7 @@ func newChannel(name string) *v1alpha1.Channel { } } -func makeChannel() *v1alpha1.Channel { +func makeTriggerChannel() *v1alpha1.Channel { return newChannel(fmt.Sprintf("%s-broker", brokerName)) } @@ -611,11 +642,11 @@ func makeDifferentVirtualService() *istiov1alpha3.VirtualService { } func makeSameSubscription() *v1alpha1.Subscription { - return makeSubscription(makeTrigger(), makeChannel(), makeK8sService()) + return makeSubscription(makeTrigger(), makeTriggerChannel(), makeTriggerChannel(), makeK8sService()) } func makeDifferentSubscription() *v1alpha1.Subscription { - return makeSubscription(makeTrigger(), makeDifferentChannel(), makeK8sService()) + return makeSubscription(makeTrigger(), makeTriggerChannel(), makeDifferentChannel(), makeK8sService()) } func getOwnerReference() metav1.OwnerReference {