Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions test/lib/dropevents/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ const (
NumberKey = "NUMBER"
)

// count is only used for SKIP_ALGORITHM=Sequence.
func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper {
switch algorithm {
case Fibonacci:
return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1}

case Sequence:
return dropeventsfirst.First{N: count}

default:
panic("unknown algorithm: " + algorithm)
}
}

func SkipperAlgorithm(algorithm string) Skipper {

switch algorithm {
Expand Down
51 changes: 47 additions & 4 deletions test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"sync/atomic"
"time"

"knative.dev/eventing/test/lib/dropevents"

cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
Expand All @@ -39,9 +41,12 @@ type Observer struct {
// EventLogs is the list of EventLog implementors to vent observed events.
EventLogs recordevents.EventLogs

ctx context.Context
seq uint64
ctx context.Context
seq uint64
// Increment this for every dropped event that we see
dropSeq uint64
replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
counter *dropevents.CounterHandler
}

type envConfig struct {
Expand All @@ -63,6 +68,13 @@ type envConfig struct {
// This string to append in the data field in the reply, if enabled.
// This will threat the data as text/plain field
ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"`

// If events should be dropped, specify the strategy here.
SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"`

// If events should be dropped according to Linear policy, this controls
// how many events are dropped.
SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"`
}

func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer {
Expand All @@ -81,12 +93,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ
logging.FromContext(ctx).Info("Observer won't reply with an event")
replyFunc = NoOpReply
}
var counter *dropevents.CounterHandler

if env.SkipStrategy != "" {
counter = &dropevents.CounterHandler{
Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter),
}
} else {
counter = &dropevents.CounterHandler{
// Don't skip anything, since count is 0. nop skipper.
Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0),
}
}

return &Observer{
Name: env.ObserverName,
EventLogs: eventLogs,
ctx: ctx,
replyFunc: replyFunc,
counter: counter,
}
}

Expand Down Expand Up @@ -136,12 +161,30 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
Origin: request.RemoteAddr,
Observer: o.Name,
Time: time.Now(),
Sequence: atomic.AddUint64(&o.seq, 1),
}
shouldSkip := o.counter.Skip()

// We still want to emit the event to make it easier to see what we had oberved, but
// we want to transform it a little bit before emitting so that it does not count
// as the real event that we want to emit.
if shouldSkip {
eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1)
eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type())
} else {
// Increment the sequence only if we're not dropping so that we do not
// introduce side effects.
eventInfo.Sequence = atomic.AddUint64(&o.seq, 1)
}

err := o.EventLogs.Vent(eventInfo)
if err != nil {
logging.FromContext(o.ctx).Warnw("Error while venting the recorded event", zap.Error(err))
}

o.replyFunc(o.ctx, writer, eventInfo)
if shouldSkip {
// Trigger a redelivery
writer.WriteHeader(http.StatusConflict)
} else {
o.replyFunc(o.ctx, writer, eventInfo)
}
}
22 changes: 1 addition & 21 deletions test/test_images/recordevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@ package main

import (
"log"
"net/http"
"os"

"k8s.io/client-go/rest"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
_ "knative.dev/pkg/system/testing"

"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/test/lib/dropevents"
"knative.dev/eventing/test/lib/recordevents/observer"
"knative.dev/eventing/test/lib/recordevents/recorder_vent"
"knative.dev/eventing/test/test_images"
Expand All @@ -49,24 +46,7 @@ func main() {
recorder_vent.NewFromEnv(ctx),
)

algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey)
if ok {
skipper := dropevents.SkipperAlgorithm(algorithm)
counter := dropevents.CounterHandler{
Skipper: skipper,
}
err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
if counter.Skip() {
writer.WriteHeader(http.StatusConflict)
return
}
handler.ServeHTTP(writer, request)
})
})
} else {
err = obs.Start(ctx, kncloudevents.CreateHandler)
}
err = obs.Start(ctx, kncloudevents.CreateHandler)

if err != nil {
logging.FromContext(ctx).Fatal("Error during start", err)
Expand Down