Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ require (
github.com/pelletier/go-toml v1.8.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pkg/errors v0.9.1
github.com/prometheus/common v0.9.1
github.com/rickb777/date v1.13.0
github.com/robfig/cron/v3 v3.0.1
github.com/rogpeppe/fastuuid v1.2.0
Expand All @@ -31,7 +32,7 @@ require (
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
google.golang.org/grpc v1.31.1
gopkg.in/yaml.v2 v2.3.0
gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
k8s.io/api v0.18.8
k8s.io/apiextensions-apiserver v0.18.4
k8s.io/apimachinery v0.18.8
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,7 @@ github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJ
github.com/imdario/mergo v0.3.8/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.9 h1:UauaLniWCFHWd+Jp9oCEkTBj8VO/9DKg3PV3VCNMDIg=
github.com/imdario/mergo v0.3.9/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb v0.0.0-20161215172503-049f9b42e9a5/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0=
Expand Down Expand Up @@ -821,6 +822,7 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.8/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI=
Expand Down Expand Up @@ -1062,6 +1064,7 @@ github.com/spf13/cobra v0.0.2-0.20171109065643-2da4a54c5cee/go.mod h1:1l0Ry5zgKv
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU=
github.com/spf13/cobra v0.0.6/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/cobra v1.0.0 h1:6m/oheQuQ13N9ks4hubMG6BnvwOeaJrqSPLahSnczz8=
github.com/spf13/cobra v1.0.0/go.mod h1:/6GTrnGXV9HjY+aR4k0oJ5tcvakLuG6EuKReYlHNrgE=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
Expand Down Expand Up @@ -1726,8 +1729,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20190709130402-674ba3eaed22/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d h1:LCPbGQ34PMrwad11aMZ+dbz5SAsq/0ySjRwQ8I9Qwd8=
gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
helm.sh/helm/v3 v3.1.1/go.mod h1:WYsFJuMASa/4XUqLyv54s0U/f3mlAaRErGmyy4z921g=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/broker_control_plane_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"knative.dev/pkg/reconciler"

eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/test/lib/recordevents"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/duck"
Expand Down Expand Up @@ -82,8 +83,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client
const etLogger = "logger"
const loggerPodName = "logger-pod"

logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(context.TODO(), client, loggerPodName)
client.WaitForAllTestResourcesReadyOrFail(context.Background()) // Can't do this for the trigger because it's not 'ready' yet
client.CreateTriggerOrFailV1Beta1(triggerName,
resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}),
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"knative.dev/eventing/pkg/apis/eventing/v1beta1"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"
)
Expand Down Expand Up @@ -82,8 +83,7 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes
)

// Create a logger (EventRecord) Pod and a K8s Service that points to it.
logPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName))
_ = recordevents.DeployEventRecordOrFail(ctx, client, loggerPodName)

// Create a Trigger that receives events (type=bar) and sends them to the logger Pod.
loggerTrigger := client.CreateTriggerOrFailV1Beta1(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1"
eventingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"

corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -59,8 +60,7 @@ func channelHasRequiredSubscriberStatus(ctx context.Context, st *testing.T, clie
client.CreateChannelOrFail(channelName, &channel)
client.WaitForResourceReadyOrFail(channelName, &channel)

pod := resources.EventRecordPod(subscriberServiceName + "-pod")
client.CreatePodOrFail(pod, testlib.WithService(subscriberServiceName))
_ = recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberServiceName+"-pod")

subscription := client.CreateSubscriptionOrFail(
subscriberServiceName,
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/sender"
)
Expand Down Expand Up @@ -69,8 +70,7 @@ func setupChannelTracingWithReply(
client.CreateChannelOrFail(replyChannelName, channel)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, testlib.WithService(recordEventsPodName))
recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName)

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.EventTransformationPod(
Expand Down
4 changes: 2 additions & 2 deletions test/conformance/helpers/tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func tracingTest(
expectedTestSpan, eventMatcher := setupInfrastructure(ctx, t, &channel, client, recordEventsPodName, true)

// Start the event info store and assert the event was received correctly
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName, client.Namespace)
if err != nil {
t.Fatal("Pod tracker failed:", err)
}
Expand Down Expand Up @@ -107,6 +107,6 @@ func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string {
}
}
}
t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos)
t.Fatalf("FAIL: No traceid in %d messages: (%v)", len(evInfos), evInfos)
return ""
}
5 changes: 3 additions & 2 deletions test/e2e/helpers/trigger_no_broker_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"

testlib "knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
)

Expand All @@ -41,8 +42,8 @@ func TestTriggerNoBroker(ctx context.Context, t *testing.T, channel string, brok
brokerName := strings.ToLower(channel)

subscriberName := "dumper-empty"
eventRecordPod := resources.EventRecordPod(subscriberName)
client.CreatePodOrFail(eventRecordPod, testlib.WithService(subscriberName))
recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberName)

client.CreateTriggerOrFailV1Beta1("testtrigger",
resources.WithSubscriberServiceRefForTriggerV1Beta1(subscriberName),
resources.WithBrokerV1Beta1(brokerName),
Expand Down
6 changes: 6 additions & 0 deletions test/lib/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Client struct {
Dynamic dynamic.Interface
Config *rest.Config

EventListener *EventListener

Namespace string
T *testing.T
Tracker *Tracker
Expand Down Expand Up @@ -93,6 +95,10 @@ func NewClient(configPath string, clusterName string, namespace string, t *testi
client.T = t
client.Tracker = NewTracker(t, client.Dynamic)

// Start informer
client.EventListener = NewEventListener(client.Kube.Kube, client.Namespace, client.T.Logf)
client.Cleanup(client.EventListener.Stop)

client.tracingEnv, err = getTracingConfig(client.Kube.Kube)
if err != nil {
return nil, err
Expand Down
84 changes: 84 additions & 0 deletions test/lib/k8s_events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2020 The Knative Authors

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lib

import (
"context"
"sync"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

// EventHandler is the callback type for the EventListener
type EventHandler func(event *corev1.Event)

// EventListener is a type that broadcasts new k8s events
type EventListener struct {
cancel context.CancelFunc

lock sync.Mutex
handlers []EventHandler
}

// NewEventListener creates a new event listener
func NewEventListener(client kubernetes.Interface, namespace string, logf func(string, ...interface{})) *EventListener {
ctx, cancelCtx := context.WithCancel(context.Background())
informerFactory := informers.NewSharedInformerFactoryWithOptions(
client,
0,
informers.WithNamespace(namespace),
)
eventsInformer := informerFactory.Core().V1().Events().Informer()

el := EventListener{
cancel: cancelCtx,
}

eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
el.handle(obj.(*corev1.Event))
},
})

go func() {
eventsInformer.Run(ctx.Done())
logf("EventListener stopped")
}()
Comment on lines +43 to +63
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In reconciler-test the event informer is injected into the context (something to keep in mind when moving this code over).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool so we can avoid creating it manually!

Copy link
Copy Markdown
Contributor Author

@slinkydeveloper slinkydeveloper Oct 5, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll still need this EventListener, right? Note that this is shared between different EventInfoStore

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup I think so.


return &el
}

func (el *EventListener) handle(event *corev1.Event) {
el.lock.Lock()
defer el.lock.Unlock()
for _, handler := range el.handlers {
handler(event)
}
}

func (el *EventListener) AddHandler(handler EventHandler) {
el.lock.Lock()
defer el.lock.Unlock()
el.handlers = append(el.handlers, handler)
}

func (el *EventListener) Stop() {
el.cancel()
}
Loading