Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
testlib "knative.dev/eventing/test/lib"
)

func TestChannelEventModesAndSpecVersions(t *testing.T) {
helpers.ChannelMessageModesAndSpecVersionsTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop)
func TestChannelDataPlaneSuccess(t *testing.T) {
helpers.ChannelDataPlaneSuccessTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop)
}

func TestChannelDataPlaneFailure(t *testing.T) {
helpers.ChannelDataPlaneFailureTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ limitations under the License.
package helpers

import (
"fmt"
"net/http"
"strconv"
"strings"
"testing"

Expand All @@ -32,8 +34,8 @@ import (
"knative.dev/eventing/test/lib/sender"
)

// ChannelMessageModesAndSpecVersionsTestRunner tests the support of the channel ingress for different spec versions and message modes
func ChannelMessageModesAndSpecVersionsTestRunner(
// ChannelDataPlaneSuccessTestRunner tests the support of the channel ingress for different spec versions and message modes
func ChannelDataPlaneSuccessTestRunner(
t *testing.T,
channelTestRunner testlib.ComponentsTestRunner,
options ...testlib.SetupClientOption,
Expand All @@ -46,6 +48,7 @@ func ChannelMessageModesAndSpecVersionsTestRunner(

var testCases []testCase

// Generate matrix events/encoding/spec versions
for _, event := range []cloudevents.Event{MinEvent(), FullEvent()} {
for _, enc := range []cloudevents.Encoding{cloudevents.EncodingBinary, cloudevents.EncodingStructured} {
for _, version := range []string{cloudevents.VersionV03, cloudevents.VersionV1} {
Expand All @@ -57,14 +60,14 @@ func ChannelMessageModesAndSpecVersionsTestRunner(
channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) {
for _, tc := range testCases {
t.Run(tc.event.ID()+"_encoding_"+tc.encoding.String()+"_version_"+tc.version, func(t *testing.T) {
messageModeSpecVersionTest(t, channel, tc.event, tc.encoding, tc.version, options...)
channelDataPlaneSuccessTest(t, channel, tc.event, tc.encoding, tc.version, options...)
})
}
})
}

// Sender -> Channel -> Subscriber -> Record Events
func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event cloudevents.Event, encoding cloudevents.Encoding, version string, options ...testlib.SetupClientOption) {
func channelDataPlaneSuccessTest(t *testing.T, channel metav1.TypeMeta, event cloudevents.Event, encoding cloudevents.Encoding, version string, options ...testlib.SetupClientOption) {
client := testlib.Setup(t, true, options...)
defer testlib.TearDown(client)

Expand Down Expand Up @@ -133,3 +136,103 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo
recordevents.MatchEvent(sender.MatchStatusCode(http.StatusAccepted)),
)
}

// ChannelDataPlaneFailureTestRunner tests some status codes from the spec
func ChannelDataPlaneFailureTestRunner(
t *testing.T,
channelTestRunner testlib.ComponentsTestRunner,
options ...testlib.SetupClientOption,
) {
testCases := []struct {
statusCode int
senderFn func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string)
eventId string
}{{
statusCode: http.StatusMethodNotAllowed,
senderFn: func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string) {
c.SendRequestToAddressable(
senderName,
channelName,
&channel,
map[string]string{
"ce-specversion": "1.0",
"ce-type": "example",
"ce-source": "http://localhost",
"ce-id": eventId,
"content-type": "application/json",
},
"{}",
sender.WithMethod("PUT"),
sender.WithResponseSink(responseSink),
)
},
}, {
statusCode: http.StatusBadRequest,
senderFn: func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string) {
c.SendRequestToAddressable(
senderName,
channelName,
&channel,
map[string]string{
"ce-specversion": "10.0", // <-- Spec version not existing
"ce-type": "example",
"ce-source": "http://localhost",
"ce-id": eventId,
"content-type": "application/json",
},
"{}",
sender.WithResponseSink(responseSink),
)
},
}}

channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) {
for _, tc := range testCases {
t.Run("expecting-"+strconv.Itoa(tc.statusCode), func(t *testing.T) {
channelDataPlaneFailureTest(t, channel, tc.senderFn, tc.statusCode, options...)
})
}
})
}

