diff --git a/go.mod b/go.mod index 6d73ed8b61d..069d1f3a770 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 - github.com/prometheus/common v0.9.1 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/test/conformance/helpers/broker_data_plane_test_helper.go b/test/conformance/helpers/broker_data_plane_test_helper.go index 9facba44571..5666a45fdda 100644 --- a/test/conformance/helpers/broker_data_plane_test_helper.go +++ b/test/conformance/helpers/broker_data_plane_test_helper.go @@ -276,14 +276,16 @@ func BrokerV1Beta1ConsumerDataPlaneTestHelper( } transformMsg := []byte(`{"msg":"Transformed!"}`) - transformPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + ctx, + client, transformerName, - "reply-check-type", - "reply-check-source", - transformMsg, + recordevents.ReplyWithTransformedEvent( + "reply-check-type", + "reply-check-source", + string(transformMsg), + ), ) - client.CreatePodOrFail(transformPod, testlib.WithService(transformerName)) - client.WaitForAllTestResourcesReadyOrFail(ctx) trigger := client.CreateTriggerOrFailV1Beta1( triggerName, diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go index f13ecc9df07..1de5a4c1643 100644 --- a/test/conformance/helpers/broker_tracing_test_helper.go +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -92,15 +92,18 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes resources.WithSubscriberServiceRefForTriggerV1Beta1(loggerPodName), ) - // Create a transformer (EventTransfrmer) Pod that replies with the same event as the input, + // Create a transformer Pod (recordevents with transform reply) that replies with the same event as the input, // except the reply's event's type is changed to bar. - eventTransformerPod := resources.EventTransformationPod( + eventTransformerPod := recordevents.DeployEventRecordOrFail( + ctx, + client, "transformer", - etLogger, - senderName, - []byte(eventBody), + recordevents.ReplyWithTransformedEvent( + etLogger, + senderName, + eventBody, + ), ) - client.CreatePodOrFail(eventTransformerPod, testlib.WithService(eventTransformerPod.Name)) // Create a Trigger that receives events (type=foo) and sends them to the transformer Pod. transformerTrigger := client.CreateTriggerOrFailV1Beta1( diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 35bf6ab584d..c5a7a3df1a8 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -73,13 +73,16 @@ func setupChannelTracingWithReply( recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName) // Create the subscriber, a Pod that mutates the event. - transformerPod := resources.EventTransformationPod( + transformerPod := recordevents.DeployEventRecordOrFail( + ctx, + client, "transformer", - "mutated", - eventSource, - nil, + recordevents.ReplyWithTransformedEvent( + "mutated", + eventSource, + "", + ), ) - client.CreatePodOrFail(transformerPod, testlib.WithService(transformerPod.Name)) // Create the Subscription linking the Channel to the mutator. client.CreateSubscriptionOrFail( diff --git a/test/e2e/helpers/broker_channel_flow_helper.go b/test/e2e/helpers/broker_channel_flow_helper.go index 0e3b3adb6a8..16fc73b9481 100644 --- a/test/e2e/helpers/broker_channel_flow_helper.go +++ b/test/e2e/helpers/broker_channel_flow_helper.go @@ -107,13 +107,16 @@ func BrokerChannelFlowWithTransformation( } // create the transformation service for trigger1 - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + ctx, + client, transformationPodName, - transformedEventType, - transformedEventSource, - []byte(transformedBody), + recordevents.ReplyWithTransformedEvent( + transformedEventType, + transformedEventSource, + transformedBody, + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create trigger1 to receive the original event, and do event transformation if triggerVersion == "v1" { diff --git a/test/e2e/helpers/broker_event_transformation_test_helper.go b/test/e2e/helpers/broker_event_transformation_test_helper.go index f20d3a011cb..be549ba8632 100644 --- a/test/e2e/helpers/broker_event_transformation_test_helper.go +++ b/test/e2e/helpers/broker_event_transformation_test_helper.go @@ -73,13 +73,16 @@ func EventTransformationForTriggerTestHelper( client.WaitForResourceReadyOrFail(brokerName, testlib.BrokerTypeMeta) // create the transformation service - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + ctx, + client, transformationPodName, - transformedEventType, - transformedEventSource, - []byte(transformedBody), + recordevents.ReplyWithTransformedEvent( + transformedEventType, + transformedEventSource, + transformedBody, + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create trigger1 for event transformation if triggerVersion == "v1" { diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index 87de6c19848..c596974eff4 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -65,13 +65,16 @@ func EventTransformationForSubscriptionTestHelper( if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil { t.Fatalf("Cannot set the payload of the event: %s", err.Error()) } - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + ctx, + client, transformationPodName, - eventAfterTransformation.Type(), - eventAfterTransformation.Source(), - eventAfterTransformation.Data(), + recordevents.ReplyWithTransformedEvent( + eventAfterTransformation.Type(), + eventAfterTransformation.Source(), + string(eventAfterTransformation.Data()), + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create event logger pod and service as the subscriber eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName) diff --git a/test/e2e/helpers/parallel_test_helper.go b/test/e2e/helpers/parallel_test_helper.go index 4c4c4416cb8..2ccd4d5769d 100644 --- a/test/e2e/helpers/parallel_test_helper.go +++ b/test/e2e/helpers/parallel_test_helper.go @@ -74,13 +74,20 @@ func ParallelTestHelper( for branchNumber, cse := range tc.branchesConfig { // construct filter services filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber) - filterPod := resources.EventFilteringPod(filterPodName, cse.filter) - client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName)) + if cse.filter { + recordevents.DeployEventRecordOrFail(ctx, client, filterPodName) + } else { + recordevents.DeployEventRecordOrFail(ctx, client, filterPodName, recordevents.EchoEvent) + } // construct branch subscriber subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber) - subPod := resources.SequenceStepperPod(subPodName, subPodName) - client.CreatePodOrFail(subPod, testlib.WithService(subPodName)) + recordevents.DeployEventRecordOrFail( + ctx, + client, + subPodName, + recordevents.ReplyWithAppendedData(subPodName), + ) parallelBranches[branchNumber] = v1beta1.ParallelBranch{ Filter: &duckv1.Destination{ @@ -183,13 +190,20 @@ func ParallelV1TestHelper( for branchNumber, cse := range tc.branchesConfig { // construct filter services filterPodName := fmt.Sprintf("parallel-%s-branch-%d-filter", tc.name, branchNumber) - filterPod := resources.EventFilteringPod(filterPodName, cse.filter) - client.CreatePodOrFail(filterPod, testlib.WithService(filterPodName)) + if cse.filter { + recordevents.DeployEventRecordOrFail(ctx, client, filterPodName) + } else { + recordevents.DeployEventRecordOrFail(ctx, client, filterPodName, recordevents.EchoEvent) + } // construct branch subscriber subPodName := fmt.Sprintf("parallel-%s-branch-%d-sub", tc.name, branchNumber) - subPod := resources.SequenceStepperPod(subPodName, subPodName) - client.CreatePodOrFail(subPod, testlib.WithService(subPodName)) + recordevents.DeployEventRecordOrFail( + ctx, + client, + subPodName, + recordevents.ReplyWithAppendedData(subPodName), + ) parallelBranches[branchNumber] = flowsv1.ParallelBranch{ Filter: &duckv1.Destination{ diff --git a/test/e2e/helpers/sequence_test_helper.go b/test/e2e/helpers/sequence_test_helper.go index 807384498ee..5f0c44dc0fb 100644 --- a/test/e2e/helpers/sequence_test_helper.go +++ b/test/e2e/helpers/sequence_test_helper.go @@ -76,9 +76,11 @@ func SequenceTestHelper( // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + ctx, client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := v1beta1.SequenceStep{ Destination: duckv1.Destination{ @@ -131,8 +133,7 @@ func SequenceTestHelper( event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -149,7 +150,8 @@ func SequenceTestHelper( } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } @@ -192,9 +194,11 @@ func SequenceV1TestHelper( // create a stepper Pod with Service podName := config.podName msgAppender := config.msgAppender - stepperPod := resources.SequenceStepperPod(podName, msgAppender) + recordevents.DeployEventRecordOrFail( + ctx, client, podName, + recordevents.ReplyWithAppendedData(msgAppender), + ) - client.CreatePodOrFail(stepperPod, testlib.WithService(podName)) // create a new step step := flowsv1.SequenceStep{ Destination: duckv1.Destination{ @@ -247,8 +251,7 @@ func SequenceV1TestHelper( event.SetSource(eventSource) event.SetType(testlib.DefaultEventType) msg := fmt.Sprintf("TestSequence %s", uuid.New().String()) - body := fmt.Sprintf(`{"msg":"%s"}`, msg) - if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil { + if err := event.SetData(cloudevents.TextPlain, msg); err != nil { st.Fatalf("Cannot set the payload of the event: %s", err.Error()) } client.SendEventToAddressable( @@ -265,7 +268,8 @@ func SequenceV1TestHelper( } eventTracker.AssertAtLeast(1, recordevents.MatchEvent( cetest.HasSource(eventSource), - cetest.DataContains(expectedMsg), + cetest.HasDataContentType(cloudevents.TextPlain), + cetest.HasData([]byte(expectedMsg)), )) }) } diff --git a/test/lib/recordevents/event_info_store.go b/test/lib/recordevents/event_info_store.go index ba80a19c255..36d7d11af23 100644 --- a/test/lib/recordevents/event_info_store.go +++ b/test/lib/recordevents/event_info_store.go @@ -28,12 +28,9 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/util/wait" - pkgTest "knative.dev/pkg/test" testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/resources" ) const ( @@ -42,34 +39,6 @@ const ( retryTimeout = 4 * time.Minute ) -type EventRecordOption = func(*corev1.Pod, *testlib.Client) error - -func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { - client.CreateServiceAccountOrFail(name) - client.CreateRoleOrFail(resources.Role(name, - resources.WithRuleForRole(&rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"pods"}, - Verbs: []string{"get"}, - }), - resources.WithRuleForRole(&rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"events"}, - Verbs: []string{rbacv1.VerbAll}, - }), - )) - client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) - - eventRecordPod := EventRecordPod(name, name) - client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) - err := pkgTest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace) - if err != nil { - client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) - } - client.WaitForServiceEndpointsOrFail(ctx, name, 1) - return eventRecordPod -} - // Deploys a new recordevents pod and start the associated EventInfoStore func StartEventRecordOrFail(ctx context.Context, client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) { eventRecordPod := DeployEventRecordOrFail(ctx, client, podName, options...) diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index e9bd4224aac..b57dac9c718 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -25,7 +25,6 @@ import ( cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" - "github.com/prometheus/common/log" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -33,35 +32,61 @@ import ( // Observer is the entry point for sinking events into the event log. type Observer struct { + // Name is the name of this Observer, used to filter if multiple observers. Name string // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - seq uint64 -} - -// New returns an observer that will vent observations to the list of provided -// EventLog instances. It will listen on :8080. -func New(name string, eventLogs ...recordevents.EventLog) *Observer { - return &Observer{ - Name: name, - EventLogs: eventLogs, - } + ctx context.Context + seq uint64 + replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) } type envConfig struct { // ObserverName is used to identify this instance of the observer. ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"` + + // Reply is used to define if the observer should reply back + Reply bool `envconfig:"REPLY" default:"false" required:"false"` + + // The event type to use in the reply, if enabled + ReplyEventType string `envconfig:"REPLY_EVENT_TYPE" default:"" required:"false"` + + // The event source to use in the reply, if enabled + ReplyEventSource string `envconfig:"REPLY_EVENT_SOURCE" default:"" required:"false"` + + // The event data to use in the reply, if enabled + ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"` + + // This string to append in the data field in the reply, if enabled. + // This will threat the data as text/plain field + ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` } -func NewFromEnv(eventLogs ...recordevents.EventLog) *Observer { +func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { var env envConfig if err := envconfig.Process("", &env); err != nil { - log.Fatal("Failed to process env var", err) + logging.FromContext(ctx).Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Observer environment configuration: %+v", env) + + var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + if env.Reply { + logging.FromContext(ctx).Info("Observer will reply with an event") + replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData, env.ReplyAppendData) + } else { + logging.FromContext(ctx).Info("Observer won't reply with an event") + replyFunc = NoOpReply } - return New(env.ObserverName, eventLogs...) + return &Observer{ + Name: env.ObserverName, + EventLogs: eventLogs, + ctx: ctx, + replyFunc: replyFunc, + } } // Start will create the CloudEvents client and start listening for inbound @@ -99,7 +124,7 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) if eventErr != nil { eventErrStr = eventErr.Error() } - err := o.EventLogs.Vent(recordevents.EventInfo{ + eventInfo := recordevents.EventInfo{ Error: eventErrStr, Event: event, HTTPHeaders: header, @@ -107,10 +132,11 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Observer: o.Name, Time: time.Now(), Sequence: atomic.AddUint64(&o.seq, 1), - }) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { - log.Warn("Error while venting the recorded event", err) + logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err) } - writer.WriteHeader(http.StatusAccepted) + o.replyFunc(o.ctx, writer, eventInfo) } diff --git a/test/lib/recordevents/observer/reply.go b/test/lib/recordevents/observer/reply.go new file mode 100644 index 00000000000..f60945d1b00 --- /dev/null +++ b/test/lib/recordevents/observer/reply.go @@ -0,0 +1,90 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package observer + +import ( + "context" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "go.uber.org/zap" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.EventInfo) { + writer.WriteHeader(http.StatusAccepted) +} + +func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string, replyAppendData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) { + return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) { + if info.Error != "" { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte(info.Error)) + logging.FromContext(ctx).Warn("Conversion error in the event to send back", info.Error) + return + } + + if info.Event == nil { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte("No event!")) + logging.FromContext(ctx).Warn("No event to send back") + return + } + + outputEvent := info.Event.Clone() + + if replyEventSource != "" { + logging.FromContext(ctx).Infof("Setting reply event source '%s'", replyEventSource) + outputEvent.SetSource(replyEventSource) + } + if replyEventType != "" { + logging.FromContext(ctx).Infof("Setting reply event type '%s'", replyEventType) + outputEvent.SetType(replyEventType) + } + if replyEventData != "" { + logging.FromContext(ctx).Infof("Setting reply event data '%s'", replyAppendData) + if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + if replyAppendData != "" { + var d string + if outputEvent.Data() == nil { + d = replyAppendData + } else { + if err := info.Event.DataAs(&d); err != nil { + logging.FromContext(ctx).Warn("Cannot read the event data as text/plain") + } + d = d + replyAppendData + } + logging.FromContext(ctx).Infof("Setting appended event data '%s'", d) + if err := outputEvent.SetData(cloudevents.TextPlain, d); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + + logging.FromContext(ctx).Infow("Replying with", zap.Stringer("event", outputEvent)) + err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer) + if err != nil { + logging.FromContext(ctx).Warn("Error while writing the event as response", err) + } + } +} diff --git a/test/lib/recordevents/recorder_vent/constructor.go b/test/lib/recordevents/recorder_vent/constructor.go index cc73a9671f0..c4743c67ae8 100644 --- a/test/lib/recordevents/recorder_vent/constructor.go +++ b/test/lib/recordevents/recorder_vent/constructor.go @@ -50,7 +50,7 @@ func NewFromEnv(ctx context.Context) recordevents.EventLog { log.Fatal("Failed to process env var", err) } - logging.FromContext(ctx).Infof("Environment configuration: %+v", env) + logging.FromContext(ctx).Infof("Recorder vent environment configuration: %+v", env) return NewEventLog(ctx, env.AgentName, env.PodName) } diff --git a/test/lib/recordevents/resources.go b/test/lib/recordevents/resources.go index 54726bedfdc..ee8ab4ca781 100644 --- a/test/lib/recordevents/resources.go +++ b/test/lib/recordevents/resources.go @@ -17,14 +17,109 @@ limitations under the License. package recordevents import ( + "context" + + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - "knative.dev/pkg/test" + pkgtest "knative.dev/pkg/test" + + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/resources" ) -// EventRecordPod creates a Pod that stores received events for test retrieval. -func EventRecordPod(name string, serviceAccountName string) *corev1.Pod { +type EventRecordOption = func(*corev1.Pod, *testlib.Client) error + +// EchoEvent is an option to let the recordevents reply with the received event +func EchoEvent(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + return nil +} + +var _ EventRecordOption = EchoEvent + +// ReplyWithTransformedEvent is an option to let the recordevents reply with the transformed event +func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, replyEventData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if replyEventType != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_TYPE", Value: replyEventType}, + ) + } + if replyEventSource != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_SOURCE", Value: replyEventSource}, + ) + } + if replyEventData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_DATA", Value: replyEventData}, + ) + } + + return nil + } +} + +// ReplyWithAppendedData is an option to let the recordevents reply with the transformed event with appended data +func ReplyWithAppendedData(appendData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if appendData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_APPEND_DATA", Value: appendData}, + ) + } + + return nil + } +} + +// DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image +func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { + client.CreateServiceAccountOrFail(name) + client.CreateRoleOrFail(resources.Role(name, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get"}, + }), + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{rbacv1.VerbAll}, + }), + )) + client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) + + eventRecordPod := eventRecordPod(name, name) + client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) + err := pkgtest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace) + if err != nil { + client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) + } + client.WaitForServiceEndpointsOrFail(ctx, name, 1) + return eventRecordPod +} + +// eventRecordPod creates a Pod that stores received events for test retrieval. +func eventRecordPod(name string, serviceAccountName string) *corev1.Pod { return recordEventsPod("recordevents", name, serviceAccountName) } @@ -37,7 +132,7 @@ func recordEventsPod(imageName string, name string, serviceAccountName string) * Spec: corev1.PodSpec{ Containers: []corev1.Container{{ Name: imageName, - Image: test.ImagePath(imageName), + Image: pkgtest.ImagePath(imageName), ImagePullPolicy: corev1.PullAlways, Env: []corev1.EnvVar{{ Name: "SYSTEM_NAMESPACE", diff --git a/test/lib/resources/kube.go b/test/lib/resources/kube.go index bc415d10873..d9dc37b90b8 100644 --- a/test/lib/resources/kube.go +++ b/test/lib/resources/kube.go @@ -27,7 +27,6 @@ import ( rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/uuid" pkgTest "knative.dev/pkg/test" ) @@ -37,33 +36,6 @@ type PodOption func(*corev1.Pod) // Option enables further configuration of a Role. type RoleOption func(*rbacv1.Role) -// EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event -func EventTransformationPod(name string, newEventType string, newEventSource string, newEventData []byte) *corev1.Pod { - const imageName = "transformevents" - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "-event-type", - newEventType, - "-event-source", - newEventSource, - "-event-data", - string(newEventData), - }, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - // HelloWorldPod creates a Pod that logs "Hello, World!". func HelloWorldPod(name string, options ...PodOption) *corev1.Pod { const imageName = "print" @@ -93,54 +65,6 @@ func WithLabelsForPod(labels map[string]string) PodOption { } } -// SequenceStepperPod creates a Pod that can be used as a step in testing Sequence. -// Note event data used in the test must be BaseData, and this Pod as a Subscriber will receive the event, -// and return a new event with eventMsgAppender added to data.Message. -func SequenceStepperPod(name, eventMsgAppender string) *corev1.Pod { - const imageName = "sequencestepper" - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - Args: []string{ - "-msg-appender", - eventMsgAppender, - }, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - -// EventFilteringPod creates a Pod that either filter or send the received CloudEvent -func EventFilteringPod(name string, filter bool) *corev1.Pod { - const imageName = "filterevents" - pod := &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } - if filter { - pod.Spec.Containers[0].Args = []string{"-filter"} - } - return pod -} - const ( PerfConsumerService = "perf-consumer" PerfAggregatorService = "perf-aggregator" diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 51fde078e9b..27a4b1c6862 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -45,7 +45,7 @@ func main() { logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) } - obs := observer.NewFromEnv( + obs := observer.NewFromEnv(ctx, recorder_vent.NewFromEnv(ctx), ) diff --git a/vendor/modules.txt b/vendor/modules.txt index e5574149f3e..4d11980730a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -254,7 +254,6 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.9.1 -## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log