Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions test/lib/dropevents/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ const (
NumberKey = "NUMBER"
)

// count is only used for SKIP_ALGORITHM=Sequence.
func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper {
switch algorithm {
case Fibonacci:
return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1}

case Sequence:
return dropeventsfirst.First{N: count}

default:
panic("unknown algorithm: " + algorithm)
}
}

func SkipperAlgorithm(algorithm string) Skipper {

switch algorithm {
Expand Down
30 changes: 25 additions & 5 deletions test/lib/recordevents/event_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,36 @@ type EventInfo struct {
Observer string `json:"observer,omitempty"`
Time time.Time `json:"time,omitempty"`
Sequence uint64 `json:"sequence"`
Dropped bool `json:"dropped"`
}

// Pretty print the event. Meant for debugging. This formats the validation error
// or the full event as appropriate. This does NOT format the headers.
// Pretty print the event. Meant for debugging.
func (ei *EventInfo) String() string {
var sb strings.Builder
sb.WriteString("-- EventInfo --\n")
if ei.Event != nil {
return ei.Event.String()
} else {
return fmt.Sprintf("invalid event \"%s\"", ei.Error)
sb.WriteString("--- Event ---\n")
sb.WriteString(ei.Event.String())
sb.WriteRune('\n')
sb.WriteRune('\n')
}
if ei.Error != "" {
sb.WriteString("--- Error ---\n")
sb.WriteString(ei.Error)
sb.WriteRune('\n')
sb.WriteRune('\n')
}
sb.WriteString("--- HTTP headers ---\n")
for k, v := range ei.HTTPHeaders {
sb.WriteString(" " + k + ": " + v[0] + "\n")
}
sb.WriteRune('\n')
sb.WriteString("--- Origin: '" + ei.Origin + "' ---\n")
sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n")
sb.WriteString("--- Time: " + ei.Time.String() + " ---\n")
sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence))
sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped))
return sb.String()
}

// This is mainly used for providing better failure messages
Expand Down
27 changes: 27 additions & 0 deletions test/lib/recordevents/logger_vent/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logger_vent

import "knative.dev/eventing/test/lib/recordevents"

type Logger func(string, ...interface{})

func (l Logger) Vent(observed recordevents.EventInfo) error {
l("Event: \n%s", observed.String())

return nil
}
61 changes: 56 additions & 5 deletions test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package observer
import (
"context"
"net/http"
"strings"
"sync/atomic"
"time"

"knative.dev/eventing/test/lib/dropevents"

cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand All @@ -41,11 +45,12 @@ type Observer struct {
ctx context.Context
seq uint64
replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
counter *dropevents.CounterHandler
}

type envConfig struct {
// ObserverName is used to identify this instance of the observer.
ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"`
ObserverName string `envconfig:"POD_NAME" default:"observer-default" required:"true"`

// Reply is used to define if the observer should reply back
Reply bool `envconfig:"REPLY" default:"false" required:"false"`
Expand All @@ -62,6 +67,13 @@ type envConfig struct {
// This string to append in the data field in the reply, if enabled.
// This will threat the data as text/plain field
ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"`

// If events should be dropped, specify the strategy here.
SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"`

// If events should be dropped according to Linear policy, this controls
// how many events are dropped.
SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"`
}

func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer {
Expand All @@ -80,12 +92,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ
logging.FromContext(ctx).Info("Observer won't reply with an event")
replyFunc = NoOpReply
}
var counter *dropevents.CounterHandler

if env.SkipStrategy != "" {
counter = &dropevents.CounterHandler{
Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter),
}
} else {
counter = &dropevents.CounterHandler{
// Don't skip anything, since count is 0. nop skipper.
Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0),
}
}

return &Observer{
Name: env.ObserverName,
EventLogs: eventLogs,
ctx: ctx,
replyFunc: replyFunc,
counter: counter,
}
}

Expand Down Expand Up @@ -118,25 +143,51 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
defer m.Finish(nil)

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
header := request.Header
headers := make(http.Header)
for k, v := range request.Header {
if !strings.HasPrefix(k, "Ce-") {
headers[k] = v
}
}
// Host header is removed from the request.Header map by net/http
if request.Host != "" {
headers.Set("Host", request.Host)
}

eventErrStr := ""
if eventErr != nil {
eventErrStr = eventErr.Error()
}

shouldSkip := o.counter.Skip()

eventInfo := recordevents.EventInfo{
Error: eventErrStr,
Event: event,
HTTPHeaders: header,
HTTPHeaders: headers,
Origin: request.RemoteAddr,
Observer: o.Name,
Time: time.Now(),
Sequence: atomic.AddUint64(&o.seq, 1),
Dropped: shouldSkip,
}

// We still want to emit the event to make it easier to see what we had oberved, but
// we want to transform it a little bit before emitting so that it does not count
// as the real event that we want to emit.
if shouldSkip {
eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type())
}

err := o.EventLogs.Vent(eventInfo)
if err != nil {
logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err)
logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err))
}

o.replyFunc(o.ctx, writer, eventInfo)
if shouldSkip {
// Trigger a redelivery
writer.WriteHeader(http.StatusConflict)
} else {
o.replyFunc(o.ctx, writer, eventInfo)
}
}
95 changes: 9 additions & 86 deletions test/lib/recordevents/recorder_vent/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@ package recorder_vent
import (
"context"
"log"
"math/rand"
"time"

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/api/errors"
restclient "k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand Down Expand Up @@ -70,86 +64,15 @@ func NewEventLog(ctx context.Context, agentName string, podName string, podNames

logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace)

return &recorder{out: createRecorder(ctx, agentName, podNamespace), on: on}
}

func createRecorder(ctx context.Context, agentName string, namespace string) record.EventRecorder {
logger := logging.FromContext(ctx)

recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartEventWatcher(
sendToSink(ctx, kubeclient.Get(ctx).CoreV1().Events(namespace).CreateWithEventNamespace),
),
}
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
logging.FromContext(ctx).Debug("Closed event-broadcaster")
}()
}

return recorder
}

func sendToSink(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error)) func(*corev1.Event) {
return func(event *corev1.Event) {
tries := 0
for {
if recordEvent(ctx, sender, event) {
break
}
tries++
if tries >= maxRetry {
logging.FromContext(ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
}

func recordEvent(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error), event *corev1.Event) bool {
newEv, err := sender(event)
if err == nil {
logging.FromContext(ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID)
return true
reference, err := ref.GetReference(scheme.Scheme, on)
if err != nil {
logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err)
}

// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
logging.FromContext(ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
logging.FromContext(ctx).Infof("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
} else {
logging.FromContext(ctx).Errorf("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
}
return true
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
return &recorder{
ctx: ctx,
namespace: podNamespace,
agentName: agentName,
ref: reference,
}
logging.FromContext(ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false
}
Loading