// (Request) Sender -> Channel -> Subscriber -> Record Events
func channelDataPlaneFailureTest(
t *testing.T,
channel metav1.TypeMeta,
senderFn func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string),
expectingStatusCode int,
options ...testlib.SetupClientOption,
) {
client := testlib.Setup(t, true, options...)
defer testlib.TearDown(client)

resourcesNamePrefix := fmt.Sprintf("expecting-%d", expectingStatusCode)

channelName := resourcesNamePrefix + "-ch"
client.CreateChannelOrFail(channelName, &channel)

subscriberName := resourcesNamePrefix + "-recordevents"
eventTracker, _ := recordevents.StartEventRecordOrFail(client, subscriberName)

client.CreateSubscriptionOrFail(
resourcesNamePrefix+"-sub",
channelName,
&channel,
resources.WithSubscriberForSubscription(subscriberName),
)

client.WaitForAllTestResourcesReadyOrFail()

eventId := "xyz"
senderFn(client, resourcesNamePrefix+"-sender", channelName, channel, eventId, "http://"+client.GetServiceHost(subscriberName))

eventTracker.AssertExact(
1,
recordevents.MatchEvent(sender.MatchStatusCode(expectingStatusCode)),
)

// Assert the event is not received
eventTracker.AssertNot(
recordevents.MatchEvent(HasId(eventId)),
)
}
4 changes: 2 additions & 2 deletions test/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
configtracing "knative.dev/pkg/tracing/config"

eventing "knative.dev/eventing/pkg/client/clientset/versioned"
"knative.dev/eventing/test/lib/tracing"
"knative.dev/eventing/test/test_images"
)

// Client holds instances of interfaces for making requests to Knative.
Expand Down Expand Up @@ -109,5 +109,5 @@ func getTracingConfig(c *kubernetes.Clientset) (corev1.EnvVar, error) {
return corev1.EnvVar{}, fmt.Errorf("error while serializing the config-tracing config map: %+v", errors.WithStack(err))
}

return corev1.EnvVar{Name: tracing.ConfigTracingEnv, Value: configSerialized}, nil
return corev1.EnvVar{Name: test_images.ConfigTracingEnv, Value: configSerialized}, nil
}
40 changes: 38 additions & 2 deletions test/lib/send_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package lib

import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
pkgTest "knative.dev/pkg/test"

