Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pelletier/go-toml v1.8.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.9.1
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/broker_control_plane_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/pkg/reconciler"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/test/lib/recordevents"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/duck"
Expand Down Expand Up @@ -82,8 +83,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client
const etLogger = "logger"
const loggerPodName = "logger-pod"

logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(context.TODO(), client, loggerPodName)
client.WaitForAllTestResourcesReadyOrFail(context.Background()) // Can't do this for the trigger because it's not 'ready' yet
client.CreateTriggerOrFailV1Beta1(triggerName,
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}),
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/eventing/pkg/utils"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"
)
Expand Down Expand Up @@ -81,8 +82,7 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes
)

// Create a logger (EventRecord) Pod and a K8s Service that points to it.
logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(ctx, client, loggerPodName)

// Create a Trigger that receives events (type=bar) and sends them to the logger Pod.
loggerTrigger := client.CreateTriggerOrFailV1Beta1(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -59,8 +60,7 @@ func channelHasRequiredSubscriberStatus(ctx context.Context, st *testing.T, clie
client.CreateChannelOrFail(channelName, &channel)
client.WaitForResourceReadyOrFail(channelName, &channel)

pod := resources.EventRecordPod(subscriberServiceName + "-pod")
client.CreatePodOrFail(pod, testlib.WithService(subscriberServiceName))
_ = recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberServiceName+"-pod")

subscription := client.CreateSubscriptionOrFail(
subscriberServiceName,
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"
)
Expand Down Expand Up @@ -69,8 +70,7 @@ func setupChannelTracingWithReply(
client.CreateChannelOrFail(replyChannelName, channel)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, testlib.WithService(recordEventsPodName))
recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName)

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.EventTransformationPod(
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func tracingTest(
expectedTestSpan, eventMatcher := setupInfrastructure(ctx, t, &channel, client, recordEventsPodName, true)

// Start the event info store and assert the event was received correctly
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName, client.Namespace)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
Expand Down Expand Up @@ -107,6 +107,6 @@ func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string {
}
}
}
t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos)
t.Fatalf("FAIL: No traceid in %d messages: (%v)", len(evInfos), evInfos)
return ""
}
5 changes: 3 additions & 2 deletions test/e2e/helpers/trigger_no_broker_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -41,8 +42,8 @@ func TestTriggerNoBroker(ctx context.Context, t *testing.T, channel string, brok
brokerName := strings.ToLower(channel)

subscriberName := "dumper-empty"
eventRecordPod := resources.EventRecordPod(subscriberName)
client.CreatePodOrFail(eventRecordPod, testlib.WithService(subscriberName))
recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberName)

client.CreateTriggerOrFailV1Beta1("testtrigger",
resources.WithSubscriberServiceRefForTriggerV1Beta1(subscriberName),
resources.WithBrokerV1Beta1(brokerName),
Expand Down
6 changes: 6 additions & 0 deletions test/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Client struct {
Dynamic dynamic.Interface
Config *rest.Config

EventListener *EventListener

Namespace string
T *testing.T
Tracker *Tracker
Expand Down Expand Up @@ -93,6 +95,10 @@ func NewClient(configPath string, clusterName string, namespace string, t *testi
client.T = t
client.Tracker = NewTracker(t, client.Dynamic)

// Start informer
client.EventListener = NewEventListener(client.Kube.Kube, client.Namespace, client.T.Logf)
client.Cleanup(client.EventListener.Stop)

client.tracingEnv, err = getTracingConfig(client.Kube.Kube)
if err != nil {
return nil, err
Expand Down
84 changes: 84 additions & 0 deletions test/lib/k8s_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

// EventHandler is the callback type for the EventListener
type EventHandler func(event *corev1.Event)

// EventListener is a type that broadcasts new k8s events
type EventListener struct {
cancel context.CancelFunc

lock sync.Mutex
handlers []EventHandler
}

// NewEventListener creates a new event listener
func NewEventListener(client kubernetes.Interface, namespace string, logf func(string, ...interface{})) *EventListener {
ctx, cancelCtx := context.WithCancel(context.Background())
informerFactory := informers.NewSharedInformerFactoryWithOptions(
client,
0,
informers.WithNamespace(namespace),
)
eventsInformer := informerFactory.Core().V1().Events().Informer()

el := EventListener{
cancel: cancelCtx,
}

eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
el.handle(obj.(*corev1.Event))
},
})

go func() {
eventsInformer.Run(ctx.Done())
logf("EventListener stopped")
}()

return &el
}

func (el *EventListener) handle(event *corev1.Event) {
el.lock.Lock()
defer el.lock.Unlock()
for _, handler := range el.handlers {
handler(event)
}
}

func (el *EventListener) AddHandler(handler EventHandler) {
el.lock.Lock()
defer el.lock.Unlock()
el.handlers = append(el.handlers, handler)
}

func (el *EventListener) Stop() {
el.cancel()
}
Loading