Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions test/lib/recordevents/event_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type EventInfo struct {
Observer string `json:"observer,omitempty"`
Time time.Time `json:"time,omitempty"`
Sequence uint64 `json:"sequence"`
Dropped bool `json:"dropped"`
}

// Pretty print the event. Meant for debugging.
Expand Down Expand Up @@ -69,6 +70,7 @@ func (ei *EventInfo) String() string {
sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n")
sb.WriteString("--- Time: " + ei.Time.String() + " ---\n")
sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence))
sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped))
return sb.String()
}

Expand Down
27 changes: 27 additions & 0 deletions test/lib/recordevents/logger_vent/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logger_vent
Comment thread
slinkydeveloper marked this conversation as resolved.

import "knative.dev/eventing/test/lib/recordevents"

type Logger func(string, ...interface{})

func (l Logger) Vent(observed recordevents.EventInfo) error {
l("Event: \n%s", observed.String())

return nil
}
31 changes: 17 additions & 14 deletions test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package observer
import (
"context"
"net/http"
"strings"
"sync/atomic"
"time"

Expand All @@ -41,10 +42,8 @@ type Observer struct {
// EventLogs is the list of EventLog implementors to vent observed events.
EventLogs recordevents.EventLogs

ctx context.Context
seq uint64
// Increment this for every dropped event that we see
dropSeq uint64
ctx context.Context
seq uint64
replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
counter *dropevents.CounterHandler
}
Expand Down Expand Up @@ -144,41 +143,45 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
defer m.Finish(nil)

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
header := request.Header
headers := make(http.Header)
for k, v := range request.Header {
if !strings.HasPrefix(k, "Ce-") {
headers[k] = v
}
}
// Host header is removed from the request.Header map by net/http
if request.Host != "" {
header.Set("Host", request.Host)
headers.Set("Host", request.Host)
}

eventErrStr := ""
if eventErr != nil {
eventErrStr = eventErr.Error()
}

shouldSkip := o.counter.Skip()

eventInfo := recordevents.EventInfo{
Error: eventErrStr,
Event: event,
HTTPHeaders: header,
HTTPHeaders: headers,
Origin: request.RemoteAddr,
Observer: o.Name,
Time: time.Now(),
Sequence: atomic.AddUint64(&o.seq, 1),
Dropped: shouldSkip,
}
shouldSkip := o.counter.Skip()

// We still want to emit the event to make it easier to see what we had oberved, but
// we want to transform it a little bit before emitting so that it does not count
// as the real event that we want to emit.
if shouldSkip {
eventInfo.Sequence = atomic.AddUint64(&o.dropSeq, 1)
eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type())
} else {
// Increment the sequence only if we're not dropping so that we do not
// introduce side effects.
eventInfo.Sequence = atomic.AddUint64(&o.seq, 1)
}

err := o.EventLogs.Vent(eventInfo)
if err != nil {
logging.FromContext(o.ctx).Warnw("Error while venting the recorded event", zap.Error(err))
logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err))
}

if shouldSkip {
Expand Down
95 changes: 9 additions & 86 deletions test/lib/recordevents/recorder_vent/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,16 @@ package recorder_vent
import (
"context"
"log"
"math/rand"
"time"

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/api/errors"
restclient "k8s.io/client-go/rest"
"knative.dev/pkg/system"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand Down Expand Up @@ -70,86 +64,15 @@ func NewEventLog(ctx context.Context, agentName string, podName string) recordev

logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace)

return &recorder{out: createRecorder(ctx, agentName), on: on}
}

func createRecorder(ctx context.Context, agentName string) record.EventRecorder {
logger := logging.FromContext(ctx)

recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartEventWatcher(
sendToSink(ctx, kubeclient.Get(ctx).CoreV1().Events(system.Namespace()).CreateWithEventNamespace),
),
}
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
logging.FromContext(ctx).Debug("Closed event-broadcaster")
}()
}

