diff --git a/test/lib/dropevents/receiver.go b/test/lib/dropevents/receiver.go index 4453b4f4714..6b18f2be10d 100644 --- a/test/lib/dropevents/receiver.go +++ b/test/lib/dropevents/receiver.go @@ -32,6 +32,20 @@ const ( NumberKey = "NUMBER" ) +// count is only used for SKIP_ALGORITHM=Sequence. +func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper { + switch algorithm { + case Fibonacci: + return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1} + + case Sequence: + return dropeventsfirst.First{N: count} + + default: + panic("unknown algorithm: " + algorithm) + } +} + func SkipperAlgorithm(algorithm string) Skipper { switch algorithm { diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index d9dbbdae819..35a4d0265f7 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -42,16 +42,36 @@ type EventInfo struct { Observer string `json:"observer,omitempty"` Time time.Time `json:"time,omitempty"` Sequence uint64 `json:"sequence"` + Dropped bool `json:"dropped"` } -// Pretty print the event. Meant for debugging. This formats the validation error -// or the full event as appropriate. This does NOT format the headers. +// Pretty print the event. Meant for debugging. func (ei *EventInfo) String() string { + var sb strings.Builder + sb.WriteString("-- EventInfo --\n") if ei.Event != nil { - return ei.Event.String() - } else { - return fmt.Sprintf("invalid event \"%s\"", ei.Error) + sb.WriteString("--- Event ---\n") + sb.WriteString(ei.Event.String()) + sb.WriteRune('\n') + sb.WriteRune('\n') + } + if ei.Error != "" { + sb.WriteString("--- Error ---\n") + sb.WriteString(ei.Error) + sb.WriteRune('\n') + sb.WriteRune('\n') } + sb.WriteString("--- HTTP headers ---\n") + for k, v := range ei.HTTPHeaders { + sb.WriteString(" " + k + ": " + v[0] + "\n") + } + sb.WriteRune('\n') + sb.WriteString("--- Origin: '" + ei.Origin + "' ---\n") + sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n") + sb.WriteString("--- Time: " + ei.Time.String() + " ---\n") + sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence)) + sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped)) + return sb.String() } // This is mainly used for providing better failure messages diff --git a/test/lib/recordevents/logger_vent/logger.go b/test/lib/recordevents/logger_vent/logger.go new file mode 100644 index 00000000000..95d7012f23c --- /dev/null +++ b/test/lib/recordevents/logger_vent/logger.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logger_vent + +import "knative.dev/eventing/test/lib/recordevents" + +type Logger func(string, ...interface{}) + +func (l Logger) Vent(observed recordevents.EventInfo) error { + l("Event: \n%s", observed.String()) + + return nil +} diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index b57dac9c718..58e40a1c528 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -19,12 +19,16 @@ package observer import ( "context" "net/http" + "strings" "sync/atomic" "time" + "knative.dev/eventing/test/lib/dropevents" + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -41,11 +45,12 @@ type Observer struct { ctx context.Context seq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + counter *dropevents.CounterHandler } type envConfig struct { // ObserverName is used to identify this instance of the observer. - ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"` + ObserverName string `envconfig:"POD_NAME" default:"observer-default" required:"true"` // Reply is used to define if the observer should reply back Reply bool `envconfig:"REPLY" default:"false" required:"false"` @@ -62,6 +67,13 @@ type envConfig struct { // This string to append in the data field in the reply, if enabled. // This will threat the data as text/plain field ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` + + // If events should be dropped, specify the strategy here. + SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"` + + // If events should be dropped according to Linear policy, this controls + // how many events are dropped. + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { @@ -80,12 +92,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ logging.FromContext(ctx).Info("Observer won't reply with an event") replyFunc = NoOpReply } + var counter *dropevents.CounterHandler + + if env.SkipStrategy != "" { + counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } else { + counter = &dropevents.CounterHandler{ + // Don't skip anything, since count is 0. nop skipper. + Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0), + } + } return &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, + counter: counter, } } @@ -118,25 +143,51 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) defer m.Finish(nil) event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) - header := request.Header + headers := make(http.Header) + for k, v := range request.Header { + if !strings.HasPrefix(k, "Ce-") { + headers[k] = v + } + } + // Host header is removed from the request.Header map by net/http + if request.Host != "" { + headers.Set("Host", request.Host) + } eventErrStr := "" if eventErr != nil { eventErrStr = eventErr.Error() } + + shouldSkip := o.counter.Skip() + eventInfo := recordevents.EventInfo{ Error: eventErrStr, Event: event, - HTTPHeaders: header, + HTTPHeaders: headers, Origin: request.RemoteAddr, Observer: o.Name, Time: time.Now(), Sequence: atomic.AddUint64(&o.seq, 1), + Dropped: shouldSkip, } + + // We still want to emit the event to make it easier to see what we had oberved, but + // we want to transform it a little bit before emitting so that it does not count + // as the real event that we want to emit. + if shouldSkip { + eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { - logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err) + logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err)) } - o.replyFunc(o.ctx, writer, eventInfo) + if shouldSkip { + // Trigger a redelivery + writer.WriteHeader(http.StatusConflict) + } else { + o.replyFunc(o.ctx, writer, eventInfo) + } } diff --git a/test/lib/recordevents/recorder_vent/constructor.go b/test/lib/recordevents/recorder_vent/constructor.go index 53163a95f70..438ed043d3f 100644 --- a/test/lib/recordevents/recorder_vent/constructor.go +++ b/test/lib/recordevents/recorder_vent/constructor.go @@ -19,21 +19,15 @@ package recorder_vent import ( "context" "log" - "math/rand" "time" "github.com/kelseyhightower/envconfig" - "k8s.io/apimachinery/pkg/api/errors" - restclient "k8s.io/client-go/rest" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" + ref "k8s.io/client-go/tools/reference" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -70,86 +64,15 @@ func NewEventLog(ctx context.Context, agentName string, podName string, podNames logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace) - return &recorder{out: createRecorder(ctx, agentName, podNamespace), on: on} -} - -func createRecorder(ctx context.Context, agentName string, namespace string) record.EventRecorder { - logger := logging.FromContext(ctx) - - recorder := controller.GetEventRecorder(ctx) - if recorder == nil { - // Create event broadcaster - logger.Debug("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - watches := []watch.Interface{ - eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), - eventBroadcaster.StartEventWatcher( - sendToSink(ctx, kubeclient.Get(ctx).CoreV1().Events(namespace).CreateWithEventNamespace), - ), - } - recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) - go func() { - <-ctx.Done() - for _, w := range watches { - w.Stop() - } - logging.FromContext(ctx).Debug("Closed event-broadcaster") - }() - } - - return recorder -} - -func sendToSink(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error)) func(*corev1.Event) { - return func(event *corev1.Event) { - tries := 0 - for { - if recordEvent(ctx, sender, event) { - break - } - tries++ - if tries >= maxRetry { - logging.FromContext(ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name) - break - } - // Randomize the first sleep so that various clients won't all be - // synced up if the master goes down. - if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) - } else { - time.Sleep(sleepDuration) - } - } - } -} - -func recordEvent(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error), event *corev1.Event) bool { - newEv, err := sender(event) - if err == nil { - logging.FromContext(ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID) - return true + reference, err := ref.GetReference(scheme.Scheme, on) + if err != nil { + logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err) } - // If we can't contact the server, then hold everything while we keep trying. - // Otherwise, something about the event is malformed and we should abandon it. - switch err.(type) { - case *restclient.RequestConstructionError: - // We will construct the request the same next time, so don't keep trying. - logging.FromContext(ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err) - return true - case *errors.StatusError: - if errors.IsAlreadyExists(err) { - logging.FromContext(ctx).Infof("Server rejected event '%s': '%v' (will not retry!)", event.Name, err) - } else { - logging.FromContext(ctx).Errorf("Server rejected event '%s': '%v' (will not retry!)", event.Name, err) - } - return true - case *errors.UnexpectedObjectError: - // We don't expect this; it implies the server's response didn't match a - // known pattern. Go ahead and retry. - default: - // This case includes actual http transport errors. Go ahead and retry. + return &recorder{ + ctx: ctx, + namespace: podNamespace, + agentName: agentName, + ref: reference, } - logging.FromContext(ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err) - return false } diff --git a/test/lib/recordevents/recorder_vent/recorder.go b/test/lib/recordevents/recorder_vent/recorder.go index 9669741f9b6..117ac2df16a 100644 --- a/test/lib/recordevents/recorder_vent/recorder.go +++ b/test/lib/recordevents/recorder_vent/recorder.go @@ -17,18 +17,28 @@ limitations under the License. package recorder_vent import ( + "context" "encoding/json" + "fmt" + "math/rand" + "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + restclient "k8s.io/client-go/rest" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" ) type recorder struct { - out record.EventRecorder - on runtime.Object + ctx context.Context + namespace string + agentName string + + ref *corev1.ObjectReference } func (r *recorder) Vent(observed recordevents.EventInfo) error { @@ -36,8 +46,75 @@ func (r *recorder) Vent(observed recordevents.EventInfo) error { if err != nil { return err } + message := string(b) + + t := time.Now() + // Note: DO NOT SET EventTime, or you'll trigger k8s api server hilarity: + // - https://github.com/kubernetes/kubernetes/issues/95913 + // - https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/validation/events.go#L122 + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%d", r.ref.Name, observed.Sequence), + Namespace: r.namespace, + }, + InvolvedObject: *r.ref, + Reason: recordevents.CloudEventObservedReason, + Message: message, + Source: corev1.EventSource{Component: r.agentName}, + FirstTimestamp: metav1.Time{Time: t}, + LastTimestamp: metav1.Time{Time: t}, + Count: 1, + Type: corev1.EventTypeNormal, + } + + return r.recordEvent(event) +} + +func (r *recorder) recordEvent(event *corev1.Event) error { + tries := 0 + for { + done, err := r.trySendEvent(event) + if done { + return nil + } + tries++ + if tries >= maxRetry { + logging.FromContext(r.ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name) + return err + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} - r.out.Eventf(r.on, corev1.EventTypeNormal, recordevents.CloudEventObservedReason, "%s", string(b)) +func (r *recorder) trySendEvent(event *corev1.Event) (bool, error) { + newEv, err := kubeclient.Get(r.ctx).CoreV1().Events(r.namespace).CreateWithEventNamespace(event) + if err == nil { + logging.FromContext(r.ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID) + return true, nil + } - return nil + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + logging.FromContext(r.ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err) + return true, err + case *apierrors.StatusError: + logging.FromContext(r.ctx).Errorf("Server rejected event '%s'. Reason: '%v' (will not retry!). Event: %v", event.Name, err, event) + return true, err + case *apierrors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + logging.FromContext(r.ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false, err } diff --git a/test/lib/test_runner.go b/test/lib/test_runner.go index 553ebeff08e..863453c5307 100644 --- a/test/lib/test_runner.go +++ b/test/lib/test_runner.go @@ -19,6 +19,8 @@ package lib import ( "fmt" "path/filepath" + "sort" + "strings" "testing" "time" @@ -189,8 +191,25 @@ func TearDown(client *Client) { if err != nil { client.T.Logf("Could not list events in the namespace %q: %v", client.Namespace, err) } else { - for _, e := range el.Items { - client.T.Logf("EVENT: %v", e) + // Elements has to be ordered first + items := el.Items + sort.SliceStable(items, func(i, j int) bool { + // Some events might not contain last timestamp, in that case we fallback to event time + iTime := items[i].LastTimestamp.Time + if iTime.IsZero() { + iTime = items[i].EventTime.Time + } + + jTime := items[j].LastTimestamp.Time + if jTime.IsZero() { + jTime = items[j].EventTime.Time + } + + return iTime.Before(jTime) + }) + + for _, e := range items { + client.T.Log(formatEvent(&e)) } } @@ -210,6 +229,27 @@ func TearDown(client *Client) { } } +func formatEvent(e *corev1.Event) string { + return strings.Join([]string{`Event{`, + `ObjectMeta:` + strings.Replace(strings.Replace(e.ObjectMeta.String(), "ObjectMeta", "v1.ObjectMeta", 1), `&`, ``, 1), + `InvolvedObject:` + strings.Replace(strings.Replace(e.InvolvedObject.String(), "ObjectReference", "ObjectReference", 1), `&`, ``, 1), + `Reason:` + e.Reason, + `Message:` + e.Message, + `Source:` + strings.Replace(strings.Replace(e.Source.String(), "EventSource", "EventSource", 1), `&`, ``, 1), + `FirstTimestamp:` + e.FirstTimestamp.String(), + `LastTimestamp:` + e.LastTimestamp.String(), + `Count:` + fmt.Sprintf("%d", e.Count), + `Type:` + e.Type, + `EventTime:` + e.EventTime.String(), + `Series:` + strings.Replace(e.Series.String(), "EventSeries", "EventSeries", 1), + `Action:` + e.Action, + `Related:` + strings.Replace(e.Related.String(), "ObjectReference", "ObjectReference", 1), + `ReportingController:` + e.ReportingController, + `ReportingInstance:` + e.ReportingInstance, + `}`, + }, "\n") +} + // CreateNamespaceIfNeeded creates a new namespace if it does not exist. func CreateNamespaceIfNeeded(t *testing.T, client *Client, namespace string) { _, err := client.Kube.Kube.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}) diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index f2427c40c7b..95e45db54f5 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -19,8 +19,6 @@ package main import ( "context" "log" - "net/http" - "os" "k8s.io/client-go/rest" "knative.dev/pkg/injection" @@ -28,7 +26,7 @@ import ( _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/test/lib/dropevents" + "knative.dev/eventing/test/lib/recordevents/logger_vent" "knative.dev/eventing/test/lib/recordevents/observer" "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" @@ -46,27 +44,11 @@ func main() { } obs := observer.NewFromEnv(ctx, + logger_vent.Logger(logging.FromContext(ctx).Infof), recorder_vent.NewFromEnv(ctx), ) - algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - if ok { - skipper := dropevents.SkipperAlgorithm(algorithm) - counter := dropevents.CounterHandler{ - Skipper: skipper, - } - err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - handler.ServeHTTP(writer, request) - }) - }) - } else { - err = obs.Start(ctx, kncloudevents.CreateHandler) - } + err = obs.Start(ctx, kncloudevents.CreateHandler) if err != nil { logging.FromContext(ctx).Fatal("Error during start", err)