From b675a6a3add2092d230e17b04697681a29b56537 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 26 Oct 2020 14:14:30 +0200 Subject: [PATCH 1/5] emit a k8s event when dropping events Signed-off-by: Ville Aikas --- test/lib/dropevents/receiver.go | 14 ++++++ test/lib/recordevents/observer/observer.go | 51 ++++++++++++++++++++-- test/test_images/recordevents/main.go | 25 +---------- 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/test/lib/dropevents/receiver.go b/test/lib/dropevents/receiver.go index 4453b4f4714..6b18f2be10d 100644 --- a/test/lib/dropevents/receiver.go +++ b/test/lib/dropevents/receiver.go @@ -32,6 +32,20 @@ const ( NumberKey = "NUMBER" ) +// count is only used for SKIP_ALGORITHM=Sequence. +func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper { + switch algorithm { + case Fibonacci: + return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1} + + case Sequence: + return dropeventsfirst.First{N: count} + + default: + panic("unknown algorithm: " + algorithm) + } +} + func SkipperAlgorithm(algorithm string) Skipper { switch algorithm { diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 9933542d94e..4e16b26c5d0 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -22,6 +22,8 @@ import ( "sync/atomic" "time" + "knative.dev/eventing/test/lib/dropevents" + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" @@ -39,9 +41,14 @@ type Observer struct { // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - ctx context.Context - seq uint64 + ctx context.Context + seq uint64 + // Increment this for every dropped event that we see + dropSeq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + // If configured to drop some events, counter keeps track of how many should + // be dropped. + counter *dropevents.CounterHandler } type envConfig struct { @@ -63,6 +70,13 @@ type envConfig struct { // This string to append in the data field in the reply, if enabled. // This will threat the data as text/plain field ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` + + // If events should be dropped, specify the strategy here. + SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"` + + // If events should be dropped according to Linear policy, this controls + // how many events are dropped. + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0"" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { @@ -82,11 +96,19 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ replyFunc = NoOpReply } + var skipper dropevents.Skipper + if env.SkipStrategy != "" { + skipper = dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter) + } + return &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, + counter: &dropevents.CounterHandler{ + Skipper: skipper, + }, } } @@ -136,12 +158,33 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Origin: request.RemoteAddr, Observer: o.Name, Time: time.Now(), - Sequence: atomic.AddUint64(&o.seq, 1), } + shouldSkip := false + if o.counter != nil { + shouldSkip = o.counter.Skip() + } + + // We still want to emit the event to make it easier to see what we had oberved, but + // we want to transform it a little bit before emitting so that it does not count + // as the real event that we want to emit. + if shouldSkip { + eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1) + eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) + } else { + // Increment the sequence only if we're not dropping so that we do not + // introduce side effects. + eventInfo.Sequence = atomic.AddUint64(&o.seq, 1) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { logging.FromContext(o.ctx).Warnw("Error while venting the recorded event", zap.Error(err)) } - o.replyFunc(o.ctx, writer, eventInfo) + if shouldSkip { + // Trigger a redelivery + writer.WriteHeader(http.StatusConflict) + } else { + o.replyFunc(o.ctx, writer, eventInfo) + } } diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 38445c45bc8..8a141efa0e6 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -17,17 +17,13 @@ limitations under the License. package main import ( - "log" - "net/http" - "os" - "k8s.io/client-go/rest" "knative.dev/pkg/injection" "knative.dev/pkg/logging" _ "knative.dev/pkg/system/testing" + "log" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/test/lib/dropevents" "knative.dev/eventing/test/lib/recordevents/observer" "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" @@ -49,24 +45,7 @@ func main() { recorder_vent.NewFromEnv(ctx), ) - algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - if ok { - skipper := dropevents.SkipperAlgorithm(algorithm) - counter := dropevents.CounterHandler{ - Skipper: skipper, - } - err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - handler.ServeHTTP(writer, request) - }) - }) - } else { - err = obs.Start(ctx, kncloudevents.CreateHandler) - } + err = obs.Start(ctx, kncloudevents.CreateHandler) if err != nil { logging.FromContext(ctx).Fatal("Error during start", err) From fdb12d0ecdde701826c58db8b7ecbbb4e486d584 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 26 Oct 2020 14:31:44 +0200 Subject: [PATCH 2/5] go imports Signed-off-by: Ville Aikas --- test/test_images/recordevents/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 8a141efa0e6..686b10f4c00 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -17,11 +17,12 @@ limitations under the License. package main import ( + "log" + "k8s.io/client-go/rest" "knative.dev/pkg/injection" "knative.dev/pkg/logging" _ "knative.dev/pkg/system/testing" - "log" "knative.dev/eventing/pkg/kncloudevents" "knative.dev/eventing/test/lib/recordevents/observer" From 972c3e59d60de764d5326145f4f02d039322436f Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Mon, 26 Oct 2020 14:40:38 +0200 Subject: [PATCH 3/5] tags Signed-off-by: Ville Aikas --- test/lib/recordevents/observer/observer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 4e16b26c5d0..c077a7409c8 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -76,7 +76,7 @@ type envConfig struct { // If events should be dropped according to Linear policy, this controls // how many events are dropped. - SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0"" required:"false"` + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { From a479b11c71e8053acababadcfa59c36381eb6c18 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Tue, 27 Oct 2020 10:52:13 +0200 Subject: [PATCH 4/5] fix silliness Signed-off-by: Ville Aikas --- test/lib/recordevents/observer/observer.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index c077a7409c8..00a1e727635 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -96,20 +96,19 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ replyFunc = NoOpReply } - var skipper dropevents.Skipper - if env.SkipStrategy != "" { - skipper = dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter) - } - - return &Observer{ + o := &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, - counter: &dropevents.CounterHandler{ - Skipper: skipper, - }, } + + if env.SkipStrategy != "" { + o.counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } + return o } // Start will create the CloudEvents client and start listening for inbound From 1c57141c6475a630b0540948e4e2c989d17834a7 Mon Sep 17 00:00:00 2001 From: Ville Aikas Date: Tue, 27 Oct 2020 13:05:23 +0200 Subject: [PATCH 5/5] simplify Signed-off-by: Ville Aikas --- test/lib/recordevents/observer/observer.go | 31 +++++++++++----------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 00a1e727635..3d5f3858fda 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -46,9 +46,7 @@ type Observer struct { // Increment this for every dropped event that we see dropSeq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) - // If configured to drop some events, counter keeps track of how many should - // be dropped. - counter *dropevents.CounterHandler + counter *dropevents.CounterHandler } type envConfig struct { @@ -95,20 +93,26 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ logging.FromContext(ctx).Info("Observer won't reply with an event") replyFunc = NoOpReply } + var counter *dropevents.CounterHandler - o := &Observer{ + if env.SkipStrategy != "" { + counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } else { + counter = &dropevents.CounterHandler{ + // Don't skip anything, since count is 0. nop skipper. + Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0), + } + } + + return &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, + counter: counter, } - - if env.SkipStrategy != "" { - o.counter = &dropevents.CounterHandler{ - Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), - } - } - return o } // Start will create the CloudEvents client and start listening for inbound @@ -158,10 +162,7 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Observer: o.Name, Time: time.Now(), } - shouldSkip := false - if o.counter != nil { - shouldSkip = o.counter.Skip() - } + shouldSkip := o.counter.Skip() // We still want to emit the event to make it easier to see what we had oberved, but // we want to transform it a little bit before emitting so that it does not count