return recorder
}

func sendToSink(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error)) func(*corev1.Event) {
return func(event *corev1.Event) {
tries := 0
for {
if recordEvent(ctx, sender, event) {
break
}
tries++
if tries >= maxRetry {
logging.FromContext(ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
}

func recordEvent(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error), event *corev1.Event) bool {
newEv, err := sender(event)
if err == nil {
logging.FromContext(ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID)
return true
reference, err := ref.GetReference(scheme.Scheme, on)
if err != nil {
logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err)
}

// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
logging.FromContext(ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
logging.FromContext(ctx).Infof("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
} else {
logging.FromContext(ctx).Errorf("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
}
return true
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
return &recorder{
ctx: ctx,
namespace: system.Namespace(),
agentName: agentName,
ref: reference,
}
logging.FromContext(ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false
}
89 changes: 83 additions & 6 deletions test/lib/recordevents/recorder_vent/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,104 @@ limitations under the License.
package recorder_vent

import (
"context"
"encoding/json"
"fmt"
"math/rand"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
restclient "k8s.io/client-go/rest"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
)

type recorder struct {
out record.EventRecorder
on runtime.Object
ctx context.Context
namespace string
agentName string

ref *corev1.ObjectReference
}

func (r *recorder) Vent(observed recordevents.EventInfo) error {
b, err := json.Marshal(observed)
if err != nil {
return err
}
message := string(b)

t := time.Now()
// Note: DO NOT SET EventTime, or you'll trigger k8s api server hilarity:
// - https://github.com/kubernetes/kubernetes/issues/95913
// - https://github.com/kubernetes/kubernetes/blob/master/pkg/apis/core/validation/events.go#L122
event := &corev1.Event{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("%v.%d", r.ref.Name, observed.Sequence),
Namespace: r.namespace,
},
InvolvedObject: *r.ref,
Reason: recordevents.CloudEventObservedReason,
Message: message,
Source: corev1.EventSource{Component: r.agentName},
FirstTimestamp: metav1.Time{Time: t},
LastTimestamp: metav1.Time{Time: t},
Comment thread
slinkydeveloper marked this conversation as resolved.
Count: 1,
Type: corev1.EventTypeNormal,
}

return r.recordEvent(event)
}

func (r *recorder) recordEvent(event *corev1.Event) error {
tries := 0
for {
done, err := r.trySendEvent(event)
if done {
return nil
}
tries++
if tries >= maxRetry {
logging.FromContext(r.ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name)
return err
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}

r.out.Eventf(r.on, corev1.EventTypeNormal, recordevents.CloudEventObservedReason, "%s", string(b))
func (r *recorder) trySendEvent(event *corev1.Event) (bool, error) {
newEv, err := kubeclient.Get(r.ctx).CoreV1().Events(r.namespace).CreateWithEventNamespace(event)
if err == nil {
logging.FromContext(r.ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID)
return true, nil
}

return nil
// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
logging.FromContext(r.ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err)
return true, err
case *apierrors.StatusError:
logging.FromContext(r.ctx).Errorf("Server rejected event '%s'. Reason: '%v' (will not retry!). Event: %v", event.Name, err, event)
return true, err
case *apierrors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
}
logging.FromContext(r.ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false, err
}
2 changes: 2 additions & 0 deletions test/test_images/recordevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
_ "knative.dev/pkg/system/testing"

"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/test/lib/recordevents/logger_vent"
"knative.dev/eventing/test/lib/recordevents/observer"
"knative.dev/eventing/test/lib/recordevents/recorder_vent"
"knative.dev/eventing/test/test_images"
Expand All @@ -44,6 +45,7 @@ func main() {
}

obs := observer.NewFromEnv(ctx,
logger_vent.Logger(logging.FromContext(ctx).Infof),
recorder_vent.NewFromEnv(ctx),
)

Expand Down