Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config/core/resources/broker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ spec:
name: v1beta1
served: true
storage: true
- << : *version
name: v1
served: true
storage: false
names:
kind: Broker
plural: brokers
Expand Down
48 changes: 30 additions & 18 deletions config/core/resources/trigger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,34 @@ spec:
storage: false
subresources:
status: {}
schema:
openAPIV3Schema:
type: object
# this is a work around so we don't need to flush out the
# schema for each version at this time
#
# see issue: https://github.com/knative/serving/issues/912
x-kubernetes-preserve-unknown-fields: true
additionalPrinterColumns:
- name: Ready
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].status"
- name: Reason
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].reason"
- name: Broker
type: string
jsonPath: .spec.broker
- name: Subscriber_URI
type: string
jsonPath: .status.subscriberUri
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
- << : *version
name: v1beta1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
Expand Down Expand Up @@ -85,26 +113,10 @@ spec:
status:
type: object
x-kubernetes-preserve-unknown-fields: true
additionalPrinterColumns:
- name: Ready
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].status"
- name: Reason
type: string
jsonPath: ".status.conditions[?(@.type==\"Ready\")].reason"
- name: Broker
type: string
jsonPath: .spec.broker
- name: Subscriber_URI
type: string
jsonPath: .status.subscriberUri
- name: Age
type: date
jsonPath: .metadata.creationTimestamp
- << : *version
name: v1beta1
name: v1
served: true
storage: true
storage: false
schema:
openAPIV3Schema:
type: object
Expand Down
13 changes: 11 additions & 2 deletions test/e2e/broker_channel_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ Trigger2 logs all events,
Trigger3 filters the transformed event and sends it to Channel.

