From 26ce58aa728cd8558bf149c6b1267e847b1039e9 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 5 Jun 2020 10:40:59 -0400 Subject: [PATCH 1/2] Port Sequence Test to new test images --- test/e2e/helpers/sequence_test_helper.go | 51 ++++++++++++------------ 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/test/e2e/helpers/sequence_test_helper.go b/test/e2e/helpers/sequence_test_helper.go index 5ec60acb042..ac6cb8431d0 100644 --- a/test/e2e/helpers/sequence_test_helper.go +++ b/test/e2e/helpers/sequence_test_helper.go @@ -16,18 +16,17 @@ limitations under the License. package helpers import ( - "encoding/json" "fmt" "testing" + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/uuid" "knative.dev/eventing/pkg/apis/flows/v1beta1" messagingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" eventingtesting "knative.dev/eventing/pkg/reconciler/testing/v1beta1" "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/cloudevents" "knative.dev/eventing/test/lib/resources" duckv1 "knative.dev/pkg/apis/duck/v1" ) @@ -39,9 +38,9 @@ func SequenceTestHelper(t *testing.T, sequenceName = "e2e-sequence" senderPodName = "e2e-sequence-sender-pod" - channelName = "e2e-sequence-channel" - subscriptionName = "e2e-sequence-subscription" - loggerPodName = "e2e-sequence-logger-pod" + channelName = "e2e-sequence-channel" + subscriptionName = "e2e-sequence-subscription" + recordEventsPodName = "e2e-sequence-recordevents-pod" ) stepSubscriberConfigs := []struct { @@ -90,15 +89,20 @@ func SequenceTestHelper(t *testing.T, // must be Addressable. In the future if we use Knative Serving in the tests, we can // make the logger service as a Knative service, and remove the channel and subscription. client.CreateChannelOrFail(channelName, &channel) - // create logger service as the subscriber - loggerPod := resources.EventLoggerPod(loggerPodName) - client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName)) + // create event logger pod and service as the subscriber + recordEventsPod := resources.EventRecordPod(recordEventsPodName) + client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName)) + eventTracker, err := client.NewEventInfoStore(recordEventsPodName, t.Logf) + if err != nil { + t.Fatalf("Pod tracker failed: %v", err) + } + defer eventTracker.Cleanup() // create subscription to subscribe the channel, and forward the received events to the logger service client.CreateSubscriptionOrFail( subscriptionName, channelName, &channel, - resources.WithSubscriberForSubscription(loggerPodName), + resources.WithSubscriberForSubscription(recordEventsPodName), ) replyRef := &duckv1.KReference{Kind: channel.Kind, APIVersion: channel.APIVersion, Name: channelName, Namespace: client.Namespace} @@ -117,19 +121,18 @@ func SequenceTestHelper(t *testing.T, // wait for all test resources to be ready, so that we can start sending events client.WaitForAllTestResourcesReadyOrFail() - // send fake CloudEvent to the Sequence - msg := fmt.Sprintf("TestSequence %s", uuid.NewUUID()) - // NOTE: the eventData format must be BaseData, as it needs to be correctly parsed in the stepper service. - eventData := cloudevents.BaseData{Message: msg} - eventDataBytes, err := json.Marshal(eventData) - if err != nil { - st.Fatalf("Failed to convert %v to json: %v", eventData, err) + // send CloudEvent to the Sequence + event := cloudevents.NewEvent() + event.SetID("dummy") + eventSource := fmt.Sprintf("http://%s.svc/", senderPodName) + event.SetSource(eventSource) + event.SetType(lib.DefaultEventType) + msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) + body := fmt.Sprintf(`{"msg":"%s"}`, msg) + if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - event := cloudevents.New( - string(eventDataBytes), - cloudevents.WithSource(senderPodName), - ) - client.SendFakeEventToAddressableOrFail( + client.SendEventToAddressable( senderPodName, sequenceName, lib.FlowsSequenceTypeMeta, @@ -140,8 +143,6 @@ func SequenceTestHelper(t *testing.T, for _, config := range stepSubscriberConfigs { expectedMsg += config.msgAppender } - if err := client.CheckLog(loggerPodName, lib.CheckerContains(expectedMsg)); err != nil { - st.Fatalf("String %q not found in logs of logger pod %q: %v", expectedMsg, loggerPodName, err) - } + eventTracker.AssertWaitMatchSourceData(t, recordEventsPodName, eventSource, expectedMsg, 1, 1) }) } From dfb4114776ff6f7fef8f741c33046e864ea80a83 Mon Sep 17 00:00:00 2001 From: Nicolas Lopez Date: Fri, 5 Jun 2020 13:03:28 -0400 Subject: [PATCH 2/2] fixes --- test/test_images/sequencestepper/main.go | 53 +++++++++++++----------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/test/test_images/sequencestepper/main.go b/test/test_images/sequencestepper/main.go index 89118b97ae7..5fc4417a5c1 100644 --- a/test/test_images/sequencestepper/main.go +++ b/test/test_images/sequencestepper/main.go @@ -18,12 +18,11 @@ package main import ( "context" + "encoding/json" "flag" "log" - ce "github.com/cloudevents/sdk-go" - - "knative.dev/eventing/test/lib/cloudevents" + cloudevents "github.com/cloudevents/sdk-go/v2" ) var ( @@ -34,43 +33,49 @@ func init() { flag.StringVar(&eventMsgAppender, "msg-appender", "", "a string we want to append on the event message") } -func gotEvent(event ce.Event, resp *ce.EventResponse) error { - ctx := event.Context.AsV1() - - data := &cloudevents.BaseData{} - if err := event.DataAs(data); err != nil { - log.Printf("Got Data Error: %s\n", err.Error()) - return err - } - +func gotEvent(event cloudevents.Event) (*cloudevents.Event, error) { log.Println("Received a new event: ") - log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), data) + log.Printf("[%s] %s %s: %s", event.Time().String(), event.Source(), event.Type(), string(event.Data())) + + outputEvent := event.Clone() // append eventMsgAppender to message of the data - data.Message = data.Message + eventMsgAppender - r := ce.Event{ - Context: ctx, - Data: data, + var data map[string]interface{} + if err := json.Unmarshal(event.Data(), &data); err != nil { + return nil, err + } + data["msg"] = data["msg"].(string) + eventMsgAppender + if eventData, err := json.Marshal(&data); err != nil { + return nil, err + } else if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(eventData)); err != nil { + return nil, err } - - r.SetDataContentType(ce.ApplicationJSON) log.Println("Transform the event to: ") - log.Printf("[%s] %s %s: %+v", ctx.Time.String(), ctx.GetSource(), ctx.GetType(), data) + log.Printf("[%s] %s %s: %s", outputEvent.Time().String(), outputEvent.Source(), outputEvent.Type(), string(outputEvent.Data())) - resp.RespondWith(200, &r) - return nil + return &outputEvent, nil } func main() { // parse the command line flags flag.Parse() - c, err := ce.NewDefaultClient() + t, err := cloudevents.NewHTTP(cloudevents.WithPort(8080)) + if err != nil { + log.Fatalf("failed to create transport, %v", err) + } + + c, err := cloudevents.NewClient(t, + cloudevents.WithTimeNow(), + cloudevents.WithUUIDs(), + ) if err != nil { log.Fatalf("failed to create client, %v", err) } log.Printf("listening on 8080") - log.Fatalf("failed to start receiver: %s", c.StartReceiver(context.Background(), gotEvent)) + if err := c.StartReceiver(context.Background(), gotEvent); err != nil { + log.Fatalf("failed to start receiver: %s", err) + } }