Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/pelletier/go-toml v1.8.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.9.1
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand Down
13 changes: 8 additions & 5 deletions test/e2e/helpers/channel_event_tranformation_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,16 @@ func EventTransformationForSubscriptionTestHelper(
if err := eventAfterTransformation.SetData(cloudevents.ApplicationJSON, []byte(transformedEventBody)); err != nil {
t.Fatal("Cannot set the payload of the event:", err.Error())
}
transformationPod := resources.EventTransformationPod(
recordevents.DeployEventRecordOrFail(
ctx,
client,
transformationPodName,
eventAfterTransformation.Type(),
eventAfterTransformation.Source(),
eventAfterTransformation.Data(),
recordevents.ReplyWithTransformedEvent(
eventAfterTransformation.Type(),
eventAfterTransformation.Source(),
string(eventAfterTransformation.Data()),
),
)
client.CreatePodOrFail(transformationPod, testlib.WithService(transformationPodName))

// create event logger pod and service as the subscriber
eventTracker, _ := recordevents.StartEventRecordOrFail(ctx, client, recordEventsPodName)
Expand Down
31 changes: 0 additions & 31 deletions test/lib/recordevents/event_info_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,9 @@ import (

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/util/wait"
pkgTest "knative.dev/pkg/test"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
)

const (
Expand All @@ -42,34 +39,6 @@ const (
retryTimeout = 4 * time.Minute
)

type EventRecordOption = func(*corev1.Pod, *testlib.Client) error

func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod {
client.CreateServiceAccountOrFail(name)
client.CreateRoleOrFail(resources.Role(name,
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get"},
}),
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{rbacv1.VerbAll},
}),
))
client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace)

eventRecordPod := EventRecordPod(name, name)
client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...)
err := pkgTest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace)
if err != nil {
client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err))
}
client.WaitForServiceEndpointsOrFail(ctx, name, 1)
return eventRecordPod
}

// Deploys a new recordevents pod and start the associated EventInfoStore
func StartEventRecordOrFail(ctx context.Context, client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) {
eventRecordPod := DeployEventRecordOrFail(ctx, client, podName, options...)
Expand Down
58 changes: 40 additions & 18 deletions test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,64 @@ import (
cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
"github.com/prometheus/common/log"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
)

// Observer is the entry point for sinking events into the event log.
type Observer struct {

// Name is the name of this Observer, used to filter if multiple observers.
Name string
// EventLogs is the list of EventLog implementors to vent observed events.
EventLogs recordevents.EventLogs

seq uint64
}

// New returns an observer that will vent observations to the list of provided
// EventLog instances. It will listen on :8080.
func New(name string, eventLogs ...recordevents.EventLog) *Observer {
return &Observer{
Name: name,
EventLogs: eventLogs,
}
ctx context.Context
seq uint64
replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
}

type envConfig struct {
// ObserverName is used to identify this instance of the observer.
ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"`

// Reply is used to define if the observer should reply back
Reply bool `envconfig:"REPLY" default:"false" required:"false"`

// The event type to use in the reply, if enabled
ReplyEventType string `envconfig:"REPLY_EVENT_TYPE" default:"" required:"false"`

// The event source to use in the reply, if enabled
ReplyEventSource string `envconfig:"REPLY_EVENT_SOURCE" default:"" required:"false"`

// The event data to use in the reply, if enabled
ReplyEventData string `envconfig:"REPLY_EVENT_DATA" default:"" required:"false"`
}

func NewFromEnv(eventLogs ...recordevents.EventLog) *Observer {
func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
log.Fatal("Failed to process env var", err)
logging.FromContext(ctx).Fatal("Failed to process env var", err)
}

logging.FromContext(ctx).Infof("Observer environment configuration: %+v", env)

var replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
if env.Reply {
logging.FromContext(ctx).Info("Observer will reply with an event")
replyFunc = ReplyTransformerFunc(env.ReplyEventType, env.ReplyEventSource, env.ReplyEventData)
} else {
logging.FromContext(ctx).Info("Observer won't reply with an event")
replyFunc = NoOpReply
}

return New(env.ObserverName, eventLogs...)
return &Observer{
Name: env.ObserverName,
EventLogs: eventLogs,
ctx: ctx,
replyFunc: replyFunc,
}
}

// Start will create the CloudEvents client and start listening for inbound
Expand Down Expand Up @@ -99,18 +120,19 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
if eventErr != nil {
eventErrStr = eventErr.Error()
}
err := o.EventLogs.Vent(recordevents.EventInfo{
eventInfo := recordevents.EventInfo{
Error: eventErrStr,
Event: event,
HTTPHeaders: header,
Origin: request.RemoteAddr,
Observer: o.Name,
Time: time.Now(),
Sequence: atomic.AddUint64(&o.seq, 1),
})
}
err := o.EventLogs.Vent(eventInfo)
if err != nil {
log.Warn("Error while venting the recorded event", err)
logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err)
}

writer.WriteHeader(http.StatusAccepted)
o.replyFunc(o.ctx, writer, eventInfo)
}
70 changes: 70 additions & 0 deletions test/lib/recordevents/observer/reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package observer

import (
"context"
"net/http"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
)

func NoOpReply(_ context.Context, writer http.ResponseWriter, _ recordevents.EventInfo) {
writer.WriteHeader(http.StatusAccepted)
}