*/
func TestBrokerChannelFlow(t *testing.T) {
helpers.BrokerChannelFlowWithTransformation(t, brokerClass, channelTestRunner)
func TestBrokerChannelFlowTriggerV1BrokerV1(t *testing.T) {
helpers.BrokerChannelFlowWithTransformation(t, brokerClass, "v1", "v1", channelTestRunner)
}
func TestBrokerChannelFlowV1Beta1BrokerV1(t *testing.T) {
helpers.BrokerChannelFlowWithTransformation(t, brokerClass, "v1", "v1beta1", channelTestRunner)
}
func TestBrokerChannelFlowTriggerV1Beta1BrokerV1Beta1(t *testing.T) {
helpers.BrokerChannelFlowWithTransformation(t, brokerClass, "v1beta1", "v1beta1", channelTestRunner)
}
func TestBrokerChannelFlowTriggerV1BrokerV1Beta1(t *testing.T) {
helpers.BrokerChannelFlowWithTransformation(t, brokerClass, "v1beta1", "v1", channelTestRunner)
}
13 changes: 11 additions & 2 deletions test/e2e/broker_event_transformation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ EventSource ---> Broker ---> Trigger1 -------> Service(Transformation)

Note: the number denotes the sequence of the event that flows in this test case.
*/
func TestEventTransformationForTrigger(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, channelTestRunner)
func TestEventTransformationForTriggerV1BrokerV1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1", "v1", channelTestRunner)
}
func TestEventTransformationForTriggerV1Beta1BrokerV1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1", "v1beta1", channelTestRunner)
}
func TestEventTransformationForTriggerV1Beta1BrokerV1Beta1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1beta1", "v1beta1", channelTestRunner)
}
func TestEventTransformationForTriggerV1BrokerV1Beta1(t *testing.T) {
helpers.EventTransformationForTriggerTestHelper(t, brokerClass, "v1beta1", "v1", channelTestRunner)
}
73 changes: 52 additions & 21 deletions test/e2e/helpers/broker_channel_flow_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ Trigger3 filters the transformed event and sends it to Channel.
*/
func BrokerChannelFlowWithTransformation(t *testing.T,
brokerClass string,
brokerVersion string,
triggerVersion string,
channelTestRunner testlib.ComponentsTestRunner,
options ...testlib.SetupClientOption) {
const (
Expand Down Expand Up @@ -85,7 +87,11 @@ func BrokerChannelFlowWithTransformation(t *testing.T,
//&channel

// create a new broker
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
if brokerVersion == "v1" {
client.CreateBrokerV1OrFail(brokerName, resources.WithBrokerClassForBrokerV1(brokerClass), resources.WithConfigForBrokerV1(config))
} else {
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
}
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)

// eventToSend is the event sent as input of the test
Expand All @@ -107,25 +113,41 @@ func BrokerChannelFlowWithTransformation(t *testing.T,
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 to receive the original event, and do event transformation
client.CreateTriggerOrFailV1Beta1(
triggerName1,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)

if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
triggerName1,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(transformationPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
triggerName1,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)
}
// create event tracker that should receive all sent events
allEventTracker, _ := recordevents.StartEventRecordOrFail(client, allEventsRecorderPodName)
defer allEventTracker.Cleanup()

// create trigger to receive all the events
client.CreateTriggerOrFailV1Beta1(
triggerName2,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(any, any, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(allEventsRecorderPodName),
)

if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
triggerName2,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(any, any, nil),
resources.WithSubscriberServiceRefForTriggerV1(allEventsRecorderPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
triggerName2,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(any, any, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(allEventsRecorderPodName),
)
}
// create channel for trigger3
client.CreateChannelOrFail(channelName, &channel)
client.WaitForResourceReadyOrFail(channelName, &channel)
Expand All @@ -135,12 +157,21 @@ func BrokerChannelFlowWithTransformation(t *testing.T,
if err != nil {
st.Fatalf("Failed to get the url for the channel %q: %+v", channelName, err)
}
client.CreateTriggerOrFailV1Beta1(
triggerName3,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberURIForTriggerV1Beta1(channelURL),
)
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
triggerName3,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberURIForTriggerV1(channelURL),
)
} else {
client.CreateTriggerOrFailV1Beta1(
triggerName3,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberURIForTriggerV1Beta1(channelURL),
)
}

// create event tracker that should receive only transformed events
transformedEventTracker, _ := recordevents.StartEventRecordOrFail(client, transformedEventsRecorderPodName)
Expand Down
56 changes: 39 additions & 17 deletions test/e2e/helpers/broker_event_transformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"knative.dev/eventing/pkg/apis/eventing/v1beta1"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
Expand All @@ -45,13 +44,14 @@ Note: the number denotes the sequence of the event that flows in this test case.
*/
func EventTransformationForTriggerTestHelper(t *testing.T,
brokerClass string,
channelTestRunner testlib.ComponentsTestRunner,
brokerVersion string,
triggerVersion string,
componentsTestRunner testlib.ComponentsTestRunner,
options ...testlib.SetupClientOption) {
const (
senderName = "e2e-eventtransformation-sender"
brokerName = "e2e-eventtransformation-broker"

any = v1beta1.TriggerAnyFilter
eventType = "type1"
transformedEventType = "type2"
eventSource = "source1"
Expand All @@ -66,15 +66,19 @@ func EventTransformationForTriggerTestHelper(t *testing.T,
recordEventsPodName = "recordevents-pod"
)

channelTestRunner.RunTests(t, testlib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
componentsTestRunner.RunTests(t, testlib.FeatureBasic, func(st *testing.T, channel metav1.TypeMeta) {
client := testlib.Setup(st, true, options...)
defer testlib.TearDown(client)

// Create a configmap used by the broker.
config := client.CreateBrokerConfigMapOrFail(brokerName, &channel)

// create a new broker
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
if brokerVersion == "v1" {
client.CreateBrokerV1OrFail(brokerName, resources.WithBrokerClassForBrokerV1(brokerClass), resources.WithConfigForBrokerV1(config))
} else {
client.CreateBrokerV1Beta1OrFail(brokerName, resources.WithBrokerClassForBrokerV1Beta1(brokerClass), resources.WithConfigForBrokerV1Beta1(config))
}
client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta)

// create the transformation service
Expand All @@ -87,24 +91,42 @@ func EventTransformationForTriggerTestHelper(t *testing.T,
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create trigger1 for event transformation
client.CreateTriggerOrFailV1Beta1(
originalTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
originalTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(transformationPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
originalTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(eventSource, eventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(transformationPodName),
)
}

// create logger pod and service
eventTracker, _ := recordevents.StartEventRecordOrFail(client, recordEventsPodName)
defer eventTracker.Cleanup()

// create trigger2 for event receiving
client.CreateTriggerOrFailV1Beta1(
transformedTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(recordEventsPodName),
)
if triggerVersion == "v1" {
client.CreateTriggerV1OrFail(
transformedTriggerName,
resources.WithBrokerV1(brokerName),
resources.WithAttributesTriggerFilterV1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1(recordEventsPodName),
)
} else {
client.CreateTriggerOrFailV1Beta1(
transformedTriggerName,
resources.WithBrokerV1Beta1(brokerName),
resources.WithAttributesTriggerFilterV1Beta1(transformedEventSource, transformedEventType, nil),
resources.WithSubscriberServiceRefForTriggerV1Beta1(recordEventsPodName),
)
}

// wait for all test resources to be ready, so that we can start sending events
client.WaitForAllTestResourcesReadyOrFail()
Expand Down
37 changes: 34 additions & 3 deletions test/lib/creation.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/eventing/v1beta1"
flowsv1beta1 "knative.dev/eventing/pkg/apis/flows/v1beta1"
messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -145,16 +146,16 @@ func (c *Client) CreateBrokerConfigMapOrFail(name string, channel *metav1.TypeMe
}
}

// CreateBrokerV1Beta1OrFail will create a Broker or fail the test if there is an error.
// CreateBrokerV1Beta1OrFail will create a v1beta1 Broker or fail the test if there is an error.
func (c *Client) CreateBrokerV1Beta1OrFail(name string, options ...resources.BrokerV1Beta1Option) *v1beta1.Broker {
namespace := c.Namespace
broker := resources.BrokerV1Beta1(name, options...)
brokers := c.Eventing.EventingV1beta1().Brokers(namespace)
c.T.Logf("Creating broker %s", name)
c.T.Logf("Creating v1beta1 broker %s", name)
// update broker with the new reference
broker, err := brokers.Create(broker)
if err != nil {
c.T.Fatalf("Failed to create broker %q: %v", name, err)
c.T.Fatalf("Failed to create v1beta1 broker %q: %v", name, err)
}
c.Tracker.AddObj(broker)
return broker
Expand All @@ -175,6 +176,36 @@ func (c *Client) CreateTriggerOrFailV1Beta1(name string, options ...resources.Tr
return trigger
}

// CreateBrokerV1OrFail will create a v1 Broker or fail the test if there is an error.
func (c *Client) CreateBrokerV1OrFail(name string, options ...resources.BrokerV1Option) *eventingv1.Broker {
namespace := c.Namespace
broker := resources.BrokerV1(name, options...)
brokers := c.Eventing.EventingV1().Brokers(namespace)
c.T.Logf("Creating v1 broker %s", name)
// update broker with the new reference
broker, err := brokers.Create(broker)
if err != nil {
c.T.Fatalf("Failed to create v1 broker %q: %v", name, err)
}
c.Tracker.AddObj(broker)
return broker
}

// CreateTriggerOrFailV1 will create a v1 Trigger or fail the test if there is an error.
func (c *Client) CreateTriggerV1OrFail(name string, options ...resources.TriggerOptionV1) *eventingv1.Trigger {
namespace := c.Namespace
trigger := resources.TriggerV1(name, options...)
triggers := c.Eventing.EventingV1().Triggers(namespace)
c.T.Logf("Creating v1 trigger %s", name)
// update trigger with the new reference
trigger, err := triggers.Create(trigger)
if err != nil {
c.T.Fatalf("Failed to create v1 trigger %q: %v", name, err)
}
c.Tracker.AddObj(trigger)
return trigger
}

// CreateFlowsSequenceOrFail will create a Sequence (in flows.knative.dev api group) or
// fail the test if there is an error.
func (c *Client) CreateFlowsSequenceOrFail(sequence *flowsv1beta1.Sequence) {
Expand Down
Loading