From b0da5b1380912acaea560f6ffd96031778983c10 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 30 Jun 2020 17:01:14 +0200 Subject: [PATCH] status code failures asserts in channel conformance data plane Signed-off-by: Francesco Guardiani --- ...ion_test.go => channel_data_plane_test.go} | 8 +- ...helper.go => channel_data_plane_helper.go} | 111 +++++++++++++- test/lib/client.go | 4 +- test/lib/send_event.go | 40 +++++- test/lib/sender/events.go | 13 ++ test/lib/sender/resources.go | 80 ++++++++--- test/test_images/event-sender/main.go | 34 ++--- test/test_images/recordevents/main.go | 4 +- test/test_images/request-sender/main.go | 136 ++++++++++++++++++ test/test_images/request-sender/pod.yaml | 9 ++ test/test_images/transformevents/main.go | 4 +- .../config.go => test_images/utils.go} | 29 +++- 12 files changed, 412 insertions(+), 60 deletions(-) rename test/conformance/{channel_message_modes_specversion_test.go => channel_data_plane_test.go} (71%) rename test/conformance/helpers/{channel_message_modes_specversion_helper.go => channel_data_plane_helper.go} (50%) create mode 100644 test/test_images/request-sender/main.go create mode 100644 test/test_images/request-sender/pod.yaml rename test/{lib/tracing/config.go => test_images/utils.go} (61%) diff --git a/test/conformance/channel_message_modes_specversion_test.go b/test/conformance/channel_data_plane_test.go similarity index 71% rename from test/conformance/channel_message_modes_specversion_test.go rename to test/conformance/channel_data_plane_test.go index 7cc5309b6ec..d68614a579e 100644 --- a/test/conformance/channel_message_modes_specversion_test.go +++ b/test/conformance/channel_data_plane_test.go @@ -25,6 +25,10 @@ import ( testlib "knative.dev/eventing/test/lib" ) -func TestChannelEventModesAndSpecVersions(t *testing.T) { - helpers.ChannelMessageModesAndSpecVersionsTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop) +func TestChannelDataPlaneSuccess(t *testing.T) { + helpers.ChannelDataPlaneSuccessTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop) +} + +func TestChannelDataPlaneFailure(t *testing.T) { + helpers.ChannelDataPlaneFailureTestRunner(t, channelTestRunner, testlib.SetupClientOptionNoop) } diff --git a/test/conformance/helpers/channel_message_modes_specversion_helper.go b/test/conformance/helpers/channel_data_plane_helper.go similarity index 50% rename from test/conformance/helpers/channel_message_modes_specversion_helper.go rename to test/conformance/helpers/channel_data_plane_helper.go index 9bea1add21d..b64fbb33b0b 100644 --- a/test/conformance/helpers/channel_message_modes_specversion_helper.go +++ b/test/conformance/helpers/channel_data_plane_helper.go @@ -17,7 +17,9 @@ limitations under the License. package helpers import ( + "fmt" "net/http" + "strconv" "strings" "testing" @@ -32,8 +34,8 @@ import ( "knative.dev/eventing/test/lib/sender" ) -// ChannelMessageModesAndSpecVersionsTestRunner tests the support of the channel ingress for different spec versions and message modes -func ChannelMessageModesAndSpecVersionsTestRunner( +// ChannelDataPlaneSuccessTestRunner tests the support of the channel ingress for different spec versions and message modes +func ChannelDataPlaneSuccessTestRunner( t *testing.T, channelTestRunner testlib.ComponentsTestRunner, options ...testlib.SetupClientOption, @@ -46,6 +48,7 @@ func ChannelMessageModesAndSpecVersionsTestRunner( var testCases []testCase + // Generate matrix events/encoding/spec versions for _, event := range []cloudevents.Event{MinEvent(), FullEvent()} { for _, enc := range []cloudevents.Encoding{cloudevents.EncodingBinary, cloudevents.EncodingStructured} { for _, version := range []string{cloudevents.VersionV03, cloudevents.VersionV1} { @@ -57,14 +60,14 @@ func ChannelMessageModesAndSpecVersionsTestRunner( channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { for _, tc := range testCases { t.Run(tc.event.ID()+"_encoding_"+tc.encoding.String()+"_version_"+tc.version, func(t *testing.T) { - messageModeSpecVersionTest(t, channel, tc.event, tc.encoding, tc.version, options...) + channelDataPlaneSuccessTest(t, channel, tc.event, tc.encoding, tc.version, options...) }) } }) } // Sender -> Channel -> Subscriber -> Record Events -func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event cloudevents.Event, encoding cloudevents.Encoding, version string, options ...testlib.SetupClientOption) { +func channelDataPlaneSuccessTest(t *testing.T, channel metav1.TypeMeta, event cloudevents.Event, encoding cloudevents.Encoding, version string, options ...testlib.SetupClientOption) { client := testlib.Setup(t, true, options...) defer testlib.TearDown(client) @@ -133,3 +136,103 @@ func messageModeSpecVersionTest(t *testing.T, channel metav1.TypeMeta, event clo recordevents.MatchEvent(sender.MatchStatusCode(http.StatusAccepted)), ) } + +// ChannelDataPlaneFailureTestRunner tests some status codes from the spec +func ChannelDataPlaneFailureTestRunner( + t *testing.T, + channelTestRunner testlib.ComponentsTestRunner, + options ...testlib.SetupClientOption, +) { + testCases := []struct { + statusCode int + senderFn func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string) + eventId string + }{{ + statusCode: http.StatusMethodNotAllowed, + senderFn: func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string) { + c.SendRequestToAddressable( + senderName, + channelName, + &channel, + map[string]string{ + "ce-specversion": "1.0", + "ce-type": "example", + "ce-source": "http://localhost", + "ce-id": eventId, + "content-type": "application/json", + }, + "{}", + sender.WithMethod("PUT"), + sender.WithResponseSink(responseSink), + ) + }, + }, { + statusCode: http.StatusBadRequest, + senderFn: func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string) { + c.SendRequestToAddressable( + senderName, + channelName, + &channel, + map[string]string{ + "ce-specversion": "10.0", // <-- Spec version not existing + "ce-type": "example", + "ce-source": "http://localhost", + "ce-id": eventId, + "content-type": "application/json", + }, + "{}", + sender.WithResponseSink(responseSink), + ) + }, + }} + + channelTestRunner.RunTests(t, testlib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) { + for _, tc := range testCases { + t.Run("expecting-"+strconv.Itoa(tc.statusCode), func(t *testing.T) { + channelDataPlaneFailureTest(t, channel, tc.senderFn, tc.statusCode, options...) + }) + } + }) +} + +// (Request) Sender -> Channel -> Subscriber -> Record Events +func channelDataPlaneFailureTest( + t *testing.T, + channel metav1.TypeMeta, + senderFn func(c *testlib.Client, senderName string, channelName string, channel metav1.TypeMeta, eventId string, responseSink string), + expectingStatusCode int, + options ...testlib.SetupClientOption, +) { + client := testlib.Setup(t, true, options...) + defer testlib.TearDown(client) + + resourcesNamePrefix := fmt.Sprintf("expecting-%d", expectingStatusCode) + + channelName := resourcesNamePrefix + "-ch" + client.CreateChannelOrFail(channelName, &channel) + + subscriberName := resourcesNamePrefix + "-recordevents" + eventTracker, _ := recordevents.StartEventRecordOrFail(client, subscriberName) + + client.CreateSubscriptionOrFail( + resourcesNamePrefix+"-sub", + channelName, + &channel, + resources.WithSubscriberForSubscription(subscriberName), + ) + + client.WaitForAllTestResourcesReadyOrFail() + + eventId := "xyz" + senderFn(client, resourcesNamePrefix+"-sender", channelName, channel, eventId, "http://"+client.GetServiceHost(subscriberName)) + + eventTracker.AssertExact( + 1, + recordevents.MatchEvent(sender.MatchStatusCode(expectingStatusCode)), + ) + + // Assert the event is not received + eventTracker.AssertNot( + recordevents.MatchEvent(HasId(eventId)), + ) +} diff --git a/test/lib/client.go b/test/lib/client.go index 9fab2cbebbb..29d9f57aca8 100644 --- a/test/lib/client.go +++ b/test/lib/client.go @@ -34,7 +34,7 @@ import ( configtracing "knative.dev/pkg/tracing/config" eventing "knative.dev/eventing/pkg/client/clientset/versioned" - "knative.dev/eventing/test/lib/tracing" + "knative.dev/eventing/test/test_images" ) // Client holds instances of interfaces for making requests to Knative. @@ -109,5 +109,5 @@ func getTracingConfig(c *kubernetes.Clientset) (corev1.EnvVar, error) { return corev1.EnvVar{}, fmt.Errorf("error while serializing the config-tracing config map: %+v", errors.WithStack(err)) } - return corev1.EnvVar{Name: tracing.ConfigTracingEnv, Value: configSerialized}, nil + return corev1.EnvVar{Name: test_images.ConfigTracingEnv, Value: configSerialized}, nil } diff --git a/test/lib/send_event.go b/test/lib/send_event.go index 171cfce5693..94466ee1701 100644 --- a/test/lib/send_event.go +++ b/test/lib/send_event.go @@ -18,6 +18,7 @@ package lib import ( "github.com/pkg/errors" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" pkgTest "knative.dev/pkg/test" @@ -32,7 +33,7 @@ func (c *Client) SendEventToAddressable( addressableName string, typemeta *metav1.TypeMeta, event cloudevents.Event, - option ...sender.EventSenderOption, + option ...func(*corev1.Pod), ) { uri, err := c.GetAddressableURI(addressableName, typemeta) if err != nil { @@ -46,7 +47,7 @@ func (c *Client) SendEvent( senderName string, uri string, event cloudevents.Event, - option ...sender.EventSenderOption, + option ...func(*corev1.Pod), ) { namespace := c.Namespace pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...) @@ -58,3 +59,38 @@ func (c *Client) SendEvent( c.T.Fatalf("Failed to send event %v to %s: %+v", event, uri, errors.WithStack(err)) } } + +// SendRequestToAddressable will send the given request to the given Addressable. +func (c *Client) SendRequestToAddressable( + senderName, + addressableName string, + typemeta *metav1.TypeMeta, + headers map[string]string, + body string, + option ...func(*corev1.Pod), +) { + uri, err := c.GetAddressableURI(addressableName, typemeta) + if err != nil { + c.T.Fatalf("Failed to get the URI for %+v-%s", typemeta, addressableName) + } + c.SendRequest(senderName, uri, headers, body, option...) +} + +// SendRequest will create a sender pod, which will send the given request to the given url. +func (c *Client) SendRequest( + senderName string, + uri string, + headers map[string]string, + body string, + option ...func(*corev1.Pod), +) { + namespace := c.Namespace + pod, err := sender.RequestSenderPod("request-sender", senderName, uri, headers, body, option...) + if err != nil { + c.T.Fatalf("Failed to create request-sender pod: %+v", errors.WithStack(err)) + } + c.CreatePodOrFail(pod) + if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil { + c.T.Fatalf("Failed to send request to %s: %+v", uri, errors.WithStack(err)) + } +} diff --git a/test/lib/sender/events.go b/test/lib/sender/events.go index f01b53e709f..d358e2ac78b 100644 --- a/test/lib/sender/events.go +++ b/test/lib/sender/events.go @@ -17,6 +17,7 @@ limitations under the License. package sender import ( + "net/http" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -46,3 +47,15 @@ func NewSenderEvent(id string, source string, event *cloudevents.Event, result * return ev } + +func NewSenderEventFromRaw(id string, source string, response *http.Response) cloudevents.Event { + ev := cloudevents.NewEvent() + ev.SetID(id) + ev.SetSource(source) + ev.SetType(EventType) + ev.SetTime(time.Now()) + + ev.SetExtension(ResponseStatusCodeExtension, response.StatusCode) + + return ev +} diff --git a/test/lib/sender/resources.go b/test/lib/sender/resources.go index a13a993d333..697c805cc00 100644 --- a/test/lib/sender/resources.go +++ b/test/lib/sender/resources.go @@ -26,10 +26,8 @@ import ( pkgTest "knative.dev/pkg/test" ) -type EventSenderOption func(*corev1.Pod) - // EnableTracing enables tracing in sender pod -func EnableTracing() EventSenderOption { +func EnableTracing() func(*corev1.Pod) { return func(pod *corev1.Pod) { pod.Spec.Containers[0].Args = append( pod.Spec.Containers[0].Args, @@ -39,8 +37,8 @@ func EnableTracing() EventSenderOption { } } -// EnableIncrementalId creates a new incremental id for each event sent from the sender pod -func EnableIncrementalId() EventSenderOption { +// EnableIncrementalId creates a new incremental id for each event sent from the sender pod. Supported only by event-sender +func EnableIncrementalId() func(*corev1.Pod) { return func(pod *corev1.Pod) { pod.Spec.Containers[0].Args = append( pod.Spec.Containers[0].Args, @@ -50,8 +48,8 @@ func EnableIncrementalId() EventSenderOption { } } -// WithEncoding forces the encoding of the event to send from the sender pod -func WithEncoding(encoding cloudevents.Encoding) EventSenderOption { +// WithEncoding forces the encoding of the event to send from the sender pod. Supported only by event-sender +func WithEncoding(encoding cloudevents.Encoding) func(*corev1.Pod) { return func(pod *corev1.Pod) { pod.Spec.Containers[0].Args = append( pod.Spec.Containers[0].Args, @@ -62,7 +60,7 @@ func WithEncoding(encoding cloudevents.Encoding) EventSenderOption { } // WithResponseSink sends the response information as CloudEvent to another sink -func WithResponseSink(responseSink string) EventSenderOption { +func WithResponseSink(responseSink string) func(*corev1.Pod) { return func(pod *corev1.Pod) { pod.Spec.Containers[0].Args = append( pod.Spec.Containers[0].Args, @@ -72,24 +70,19 @@ func WithResponseSink(responseSink string) EventSenderOption { } } -// WithEncoding forces the encoding of the event to send from the sender pod -func WithAdditionalHeaders(headers map[string]string) EventSenderOption { - var kv []string - for k, v := range headers { - kv = append(kv, k+"="+v) - } - serializedHeaders := strings.Join(kv, ",") +// WithEncoding forces the encoding of the event to send from the sender pod. Supported only by event-sender +func WithAdditionalHeaders(headers map[string]string) func(*corev1.Pod) { return func(pod *corev1.Pod) { pod.Spec.Containers[0].Args = append( pod.Spec.Containers[0].Args, "-additional-headers", - serializedHeaders, + serializeHeaders(headers), ) } } // EventSenderPod creates a Pod that sends events to the given address. -func EventSenderPod(imageName string, name string, sink string, event cloudevents.Event, options ...EventSenderOption) (*corev1.Pod, error) { +func EventSenderPod(imageName string, name string, sink string, event cloudevents.Event, options ...func(*corev1.Pod)) (*corev1.Pod, error) { encodedEvent, err := json.Marshal(event) if err != nil { return nil, err @@ -124,3 +117,56 @@ func EventSenderPod(imageName string, name string, sink string, event cloudevent return p, nil } + +// WithMethod configures the method used to send the http request. Supported only by request-sender +func WithMethod(method string) func(*corev1.Pod) { + return func(pod *corev1.Pod) { + pod.Spec.Containers[0].Args = append( + pod.Spec.Containers[0].Args, + "-method", + method, + ) + } +} + +// EventSenderPod creates a Pod that sends http requests to the given address. +func RequestSenderPod(imageName string, name string, sink string, headers map[string]string, body string, options ...func(*corev1.Pod)) (*corev1.Pod, error) { + args := []string{ + "-sink", + sink, + "-headers", + serializeHeaders(headers), + "-body", + body, + } + + p := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: imageName, + Image: pkgTest.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + Args: args, + }}, + // Never restart the event sender Pod. + RestartPolicy: corev1.RestartPolicyNever, + }, + } + + for _, opt := range options { + opt(p) + } + + return p, nil +} + +func serializeHeaders(headers map[string]string) string { + var kv []string + for k, v := range headers { + kv = append(kv, k+"="+v) + } + return strings.Join(kv, ",") +} diff --git a/test/test_images/event-sender/main.go b/test/test_images/event-sender/main.go index 99a0da982ea..3c3a1cec2fc 100644 --- a/test/test_images/event-sender/main.go +++ b/test/test_images/event-sender/main.go @@ -23,8 +23,6 @@ import ( "fmt" "log" nethttp "net/http" - "strconv" - "strings" "time" cloudevents "github.com/cloudevents/sdk-go/v2" @@ -34,7 +32,7 @@ import ( "knative.dev/pkg/tracing/propagation/tracecontextb3" "knative.dev/eventing/test/lib/sender" - "knative.dev/eventing/test/lib/tracing" + "knative.dev/eventing/test/test_images" ) var ( @@ -44,7 +42,7 @@ var ( eventEncoding string periodStr string delayStr string - maxMsgStr string + maxMsg int addTracing bool addSequence bool incrementalId bool @@ -58,32 +56,17 @@ func init() { flag.StringVar(&eventEncoding, "event-encoding", "binary", "The encoding of the cloud event: [binary, structured].") flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") flag.StringVar(&delayStr, "delay", "5", "The number of seconds to wait before sending messages.") - flag.StringVar(&maxMsgStr, "max-messages", "1", "The number of messages to attempt to send. 0 for unlimited.") + flag.IntVar(&maxMsg, "max-messages", 1, "The number of messages to attempt to send. 0 for unlimited.") flag.BoolVar(&addTracing, "add-tracing", false, "Should tracing be added to events sent.") flag.BoolVar(&addSequence, "add-sequence-extension", false, "Should add extension 'sequence' identifying the sequence number.") flag.BoolVar(&incrementalId, "incremental-id", false, "Override the event id with an incremental id.") flag.StringVar(&additionalHeaders, "additional-headers", "", "Additional non-CloudEvents headers to send") } -func parseDurationStr(durationStr string, defaultDuration int) time.Duration { - var duration time.Duration - if d, err := strconv.Atoi(durationStr); err != nil { - duration = time.Duration(defaultDuration) * time.Second - } else { - duration = time.Duration(d) * time.Second - } - return duration -} - func main() { flag.Parse() - period := parseDurationStr(periodStr, 5) - delay := parseDurationStr(delayStr, 5) - - maxMsg := 1 - if m, err := strconv.Atoi(maxMsgStr); err == nil { - maxMsg = m - } + period := test_images.ParseDurationStr(periodStr, 5) + delay := test_images.ParseDurationStr(delayStr, 5) if delay > 0 { log.Printf("will sleep for %s", delay) @@ -115,9 +98,8 @@ func main() { ) } if additionalHeaders != "" { - for _, kv := range strings.Split(additionalHeaders, ",") { - splitted := strings.Split(kv, "=") - httpOpts = append(httpOpts, cloudevents.WithHeader(splitted[0], splitted[1])) + for k, v := range test_images.ParseHeaders(additionalHeaders) { + httpOpts = append(httpOpts, cloudevents.WithHeader(k, v[0])) } } @@ -130,7 +112,7 @@ func main() { if addTracing { log.Println("Adding tracing") logger, _ := zap.NewDevelopment() - if err := tracing.ConfigureTracing(logger.Sugar(), ""); err != nil { + if err := test_images.ConfigureTracing(logger.Sugar(), ""); err != nil { log.Fatalf("Unable to setup trace publishing: %v", err) } diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 7aca97ac9af..b0bb15917a7 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -32,7 +32,7 @@ import ( "knative.dev/eventing/pkg/kncloudevents" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/recordevents" - "knative.dev/eventing/test/lib/tracing" + "knative.dev/eventing/test/test_images" ) type eventRecorder struct { @@ -150,7 +150,7 @@ func main() { er.StartServer(recordevents.RecordEventsPort) logger, _ := zap.NewDevelopment() - if err := tracing.ConfigureTracing(logger.Sugar(), ""); err != nil { + if err := test_images.ConfigureTracing(logger.Sugar(), ""); err != nil { log.Fatalf("Unable to setup trace publishing: %v", err) } diff --git a/test/test_images/request-sender/main.go b/test/test_images/request-sender/main.go new file mode 100644 index 00000000000..ee99bb7b18d --- /dev/null +++ b/test/test_images/request-sender/main.go @@ -0,0 +1,136 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "context" + "flag" + "io" + "log" + nethttp "net/http" + "strings" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "go.opencensus.io/plugin/ochttp" + "knative.dev/pkg/tracing/propagation/tracecontextb3" + + "knative.dev/eventing/test/lib/sender" + "knative.dev/eventing/test/test_images" +) + +var ( + sink string + method string + responseSink string + inputHeaders string + inputBody string + periodStr string + delayStr string + maxMsg int + addTracing bool +) + +func init() { + flag.StringVar(&sink, "sink", "", "The sink url for the message destination.") + flag.StringVar(&method, "method", "POST", "HTTP Method to send.") + flag.StringVar(&responseSink, "response-sink", "", "The response sink url to send the response.") + flag.StringVar(&inputHeaders, "headers", "", "HTTP Headers to send.") + flag.StringVar(&inputBody, "body", "", "HTTP body to send.") + flag.StringVar(&periodStr, "period", "5", "The number of seconds between messages.") + flag.StringVar(&delayStr, "delay", "5", "The number of seconds to wait before sending messages.") + flag.IntVar(&maxMsg, "max-messages", 1, "The number of messages to attempt to send. 0 for unlimited.") + flag.BoolVar(&addTracing, "add-tracing", false, "Should tracing be added to events sent.") +} + +func main() { + flag.Parse() + period := test_images.ParseDurationStr(periodStr, 5) + delay := test_images.ParseDurationStr(delayStr, 5) + + if delay > 0 { + log.Printf("will sleep for %s", delay) + time.Sleep(delay) + log.Printf("awake, continuing") + } + + // I need the httpClient to report to responseSink + ceClient, err := cloudevents.NewDefaultClient() + if err != nil { + log.Fatalf("failed to create httpClient, %v", err) + } + + httpClient := &nethttp.Client{ + Transport: &ochttp.Transport{ + Base: nethttp.DefaultTransport, + Propagation: tracecontextb3.TraceContextEgress, + }, + } + + sequence := 0 + + ticker := time.NewTicker(period) + for { + sequence++ + + var body io.Reader + if inputBody != "" { + body = strings.NewReader(inputBody) + log.Printf("Using body: '%s'", inputBody) + } + + request, err := nethttp.NewRequest(method, sink, body) + if err != nil { + log.Fatalf("Cannot create request: %s", err.Error()) + } + + if inputHeaders != "" { + for k, v := range test_images.ParseHeaders(inputHeaders) { + request.Header.Set(k, v[0]) + log.Printf("Using header %s: %s", k, v[0]) + } + } + + response, err := httpClient.Do(request) + if err != nil { + log.Fatalf("Error while executing HTTP request: %v", err.Error()) + } + + responseEvent := sender.NewSenderEventFromRaw( + uuid.New().String(), + "https://knative.dev/eventing/test/event-sender", + response, + ) + + if responseSink != "" { + res := ceClient.Send(cloudevents.ContextWithTarget(context.Background(), responseSink), responseEvent) + if cloudevents.IsUndelivered(res) { + log.Printf("send to response sink returned an error: %v\n", res) + } else { + log.Printf("Got response from %s\n%s\n", responseSink, res) + } + } + + // Wait for next tick + <-ticker.C + // Only send a limited number of messages. + if maxMsg != 0 && maxMsg == sequence { + return + } + } +} diff --git a/test/test_images/request-sender/pod.yaml b/test/test_images/request-sender/pod.yaml new file mode 100644 index 00000000000..20a694ca03e --- /dev/null +++ b/test/test_images/request-sender/pod.yaml @@ -0,0 +1,9 @@ +apiVersion: v1 +kind: Pod +metadata: + name: request-sender +spec: + containers: + - name: request-sender + image: ko://knative.dev/eventing/test/test_images/request-sender + diff --git a/test/test_images/transformevents/main.go b/test/test_images/transformevents/main.go index b359acae13a..651787eea54 100644 --- a/test/test_images/transformevents/main.go +++ b/test/test_images/transformevents/main.go @@ -22,7 +22,7 @@ import ( "log" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/test/lib/tracing" + "knative.dev/eventing/test/test_images" cloudevents "github.com/cloudevents/sdk-go/v2" "go.uber.org/zap" @@ -69,7 +69,7 @@ func main() { flag.Parse() logger, _ := zap.NewDevelopment() - if err := tracing.ConfigureTracing(logger.Sugar(), ""); err != nil { + if err := test_images.ConfigureTracing(logger.Sugar(), ""); err != nil { log.Fatalf("Unable to setup trace publishing: %v", err) } diff --git a/test/lib/tracing/config.go b/test/test_images/utils.go similarity index 61% rename from test/lib/tracing/config.go rename to test/test_images/utils.go index 6a762994edf..fbfafee7f53 100644 --- a/test/lib/tracing/config.go +++ b/test/test_images/utils.go @@ -14,17 +14,40 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tracing +package test_images import ( + "net/http" "os" + "strconv" + "strings" + "time" "go.uber.org/zap" - tracingconfig "knative.dev/pkg/tracing/config" + "knative.dev/pkg/tracing/config" "knative.dev/eventing/pkg/tracing" ) +func ParseHeaders(serializedHeaders string) http.Header { + h := make(http.Header) + for _, kv := range strings.Split(serializedHeaders, ",") { + splitted := strings.Split(kv, "=") + h.Set(splitted[0], splitted[1]) + } + return h +} + +func ParseDurationStr(durationStr string, defaultDuration int) time.Duration { + var duration time.Duration + if d, err := strconv.Atoi(durationStr); err != nil { + duration = time.Duration(defaultDuration) * time.Second + } else { + duration = time.Duration(d) * time.Second + } + return duration +} + const ConfigTracingEnv = "K_CONFIG_TRACING" // ConfigureTracing can be used in test-images to configure tracing @@ -35,7 +58,7 @@ func ConfigureTracing(logger *zap.SugaredLogger, serviceName string) error { return tracing.SetupStaticPublishing(logger, serviceName, tracing.AlwaysSample) } - conf, err := tracingconfig.JsonToTracingConfig(tracingEnv) + conf, err := config.JsonToTracingConfig(tracingEnv) if err != nil { return err }