Expand All @@ -32,7 +33,7 @@ func (c *Client) SendEventToAddressable(
addressableName string,
typemeta *metav1.TypeMeta,
event cloudevents.Event,
option ...sender.EventSenderOption,
option ...func(*corev1.Pod),
) {
uri, err := c.GetAddressableURI(addressableName, typemeta)
if err != nil {
Expand All @@ -46,7 +47,7 @@ func (c *Client) SendEvent(
senderName string,
uri string,
event cloudevents.Event,
option ...sender.EventSenderOption,
option ...func(*corev1.Pod),
) {
namespace := c.Namespace
pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...)
Expand All @@ -58,3 +59,38 @@ func (c *Client) SendEvent(
c.T.Fatalf("Failed to send event %v to %s: %+v", event, uri, errors.WithStack(err))
}
}

// SendRequestToAddressable will send the given request to the given Addressable.
func (c *Client) SendRequestToAddressable(
senderName,
addressableName string,
typemeta *metav1.TypeMeta,
headers map[string]string,
body string,
option ...func(*corev1.Pod),
) {
uri, err := c.GetAddressableURI(addressableName, typemeta)
if err != nil {
c.T.Fatalf("Failed to get the URI for %+v-%s", typemeta, addressableName)
}
c.SendRequest(senderName, uri, headers, body, option...)
}

// SendRequest will create a sender pod, which will send the given request to the given url.
func (c *Client) SendRequest(
senderName string,
uri string,
headers map[string]string,
body string,
option ...func(*corev1.Pod),
) {
namespace := c.Namespace
pod, err := sender.RequestSenderPod("request-sender", senderName, uri, headers, body, option...)
if err != nil {
c.T.Fatalf("Failed to create request-sender pod: %+v", errors.WithStack(err))
}
c.CreatePodOrFail(pod)
if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil {
c.T.Fatalf("Failed to send request to %s: %+v", uri, errors.WithStack(err))
}
}
13 changes: 13 additions & 0 deletions test/lib/sender/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package sender

import (
"net/http"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
Expand Down Expand Up @@ -46,3 +47,15 @@ func NewSenderEvent(id string, source string, event *cloudevents.Event, result *

return ev
}

func NewSenderEventFromRaw(id string, source string, response *http.Response) cloudevents.Event {
ev := cloudevents.NewEvent()
ev.SetID(id)
ev.SetSource(source)
ev.SetType(EventType)
ev.SetTime(time.Now())

ev.SetExtension(ResponseStatusCodeExtension, response.StatusCode)

return ev
}
80 changes: 63 additions & 17 deletions test/lib/sender/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ import (
pkgTest "knative.dev/pkg/test"
)

type EventSenderOption func(*corev1.Pod)

// EnableTracing enables tracing in sender pod
func EnableTracing() EventSenderOption {
func EnableTracing() func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
Expand All @@ -39,8 +37,8 @@ func EnableTracing() EventSenderOption {
}
}

// EnableIncrementalId creates a new incremental id for each event sent from the sender pod
func EnableIncrementalId() EventSenderOption {
// EnableIncrementalId creates a new incremental id for each event sent from the sender pod. Supported only by event-sender
func EnableIncrementalId() func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
Expand All @@ -50,8 +48,8 @@ func EnableIncrementalId() EventSenderOption {
}
}

// WithEncoding forces the encoding of the event to send from the sender pod
func WithEncoding(encoding cloudevents.Encoding) EventSenderOption {
// WithEncoding forces the encoding of the event to send from the sender pod. Supported only by event-sender
func WithEncoding(encoding cloudevents.Encoding) func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
Expand All @@ -62,7 +60,7 @@ func WithEncoding(encoding cloudevents.Encoding) EventSenderOption {
}

// WithResponseSink sends the response information as CloudEvent to another sink
func WithResponseSink(responseSink string) EventSenderOption {
func WithResponseSink(responseSink string) func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
Expand All @@ -72,24 +70,19 @@ func WithResponseSink(responseSink string) EventSenderOption {
}
}

// WithEncoding forces the encoding of the event to send from the sender pod
func WithAdditionalHeaders(headers map[string]string) EventSenderOption {
var kv []string
for k, v := range headers {
kv = append(kv, k+"="+v)
}
serializedHeaders := strings.Join(kv, ",")
// WithEncoding forces the encoding of the event to send from the sender pod. Supported only by event-sender
func WithAdditionalHeaders(headers map[string]string) func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
"-additional-headers",
serializedHeaders,
serializeHeaders(headers),
)
}
}

// EventSenderPod creates a Pod that sends events to the given address.
func EventSenderPod(imageName string, name string, sink string, event cloudevents.Event, options ...EventSenderOption) (*corev1.Pod, error) {
func EventSenderPod(imageName string, name string, sink string, event cloudevents.Event, options ...func(*corev1.Pod)) (*corev1.Pod, error) {
encodedEvent, err := json.Marshal(event)
if err != nil {
return nil, err
Expand Down Expand Up @@ -124,3 +117,56 @@ func EventSenderPod(imageName string, name string, sink string, event cloudevent

return p, nil
}

// WithMethod configures the method used to send the http request. Supported only by request-sender
func WithMethod(method string) func(*corev1.Pod) {
return func(pod *corev1.Pod) {
pod.Spec.Containers[0].Args = append(
pod.Spec.Containers[0].Args,
"-method",
method,
)
}
}

// EventSenderPod creates a Pod that sends http requests to the given address.
func RequestSenderPod(imageName string, name string, sink string, headers map[string]string, body string, options ...func(*corev1.Pod)) (*corev1.Pod, error) {
args := []string{
"-sink",
sink,
"-headers",
serializeHeaders(headers),
"-body",
body,
}

p := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: imageName,
Image: pkgTest.ImagePath(imageName),
ImagePullPolicy: corev1.PullAlways,
Args: args,
}},
// Never restart the event sender Pod.
RestartPolicy: corev1.RestartPolicyNever,
},
}

for _, opt := range options {
opt(p)
}

return p, nil
}

func serializeHeaders(headers map[string]string) string {
var kv []string
for k, v := range headers {
kv = append(kv, k+"="+v)
}
return strings.Join(kv, ",")
}
Loading