From d5d747f3e8f66bfa7071fd1481576265df2477a1 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Mon, 26 Oct 2020 10:58:41 +0100 Subject: [PATCH 1/4] Nit (#4385) Signed-off-by: Francesco Guardiani (cherry picked from commit 3ceaad40cda1f695d4428f1e1e353699da9b60eb) Signed-off-by: Francesco Guardiani --- test/lib/recordevents/event_info.go | 34 ++++++++++++++--- test/lib/recordevents/observer/observer.go | 6 ++- test/lib/test_runner.go | 44 +++++++++++++++++++++- 3 files changed, 75 insertions(+), 9 deletions(-) diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index d9dbbdae819..4804a5353ab 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -44,26 +44,48 @@ type EventInfo struct { Sequence uint64 `json:"sequence"` } -// Pretty print the event. Meant for debugging. This formats the validation error -// or the full event as appropriate. This does NOT format the headers. +// Pretty print the event. Meant for debugging. func (ei *EventInfo) String() string { + var sb strings.Builder + sb.WriteString("-- EventInfo --\n") if ei.Event != nil { - return ei.Event.String() - } else { - return fmt.Sprintf("invalid event \"%s\"", ei.Error) + sb.WriteString("--- Event ---\n") + sb.WriteString(ei.Event.String()) + sb.WriteRune('\n') + sb.WriteRune('\n') } + if ei.Error != "" { + sb.WriteString("--- Error ---\n") + sb.WriteString(ei.Error) + sb.WriteRune('\n') + sb.WriteRune('\n') + } + sb.WriteString("--- HTTP headers ---\n") + for k, v := range ei.HTTPHeaders { + sb.WriteString(" " + k + ": " + v[0] + "\n") + } + sb.WriteRune('\n') + sb.WriteString("--- Origin: '" + ei.Origin + "' ---\n") + sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n") + sb.WriteString("--- Time: " + ei.Time.String() + " ---\n") + sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence)) + return sb.String() } // This is mainly used for providing better failure messages type SearchedInfo struct { TotalEvent int LastNEvent []EventInfo + + storeEventsSeen int + storeEventsNotMine int } // Pretty print the SearchedInfor for error messages func (s *SearchedInfo) String() string { var sb strings.Builder - sb.WriteString(fmt.Sprintf("%d events seen, last %d events:", s.TotalEvent, len(s.LastNEvent))) + sb.WriteString(fmt.Sprintf("%d events seen, last %d events (total events seen %d, events ignored %d):", + s.TotalEvent, len(s.LastNEvent), s.storeEventsSeen, s.storeEventsNotMine)) for _, ei := range s.LastNEvent { sb.WriteString(ei.String()) sb.WriteRune('\n') diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index b57dac9c718..54524ddca42 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -45,7 +45,7 @@ type Observer struct { type envConfig struct { // ObserverName is used to identify this instance of the observer. - ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"` + ObserverName string `envconfig:"POD_NAME" default:"observer-default" required:"true"` // Reply is used to define if the observer should reply back Reply bool `envconfig:"REPLY" default:"false" required:"false"` @@ -119,6 +119,10 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) header := request.Header + // Host header is removed from the request.Header map by net/http + if request.Host != "" { + header.Set("Host", request.Host) + } eventErrStr := "" if eventErr != nil { diff --git a/test/lib/test_runner.go b/test/lib/test_runner.go index 553ebeff08e..863453c5307 100644 --- a/test/lib/test_runner.go +++ b/test/lib/test_runner.go @@ -19,6 +19,8 @@ package lib import ( "fmt" "path/filepath" + "sort" + "strings" "testing" "time" @@ -189,8 +191,25 @@ func TearDown(client *Client) { if err != nil { client.T.Logf("Could not list events in the namespace %q: %v", client.Namespace, err) } else { - for _, e := range el.Items { - client.T.Logf("EVENT: %v", e) + // Elements has to be ordered first + items := el.Items + sort.SliceStable(items, func(i, j int) bool { + // Some events might not contain last timestamp, in that case we fallback to event time + iTime := items[i].LastTimestamp.Time + if iTime.IsZero() { + iTime = items[i].EventTime.Time + } + + jTime := items[j].LastTimestamp.Time + if jTime.IsZero() { + jTime = items[j].EventTime.Time + } + + return iTime.Before(jTime) + }) + + for _, e := range items { + client.T.Log(formatEvent(&e)) } } @@ -210,6 +229,27 @@ func TearDown(client *Client) { } } +func formatEvent(e *corev1.Event) string { + return strings.Join([]string{`Event{`, + `ObjectMeta:` + strings.Replace(strings.Replace(e.ObjectMeta.String(), "ObjectMeta", "v1.ObjectMeta", 1), `&`, ``, 1), + `InvolvedObject:` + strings.Replace(strings.Replace(e.InvolvedObject.String(), "ObjectReference", "ObjectReference", 1), `&`, ``, 1), + `Reason:` + e.Reason, + `Message:` + e.Message, + `Source:` + strings.Replace(strings.Replace(e.Source.String(), "EventSource", "EventSource", 1), `&`, ``, 1), + `FirstTimestamp:` + e.FirstTimestamp.String(), + `LastTimestamp:` + e.LastTimestamp.String(), + `Count:` + fmt.Sprintf("%d", e.Count), + `Type:` + e.Type, + `EventTime:` + e.EventTime.String(), + `Series:` + strings.Replace(e.Series.String(), "EventSeries", "EventSeries", 1), + `Action:` + e.Action, + `Related:` + strings.Replace(e.Related.String(), "ObjectReference", "ObjectReference", 1), + `ReportingController:` + e.ReportingController, + `ReportingInstance:` + e.ReportingInstance, + `}`, + }, "\n") +} + // CreateNamespaceIfNeeded creates a new namespace if it does not exist. func CreateNamespaceIfNeeded(t *testing.T, client *Client, namespace string) { _, err := client.Kube.Kube.CoreV1().Namespaces().Get(namespace, metav1.GetOptions{}) From 29f347eb3f6070d405683795b867f96c21608062 Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Tue, 27 Oct 2020 05:24:33 -0700 Subject: [PATCH 2/4] emit a k8s event when dropping events (#4389) * emit a k8s event when dropping events Signed-off-by: Ville Aikas * go imports Signed-off-by: Ville Aikas * tags Signed-off-by: Ville Aikas * fix silliness Signed-off-by: Ville Aikas * simplify Signed-off-by: Ville Aikas (cherry picked from commit 0a54fd909f39f37ba3f246c3b3a69576cf2c34aa) --- test/lib/dropevents/receiver.go | 14 ++++++ test/lib/recordevents/observer/observer.go | 51 ++++++++++++++++++++-- test/test_images/recordevents/main.go | 22 +--------- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/test/lib/dropevents/receiver.go b/test/lib/dropevents/receiver.go index 4453b4f4714..6b18f2be10d 100644 --- a/test/lib/dropevents/receiver.go +++ b/test/lib/dropevents/receiver.go @@ -32,6 +32,20 @@ const ( NumberKey = "NUMBER" ) +// count is only used for SKIP_ALGORITHM=Sequence. +func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper { + switch algorithm { + case Fibonacci: + return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1} + + case Sequence: + return dropeventsfirst.First{N: count} + + default: + panic("unknown algorithm: " + algorithm) + } +} + func SkipperAlgorithm(algorithm string) Skipper { switch algorithm { diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 54524ddca42..0e7528a45db 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -22,6 +22,8 @@ import ( "sync/atomic" "time" + "knative.dev/eventing/test/lib/dropevents" + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" @@ -38,9 +40,12 @@ type Observer struct { // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - ctx context.Context - seq uint64 + ctx context.Context + seq uint64 + // Increment this for every dropped event that we see + dropSeq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + counter *dropevents.CounterHandler } type envConfig struct { @@ -62,6 +67,13 @@ type envConfig struct { // This string to append in the data field in the reply, if enabled. // This will threat the data as text/plain field ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` + + // If events should be dropped, specify the strategy here. + SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"` + + // If events should be dropped according to Linear policy, this controls + // how many events are dropped. + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { @@ -80,12 +92,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ logging.FromContext(ctx).Info("Observer won't reply with an event") replyFunc = NoOpReply } + var counter *dropevents.CounterHandler + + if env.SkipStrategy != "" { + counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } else { + counter = &dropevents.CounterHandler{ + // Don't skip anything, since count is 0. nop skipper. + Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0), + } + } return &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, + counter: counter, } } @@ -135,12 +160,30 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Origin: request.RemoteAddr, Observer: o.Name, Time: time.Now(), - Sequence: atomic.AddUint64(&o.seq, 1), } + shouldSkip := o.counter.Skip() + + // We still want to emit the event to make it easier to see what we had oberved, but + // we want to transform it a little bit before emitting so that it does not count + // as the real event that we want to emit. + if shouldSkip { + eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1) + eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) + } else { + // Increment the sequence only if we're not dropping so that we do not + // introduce side effects. + eventInfo.Sequence = atomic.AddUint64(&o.seq, 1) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err) } - o.replyFunc(o.ctx, writer, eventInfo) + if shouldSkip { + // Trigger a redelivery + writer.WriteHeader(http.StatusConflict) + } else { + o.replyFunc(o.ctx, writer, eventInfo) + } } diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index f2427c40c7b..eca2971ce2b 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -19,8 +19,6 @@ package main import ( "context" "log" - "net/http" - "os" "k8s.io/client-go/rest" "knative.dev/pkg/injection" @@ -28,7 +26,6 @@ import ( _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/test/lib/dropevents" "knative.dev/eventing/test/lib/recordevents/observer" "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" @@ -49,24 +46,7 @@ func main() { recorder_vent.NewFromEnv(ctx), ) - algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - if ok { - skipper := dropevents.SkipperAlgorithm(algorithm) - counter := dropevents.CounterHandler{ - Skipper: skipper, - } - err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - handler.ServeHTTP(writer, request) - }) - }) - } else { - err = obs.Start(ctx, kncloudevents.CreateHandler) - } + err = obs.Start(ctx, kncloudevents.CreateHandler) if err != nil { logging.FromContext(ctx).Fatal("Error during start", err) From 0c253663d279f49838bb85245687c7113373923f Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 27 Oct 2020 16:14:33 +0100 Subject: [PATCH 3/4] [recordevents] Removed EventBroadcaster usage and replaced with manual send (#4393) * Removed EventBroadcaster usage and replaced with manual creation and send of events Signed-off-by: Francesco Guardiani * Boilerplate Signed-off-by: Francesco Guardiani * Remove redundant format Removed sequence annotation Signed-off-by: Francesco Guardiani * Added required value Signed-off-by: Francesco Guardiani * ?!?! Signed-off-by: Francesco Guardiani * Maybe this one fix the issue? Signed-off-by: Francesco Guardiani * Maybe this one fix the issue? Signed-off-by: Francesco Guardiani * Removed useless double log line Signed-off-by: Francesco Guardiani * Remove useless headers Signed-off-by: Francesco Guardiani * Missing host header Signed-off-by: Francesco Guardiani * Nit Signed-off-by: Francesco Guardiani * Now it works on my machine, i'm warning you prow! Signed-off-by: Francesco Guardiani * Nit Signed-off-by: Francesco Guardiani * Now it works for long events too Signed-off-by: Francesco Guardiani * Suggestions Signed-off-by: Francesco Guardiani * Fixed the dropped counter thing Signed-off-by: Francesco Guardiani * Nit Signed-off-by: Francesco Guardiani (cherry picked from commit 7de59ec86655fe43b320fa3f27afcf9d1a822a84) Signed-off-by: Francesco Guardiani --- test/lib/recordevents/event_info.go | 2 + test/lib/recordevents/logger_vent/logger.go | 27 ++++++ test/lib/recordevents/observer/observer.go | 32 ++++--- .../recordevents/recorder_vent/constructor.go | 95 ++----------------- .../recordevents/recorder_vent/recorder.go | 89 +++++++++++++++-- test/test_images/recordevents/main.go | 2 + 6 files changed, 141 insertions(+), 106 deletions(-) create mode 100644 test/lib/recordevents/logger_vent/logger.go diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index 4804a5353ab..e8e721fc45f 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -42,6 +42,7 @@ type EventInfo struct { Observer string `json:"observer,omitempty"` Time time.Time `json:"time,omitempty"` Sequence uint64 `json:"sequence"` + Dropped bool `json:"dropped"` } // Pretty print the event. Meant for debugging. @@ -69,6 +70,7 @@ func (ei *EventInfo) String() string { sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n") sb.WriteString("--- Time: " + ei.Time.String() + " ---\n") sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence)) + sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped)) return sb.String() } diff --git a/test/lib/recordevents/logger_vent/logger.go b/test/lib/recordevents/logger_vent/logger.go new file mode 100644 index 00000000000..95d7012f23c --- /dev/null +++ b/test/lib/recordevents/logger_vent/logger.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package logger_vent + +import "knative.dev/eventing/test/lib/recordevents" + +type Logger func(string, ...interface{}) + +func (l Logger) Vent(observed recordevents.EventInfo) error { + l("Event: \n%s", observed.String()) + + return nil +} diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 0e7528a45db..58e40a1c528 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -19,6 +19,7 @@ package observer import ( "context" "net/http" + "strings" "sync/atomic" "time" @@ -27,6 +28,7 @@ import ( cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" + "go.uber.org/zap" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -40,10 +42,8 @@ type Observer struct { // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - ctx context.Context - seq uint64 - // Increment this for every dropped event that we see - dropSeq uint64 + ctx context.Context + seq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) counter *dropevents.CounterHandler } @@ -143,41 +143,45 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) defer m.Finish(nil) event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) - header := request.Header + headers := make(http.Header) + for k, v := range request.Header { + if !strings.HasPrefix(k, "Ce-") { + headers[k] = v + } + } // Host header is removed from the request.Header map by net/http if request.Host != "" { - header.Set("Host", request.Host) + headers.Set("Host", request.Host) } eventErrStr := "" if eventErr != nil { eventErrStr = eventErr.Error() } + + shouldSkip := o.counter.Skip() + eventInfo := recordevents.EventInfo{ Error: eventErrStr, Event: event, - HTTPHeaders: header, + HTTPHeaders: headers, Origin: request.RemoteAddr, Observer: o.Name, Time: time.Now(), + Sequence: atomic.AddUint64(&o.seq, 1), + Dropped: shouldSkip, } - shouldSkip := o.counter.Skip() // We still want to emit the event to make it easier to see what we had oberved, but // we want to transform it a little bit before emitting so that it does not count // as the real event that we want to emit. if shouldSkip { - eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1) eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) - } else { - // Increment the sequence only if we're not dropping so that we do not - // introduce side effects. - eventInfo.Sequence = atomic.AddUint64(&o.seq, 1) } err := o.EventLogs.Vent(eventInfo) if err != nil { - logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err) + logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err)) } if shouldSkip { diff --git a/test/lib/recordevents/recorder_vent/constructor.go b/test/lib/recordevents/recorder_vent/constructor.go index 53163a95f70..438ed043d3f 100644 --- a/test/lib/recordevents/recorder_vent/constructor.go +++ b/test/lib/recordevents/recorder_vent/constructor.go @@ -19,21 +19,15 @@ package recorder_vent import ( "context" "log" - "math/rand" "time" "github.com/kelseyhightower/envconfig" - "k8s.io/apimachinery/pkg/api/errors" - restclient "k8s.io/client-go/rest" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes/scheme" - "k8s.io/client-go/tools/record" + ref "k8s.io/client-go/tools/reference" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" @@ -70,86 +64,15 @@ func NewEventLog(ctx context.Context, agentName string, podName string, podNames logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace) - return &recorder{out: createRecorder(ctx, agentName, podNamespace), on: on} -} - -func createRecorder(ctx context.Context, agentName string, namespace string) record.EventRecorder { - logger := logging.FromContext(ctx) - - recorder := controller.GetEventRecorder(ctx) - if recorder == nil { - // Create event broadcaster - logger.Debug("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - watches := []watch.Interface{ - eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), - eventBroadcaster.StartEventWatcher( - sendToSink(ctx, kubeclient.Get(ctx).CoreV1().Events(namespace).CreateWithEventNamespace), - ), - } - recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) - go func() { - <-ctx.Done() - for _, w := range watches { - w.Stop() - } - logging.FromContext(ctx).Debug("Closed event-broadcaster") - }() - } - - return recorder -} - -func sendToSink(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error)) func(*corev1.Event) { - return func(event *corev1.Event) { - tries := 0 - for { - if recordEvent(ctx, sender, event) { - break - } - tries++ - if tries >= maxRetry { - logging.FromContext(ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name) - break - } - // Randomize the first sleep so that various clients won't all be - // synced up if the master goes down. - if tries == 1 { - time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) - } else { - time.Sleep(sleepDuration) - } - } - } -} - -func recordEvent(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error), event *corev1.Event) bool { - newEv, err := sender(event) - if err == nil { - logging.FromContext(ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID) - return true + reference, err := ref.GetReference(scheme.Scheme, on) + if err != nil { + logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err) } - // If we can't contact the server, then hold everything while we keep trying. - // Otherwise, something about the event is malformed and we should abandon it. - switch err.(type) { - case *restclient.RequestConstructionError: - // We will construct the request the same next time, so don't keep trying. - logging.FromContext(ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err) - return true - case *errors.StatusError: - if errors.IsAlreadyExists(err) { - logging.FromContext(ctx).Infof("Server rejected event '%s': '%v' (will not retry!)", event.Name, err) - } else { - logging.FromContext(ctx).Errorf("Server rejected event '%s': '%v' (will not retry!)", event.Name, err) - } - return true - case *errors.UnexpectedObjectError: - // We don't expect this; it implies the server's response didn't match a - // known pattern. Go ahead and retry. - default: - // This case includes actual http transport errors. Go ahead and retry. + return &recorder{ + ctx: ctx, + namespace: podNamespace, + agentName: agentName, + ref: reference, } - logging.FromContext(ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err) - return false } diff --git a/test/lib/recordevents/recorder_vent/recorder.go b/test/lib/recordevents/recorder_vent/recorder.go index 9669741f9b6..117ac2df16a 100644 --- a/test/lib/recordevents/recorder_vent/recorder.go +++ b/test/lib/recordevents/recorder_vent/recorder.go @@ -17,18 +17,28 @@ limitations under the License. package recorder_vent import ( + "context" "encoding/json" + "fmt" + "math/rand" + "time" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + restclient "k8s.io/client-go/rest" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" "knative.dev/eventing/test/lib/recordevents" ) type recorder struct { - out record.EventRecorder - on runtime.Object + ctx context.Context + namespace string + agentName string + + ref *corev1.ObjectReference } func (r *recorder) Vent(observed recordevents.EventInfo) error { @@ -36,8 +46,75 @@ func (r *recorder) Vent(observed recordevents.EventInfo) error { if err != nil { return err } + message := string(b) + + t := time.Now() + // Note: DO NOT SET EventTime, or you'll trigger k8s api server hilarity: + // - https://github.com/kubernetes/kubernetes/issues/95913 + // - https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/validation/events.go#L122 + event := &corev1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%v.%d", r.ref.Name, observed.Sequence), + Namespace: r.namespace, + }, + InvolvedObject: *r.ref, + Reason: recordevents.CloudEventObservedReason, + Message: message, + Source: corev1.EventSource{Component: r.agentName}, + FirstTimestamp: metav1.Time{Time: t}, + LastTimestamp: metav1.Time{Time: t}, + Count: 1, + Type: corev1.EventTypeNormal, + } + + return r.recordEvent(event) +} + +func (r *recorder) recordEvent(event *corev1.Event) error { + tries := 0 + for { + done, err := r.trySendEvent(event) + if done { + return nil + } + tries++ + if tries >= maxRetry { + logging.FromContext(r.ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name) + return err + } + // Randomize the first sleep so that various clients won't all be + // synced up if the master goes down. + if tries == 1 { + time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64())) + } else { + time.Sleep(sleepDuration) + } + } +} - r.out.Eventf(r.on, corev1.EventTypeNormal, recordevents.CloudEventObservedReason, "%s", string(b)) +func (r *recorder) trySendEvent(event *corev1.Event) (bool, error) { + newEv, err := kubeclient.Get(r.ctx).CoreV1().Events(r.namespace).CreateWithEventNamespace(event) + if err == nil { + logging.FromContext(r.ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID) + return true, nil + } - return nil + // If we can't contact the server, then hold everything while we keep trying. + // Otherwise, something about the event is malformed and we should abandon it. + switch err.(type) { + case *restclient.RequestConstructionError: + // We will construct the request the same next time, so don't keep trying. + logging.FromContext(r.ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err) + return true, err + case *apierrors.StatusError: + logging.FromContext(r.ctx).Errorf("Server rejected event '%s'. Reason: '%v' (will not retry!). Event: %v", event.Name, err, event) + return true, err + case *apierrors.UnexpectedObjectError: + // We don't expect this; it implies the server's response didn't match a + // known pattern. Go ahead and retry. + default: + // This case includes actual http transport errors. Go ahead and retry. + } + logging.FromContext(r.ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err) + return false, err } diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index eca2971ce2b..95e45db54f5 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -26,6 +26,7 @@ import ( _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/eventing/test/lib/recordevents/logger_vent" "knative.dev/eventing/test/lib/recordevents/observer" "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" @@ -43,6 +44,7 @@ func main() { } obs := observer.NewFromEnv(ctx, + logger_vent.Logger(logging.FromContext(ctx).Infof), recorder_vent.NewFromEnv(ctx), ) From c58b0f7d21579932469d0ee86a8e3a2f65bb9885 Mon Sep 17 00:00:00 2001 From: slinkydeveloper Date: Tue, 27 Oct 2020 16:36:01 +0100 Subject: [PATCH 4/4] Wrong merge fix Signed-off-by: Francesco Guardiani --- test/lib/recordevents/event_info.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index e8e721fc45f..35a4d0265f7 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -78,16 +78,12 @@ func (ei *EventInfo) String() string { type SearchedInfo struct { TotalEvent int LastNEvent []EventInfo - - storeEventsSeen int - storeEventsNotMine int } // Pretty print the SearchedInfor for error messages func (s *SearchedInfo) String() string { var sb strings.Builder - sb.WriteString(fmt.Sprintf("%d events seen, last %d events (total events seen %d, events ignored %d):", - s.TotalEvent, len(s.LastNEvent), s.storeEventsSeen, s.storeEventsNotMine)) + sb.WriteString(fmt.Sprintf("%d events seen, last %d events:", s.TotalEvent, len(s.LastNEvent))) for _, ei := range s.LastNEvent { sb.WriteString(ei.String()) sb.WriteRune('\n')