diff --git a/go.mod b/go.mod index 2be3e6c075a..122ac5ad2b6 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,6 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 - github.com/prometheus/common v0.9.1 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/test/e2e/helpers/channel_event_tranformation_test_helper.go b/test/e2e/helpers/channel_event_tranformation_test_helper.go index 80d1b550fdf..b32d076d9a0 100644 --- a/test/e2e/helpers/channel_event_tranformation_test_helper.go +++ b/test/e2e/helpers/channel_event_tranformation_test_helper.go @@ -65,13 +65,16 @@ func EventTransformationForSubscriptionTestHelper( if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil { t.Fatal("Cannot set the payload of the event:", err.Error()) } - transformationPod := resources.EventTransformationPod( + recordevents.DeployEventRecordOrFail( + ctx, + client, transformationPodName, - eventAfterTransformation.Type(), - eventAfterTransformation.Source(), - eventAfterTransformation.Data(), + recordevents.ReplyWithTransformedEvent( + eventAfterTransformation.Type(), + eventAfterTransformation.Source(), + string(eventAfterTransformation.Data()), + ), ) - client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName)) // create event logger pod and service as the subscriber eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName) diff --git a/test/lib/recordevents/event_info_store.go b/test/lib/recordevents/event_info_store.go index 007a4c14952..2a3d5fda9fe 100644 --- a/test/lib/recordevents/event_info_store.go +++ b/test/lib/recordevents/event_info_store.go @@ -28,12 +28,9 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/util/wait" - pkgTest "knative.dev/pkg/test" testlib "knative.dev/eventing/test/lib" - "knative.dev/eventing/test/lib/resources" ) const ( @@ -42,34 +39,6 @@ const ( retryTimeout = 4 * time.Minute ) -type EventRecordOption = func(*corev1.Pod, *testlib.Client) error - -func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { - client.CreateServiceAccountOrFail(name) - client.CreateRoleOrFail(resources.Role(name, - resources.WithRuleForRole(&rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"pods"}, - Verbs: []string{"get"}, - }), - resources.WithRuleForRole(&rbacv1.PolicyRule{ - APIGroups: []string{""}, - Resources: []string{"events"}, - Verbs: []string{rbacv1.VerbAll}, - }), - )) - client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) - - eventRecordPod := EventRecordPod(name, name) - client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) - err := pkgTest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace) - if err != nil { - client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) - } - client.WaitForServiceEndpointsOrFail(ctx, name, 1) - return eventRecordPod -} - // Deploys a new recordevents pod and start the associated EventInfoStore func StartEventRecordOrFail(ctx context.Context, client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) { eventRecordPod := DeployEventRecordOrFail(ctx, client, podName, options...) diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index e9bd4224aac..badc5048802 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -25,7 +25,6 @@ import ( cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" - "github.com/prometheus/common/log" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -33,35 +32,57 @@ import ( // Observer is the entry point for sinking events into the event log. type Observer struct { + // Name is the name of this Observer, used to filter if multiple observers. Name string // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - seq uint64 -} - -// New returns an observer that will vent observations to the list of provided -// EventLog instances. It will listen on :8080. -func New(name string, eventLogs ...recordevents.EventLog) *Observer { - return &Observer{ - Name: name, - EventLogs: eventLogs, - } + ctx context.Context + seq uint64 + replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) } type envConfig struct { // ObserverName is used to identify this instance of the observer. ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"` + + // Reply is used to define if the observer should reply back + Reply bool `envconfig:"REPLY" default:"false" required:"false"` + + // The event type to use in the reply, if enabled + ReplyEventType string `envconfig:"REPLY_EVENT_TYPE" default:"" required:"false"` + + // The event source to use in the reply, if enabled + ReplyEventSource string `envconfig:"REPLY_EVENT_SOURCE" default:"" required:"false"` + + // The event data to use in the reply, if enabled + ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"` } -func NewFromEnv(eventLogs ...recordevents.EventLog) *Observer { +func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { var env envConfig if err := envconfig.Process("", &env); err != nil { - log.Fatal("Failed to process env var", err) + logging.FromContext(ctx).Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Observer environment configuration: %+v", env) + + var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + if env.Reply { + logging.FromContext(ctx).Info("Observer will reply with an event") + replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData) + } else { + logging.FromContext(ctx).Info("Observer won't reply with an event") + replyFunc = NoOpReply } - return New(env.ObserverName, eventLogs...) + return &Observer{ + Name: env.ObserverName, + EventLogs: eventLogs, + ctx: ctx, + replyFunc: replyFunc, + } } // Start will create the CloudEvents client and start listening for inbound @@ -99,7 +120,7 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) if eventErr != nil { eventErrStr = eventErr.Error() } - err := o.EventLogs.Vent(recordevents.EventInfo{ + eventInfo := recordevents.EventInfo{ Error: eventErrStr, Event: event, HTTPHeaders: header, @@ -107,10 +128,11 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Observer: o.Name, Time: time.Now(), Sequence: atomic.AddUint64(&o.seq, 1), - }) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { - log.Warn("Error while venting the recorded event", err) + logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err) } - writer.WriteHeader(http.StatusAccepted) + o.replyFunc(o.ctx, writer, eventInfo) } diff --git a/test/lib/recordevents/observer/reply.go b/test/lib/recordevents/observer/reply.go new file mode 100644 index 00000000000..96456977f0c --- /dev/null +++ b/test/lib/recordevents/observer/reply.go @@ -0,0 +1,70 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package observer + +import ( + "context" + "net/http" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/cloudevents/sdk-go/v2/binding" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.EventInfo) { + writer.WriteHeader(http.StatusAccepted) +} + +func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) { + return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) { + if info.Error != "" { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte(info.Error)) + logging.FromContext(ctx).Warn("Conversion error in the event to send back", info.Error) + return + } + + if info.Event == nil { + writer.WriteHeader(http.StatusBadRequest) + _, _ = writer.Write([]byte("No event!")) + logging.FromContext(ctx).Warn("No event to send back") + return + } + + outputEvent := info.Event.Clone() + + if replyEventSource != "" { + outputEvent.SetSource(replyEventSource) + } + if replyEventType != "" { + outputEvent.SetType(replyEventType) + } + if replyEventData != "" { + if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil { + logging.FromContext(ctx).Warn("Cannot set the event data") + } + } + + err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer) + if err != nil { + logging.FromContext(ctx).Warn("Error while writing the event as response", err) + } + } +} diff --git a/test/lib/recordevents/recorder_vent/constructor.go b/test/lib/recordevents/recorder_vent/constructor.go index cc73a9671f0..c4743c67ae8 100644 --- a/test/lib/recordevents/recorder_vent/constructor.go +++ b/test/lib/recordevents/recorder_vent/constructor.go @@ -50,7 +50,7 @@ func NewFromEnv(ctx context.Context) recordevents.EventLog { log.Fatal("Failed to process env var", err) } - logging.FromContext(ctx).Infof("Environment configuration: %+v", env) + logging.FromContext(ctx).Infof("Recorder vent environment configuration: %+v", env) return NewEventLog(ctx, env.AgentName, env.PodName) } diff --git a/test/lib/recordevents/resources.go b/test/lib/recordevents/resources.go index 7398de68db4..7b018818c63 100644 --- a/test/lib/recordevents/resources.go +++ b/test/lib/recordevents/resources.go @@ -17,12 +17,89 @@ limitations under the License. package recordevents import ( + "context" + + "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/uuid" - "knative.dev/pkg/test" + pkgtest "knative.dev/pkg/test" + + testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/resources" ) +type EventRecordOption = func(*corev1.Pod, *testlib.Client) error + +// EchoEvent is an option to let the recordevents reply with the received event +func EchoEvent(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + return nil +} + +var _ EventRecordOption = EchoEvent + +// ReplyWithTransformedEvent is an option to let the recordevents reply with the transformed event +func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, replyEventData string) EventRecordOption { + return func(pod *corev1.Pod, client *testlib.Client) error { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY", Value: "true"}, + ) + if replyEventType != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_TYPE", Value: replyEventType}, + ) + } + if replyEventSource != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_SOURCE", Value: replyEventSource}, + ) + } + if replyEventData != "" { + pod.Spec.Containers[0].Env = append( + pod.Spec.Containers[0].Env, + corev1.EnvVar{Name: "REPLY_EVENT_DATA", Value: replyEventData}, + ) + } + + return nil + } +} + +// DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image +func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { + client.CreateServiceAccountOrFail(name) + client.CreateRoleOrFail(resources.Role(name, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get"}, + }), + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{rbacv1.VerbAll}, + }), + )) + client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) + + eventRecordPod := EventRecordPod(name, name) + client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) + err := pkgtest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace) + if err != nil { + client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) + } + client.WaitForServiceEndpointsOrFail(ctx, name, 1) + return eventRecordPod +} + // EventRecordPod creates a Pod that stores received events for test retrieval. func EventRecordPod(name string, serviceAccountName string) *corev1.Pod { return recordEventsPod("recordevents", name, serviceAccountName) @@ -37,7 +114,7 @@ func recordEventsPod(imageName string, name string, serviceAccountName string) * Spec: corev1.PodSpec{ Containers: []corev1.Container{{ Name: imageName, - Image: test.ImagePath(imageName), + Image: pkgtest.ImagePath(imageName), ImagePullPolicy: corev1.PullIfNotPresent, Env: []corev1.EnvVar{{ Name: "SYSTEM_NAMESPACE", diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 51fde078e9b..38445c45bc8 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -22,7 +22,7 @@ import ( "os" "k8s.io/client-go/rest" - "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/injection" "knative.dev/pkg/logging" _ "knative.dev/pkg/system/testing" @@ -39,13 +39,13 @@ func main() { log.Fatal("Error while reading the cfg", err) } //nolint // nil ctx is fine here, look at the code of EnableInjectionOrDie - ctx := sharedmain.EnableInjectionOrDie(nil, cfg) + ctx, _ := injection.EnableInjectionOrDie(nil, cfg) if err := test_images.ConfigureTracing(logging.FromContext(ctx), ""); err != nil { logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) } - obs := observer.NewFromEnv( + obs := observer.NewFromEnv(ctx, recorder_vent.NewFromEnv(ctx), ) diff --git a/vendor/modules.txt b/vendor/modules.txt index 740b18aa23a..3e15370c691 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -254,7 +254,6 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.9.1 -## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log