func ReplyTransformerFunc(replyEventType string, replyEventSource string, replyEventData string) func(context.Context, http.ResponseWriter, recordevents.EventInfo) {
return func(ctx context.Context, writer http.ResponseWriter, info recordevents.EventInfo) {
if info.Error != "" {
writer.WriteHeader(http.StatusBadRequest)
_, _ = writer.Write([]byte(info.Error))
logging.FromContext(ctx).Warn("Conversion error in the event to send back", info.Error)
return
}

if info.Event == nil {
writer.WriteHeader(http.StatusBadRequest)
_, _ = writer.Write([]byte("No event!"))
logging.FromContext(ctx).Warn("No event to send back")
return
}

outputEvent := info.Event.Clone()

if replyEventSource != "" {
outputEvent.SetSource(replyEventSource)
}
if replyEventType != "" {
outputEvent.SetType(replyEventType)
}
if replyEventData != "" {
if err := outputEvent.SetData(cloudevents.ApplicationJSON, []byte(replyEventData)); err != nil {
logging.FromContext(ctx).Warn("Cannot set the event data")
}
}

err := cehttp.WriteResponseWriter(ctx, binding.ToMessage(&outputEvent), 200, writer)
if err != nil {
logging.FromContext(ctx).Warn("Error while writing the event as response", err)
}
}
}
2 changes: 1 addition & 1 deletion test/lib/recordevents/recorder_vent/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewFromEnv(ctx context.Context) recordevents.EventLog {
log.Fatal("Failed to process env var", err)
}

logging.FromContext(ctx).Infof("Environment configuration: %+v", env)
logging.FromContext(ctx).Infof("Recorder vent environment configuration: %+v", env)

return NewEventLog(ctx, env.AgentName, env.PodName)
}
Expand Down
81 changes: 79 additions & 2 deletions test/lib/recordevents/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,89 @@ limitations under the License.
package recordevents

import (
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/pkg/test"
pkgtest "knative.dev/pkg/test"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/resources"
)

type EventRecordOption = func(*corev1.Pod, *testlib.Client) error

// EchoEvent is an option to let the recordevents reply with the received event
func EchoEvent(pod *corev1.Pod, client *testlib.Client) error {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY", Value: "true"},
)
return nil
}

var _ EventRecordOption = EchoEvent

// ReplyWithTransformedEvent is an option to let the recordevents reply with the transformed event
func ReplyWithTransformedEvent(replyEventType string, replyEventSource string, replyEventData string) EventRecordOption {
return func(pod *corev1.Pod, client *testlib.Client) error {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY", Value: "true"},
)
if replyEventType != "" {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY_EVENT_TYPE", Value: replyEventType},
)
}
if replyEventSource != "" {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY_EVENT_SOURCE", Value: replyEventSource},
)
}
if replyEventData != "" {
pod.Spec.Containers[0].Env = append(
pod.Spec.Containers[0].Env,
corev1.EnvVar{Name: "REPLY_EVENT_DATA", Value: replyEventData},
)
}

return nil
}
}

// DeployEventRecordOrFail deploys the recordevents image with necessary sa, roles, rb to execute the image
func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod {
client.CreateServiceAccountOrFail(name)
client.CreateRoleOrFail(resources.Role(name,
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"pods"},
Verbs: []string{"get"},
}),
resources.WithRuleForRole(&rbacv1.PolicyRule{
APIGroups: []string{""},
Resources: []string{"events"},
Verbs: []string{rbacv1.VerbAll},
}),
))
client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace)

eventRecordPod := EventRecordPod(name, name)
client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...)
err := pkgtest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace)
if err != nil {
client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err))
}
client.WaitForServiceEndpointsOrFail(ctx, name, 1)
return eventRecordPod
}

// EventRecordPod creates a Pod that stores received events for test retrieval.
func EventRecordPod(name string, serviceAccountName string) *corev1.Pod {
return recordEventsPod("recordevents", name, serviceAccountName)
Expand All @@ -37,7 +114,7 @@ func recordEventsPod(imageName string, name string, serviceAccountName string) *
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: imageName,
Image: test.ImagePath(imageName),
Image: pkgtest.ImagePath(imageName),
ImagePullPolicy: corev1.PullIfNotPresent,
Env: []corev1.EnvVar{{
Name: "SYSTEM_NAMESPACE",
Expand Down
6 changes: 3 additions & 3 deletions test/test_images/recordevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"os"

"k8s.io/client-go/rest"
"knative.dev/pkg/injection/sharedmain"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
_ "knative.dev/pkg/system/testing"

Expand All @@ -39,13 +39,13 @@ func main() {
log.Fatal("Error while reading the cfg", err)
}
//nolint // nil ctx is fine here, look at the code of EnableInjectionOrDie
ctx := sharedmain.EnableInjectionOrDie(nil, cfg)
ctx, _ := injection.EnableInjectionOrDie(nil, cfg)

if err := test_images.ConfigureTracing(logging.FromContext(ctx), ""); err != nil {
logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err)
}

obs := observer.NewFromEnv(
obs := observer.NewFromEnv(ctx,
recorder_vent.NewFromEnv(ctx),
)

Expand Down
1 change: 0 additions & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,6 @@ github.com/prometheus/client_golang/prometheus/promhttp
# github.com/prometheus/client_model v0.2.0
github.com/prometheus/client_model/go
# github.com/prometheus/common v0.9.1
## explicit
github.com/prometheus/common/expfmt
github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg
github.com/prometheus/common/log
Expand Down