diff --git a/test/lib/dropevents/receiver.go b/test/lib/dropevents/receiver.go index 4453b4f4714..6b18f2be10d 100644 --- a/test/lib/dropevents/receiver.go +++ b/test/lib/dropevents/receiver.go @@ -32,6 +32,20 @@ const ( NumberKey = "NUMBER" ) +// count is only used for SKIP_ALGORITHM=Sequence. +func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper { + switch algorithm { + case Fibonacci: + return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1} + + case Sequence: + return dropeventsfirst.First{N: count} + + default: + panic("unknown algorithm: " + algorithm) + } +} + func SkipperAlgorithm(algorithm string) Skipper { switch algorithm { diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go index 9933542d94e..3d5f3858fda 100644 --- a/test/lib/recordevents/observer/observer.go +++ b/test/lib/recordevents/observer/observer.go @@ -22,6 +22,8 @@ import ( "sync/atomic" "time" + "knative.dev/eventing/test/lib/dropevents" + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/kelseyhightower/envconfig" @@ -39,9 +41,12 @@ type Observer struct { // EventLogs is the list of EventLog implementors to vent observed events. EventLogs recordevents.EventLogs - ctx context.Context - seq uint64 + ctx context.Context + seq uint64 + // Increment this for every dropped event that we see + dropSeq uint64 replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo) + counter *dropevents.CounterHandler } type envConfig struct { @@ -63,6 +68,13 @@ type envConfig struct { // This string to append in the data field in the reply, if enabled. // This will threat the data as text/plain field ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"` + + // If events should be dropped, specify the strategy here. + SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"` + + // If events should be dropped according to Linear policy, this controls + // how many events are dropped. + SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"` } func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer { @@ -81,12 +93,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ logging.FromContext(ctx).Info("Observer won't reply with an event") replyFunc = NoOpReply } + var counter *dropevents.CounterHandler + + if env.SkipStrategy != "" { + counter = &dropevents.CounterHandler{ + Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter), + } + } else { + counter = &dropevents.CounterHandler{ + // Don't skip anything, since count is 0. nop skipper. + Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0), + } + } return &Observer{ Name: env.ObserverName, EventLogs: eventLogs, ctx: ctx, replyFunc: replyFunc, + counter: counter, } } @@ -136,12 +161,30 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) Origin: request.RemoteAddr, Observer: o.Name, Time: time.Now(), - Sequence: atomic.AddUint64(&o.seq, 1), } + shouldSkip := o.counter.Skip() + + // We still want to emit the event to make it easier to see what we had oberved, but + // we want to transform it a little bit before emitting so that it does not count + // as the real event that we want to emit. + if shouldSkip { + eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1) + eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type()) + } else { + // Increment the sequence only if we're not dropping so that we do not + // introduce side effects. + eventInfo.Sequence = atomic.AddUint64(&o.seq, 1) + } + err := o.EventLogs.Vent(eventInfo) if err != nil { logging.FromContext(o.ctx).Warnw("Error while venting the recorded event", zap.Error(err)) } - o.replyFunc(o.ctx, writer, eventInfo) + if shouldSkip { + // Trigger a redelivery + writer.WriteHeader(http.StatusConflict) + } else { + o.replyFunc(o.ctx, writer, eventInfo) + } } diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 38445c45bc8..686b10f4c00 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -18,8 +18,6 @@ package main import ( "log" - "net/http" - "os" "k8s.io/client-go/rest" "knative.dev/pkg/injection" @@ -27,7 +25,6 @@ import ( _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/eventing/test/lib/dropevents" "knative.dev/eventing/test/lib/recordevents/observer" "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" @@ -49,24 +46,7 @@ func main() { recorder_vent.NewFromEnv(ctx), ) - algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - if ok { - skipper := dropevents.SkipperAlgorithm(algorithm) - counter := dropevents.CounterHandler{ - Skipper: skipper, - } - err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - handler.ServeHTTP(writer, request) - }) - }) - } else { - err = obs.Start(ctx, kncloudevents.CreateHandler) - } + err = obs.Start(ctx, kncloudevents.CreateHandler) if err != nil { logging.FromContext(ctx).Fatal("Error during start", err)