diff --git a/go.mod b/go.mod index 069d1f3a770..6d73ed8b61d 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/pelletier/go-toml v1.8.0 github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2 github.com/pkg/errors v0.9.1 + github.com/prometheus/common v0.9.1 github.com/rickb777/date v1.13.0 github.com/robfig/cron/v3 v3.0.1 github.com/rogpeppe/fastuuid v1.2.0 diff --git a/test/conformance/helpers/broker_control_plane_test_helper.go b/test/conformance/helpers/broker_control_plane_test_helper.go index 22cc8447061..f9385ce6958 100644 --- a/test/conformance/helpers/broker_control_plane_test_helper.go +++ b/test/conformance/helpers/broker_control_plane_test_helper.go @@ -25,6 +25,7 @@ import ( "knative.dev/pkg/reconciler" eventingv1beta1 "knative.dev/eventing/pkg/apis/eventing/v1beta1" + "knative.dev/eventing/test/lib/recordevents" testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/duck" @@ -82,8 +83,7 @@ func triggerV1Beta1BeforeBrokerHelper(triggerName string, client *testlib.Client const etLogger = "logger" const loggerPodName = "logger-pod" - logPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName)) + _ = recordevents.DeployEventRecordOrFail(context.TODO(), client, loggerPodName) client.WaitForAllTestResourcesReadyOrFail(context.Background()) // Can't do this for the trigger because it's not 'ready' yet client.CreateTriggerOrFailV1Beta1(triggerName, resources.WithAttributesTriggerFilterV1Beta1(eventingv1beta1.TriggerAnyFilter, etLogger, map[string]interface{}{}), diff --git a/test/conformance/helpers/broker_tracing_test_helper.go b/test/conformance/helpers/broker_tracing_test_helper.go index 19b12be797c..f13ecc9df07 100644 --- a/test/conformance/helpers/broker_tracing_test_helper.go +++ b/test/conformance/helpers/broker_tracing_test_helper.go @@ -30,6 +30,7 @@ import ( "knative.dev/eventing/pkg/utils" tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/lib/sender" ) @@ -81,8 +82,7 @@ func setupBrokerTracing(ctx context.Context, brokerClass string) SetupTracingTes ) // Create a logger (EventRecord) Pod and a K8s Service that points to it. - logPod := resources.EventRecordPod(loggerPodName) - client.CreatePodOrFail(logPod, testlib.WithService(loggerPodName)) + _ = recordevents.DeployEventRecordOrFail(ctx, client, loggerPodName) // Create a Trigger that receives events (type=bar) and sends them to the logger Pod. loggerTrigger := client.CreateTriggerOrFailV1Beta1( diff --git a/test/conformance/helpers/channel_status_subscriber_test_helper.go b/test/conformance/helpers/channel_status_subscriber_test_helper.go index d148e26d682..4a0c2e79834 100644 --- a/test/conformance/helpers/channel_status_subscriber_test_helper.go +++ b/test/conformance/helpers/channel_status_subscriber_test_helper.go @@ -24,6 +24,7 @@ import ( duckv1beta1 "knative.dev/eventing/pkg/apis/duck/v1beta1" eventingv1beta1 "knative.dev/eventing/pkg/apis/messaging/v1beta1" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" corev1 "k8s.io/api/core/v1" @@ -59,8 +60,7 @@ func channelHasRequiredSubscriberStatus(ctx context.Context, st *testing.T, clie client.CreateChannelOrFail(channelName, &channel) client.WaitForResourceReadyOrFail(channelName, &channel) - pod := resources.EventRecordPod(subscriberServiceName + "-pod") - client.CreatePodOrFail(pod, testlib.WithService(subscriberServiceName)) + _ = recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberServiceName+"-pod") subscription := client.CreateSubscriptionOrFail( subscriberServiceName, diff --git a/test/conformance/helpers/channel_tracing_test_helper.go b/test/conformance/helpers/channel_tracing_test_helper.go index 49888b3be09..35bf6ab584d 100644 --- a/test/conformance/helpers/channel_tracing_test_helper.go +++ b/test/conformance/helpers/channel_tracing_test_helper.go @@ -29,6 +29,7 @@ import ( tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" "knative.dev/eventing/test/lib/sender" ) @@ -69,8 +70,7 @@ func setupChannelTracingWithReply( client.CreateChannelOrFail(replyChannelName, channel) // Create the 'sink', a LogEvents Pod and a K8s Service that points to it. - recordEventsPod := resources.EventRecordPod(recordEventsPodName) - client.CreatePodOrFail(recordEventsPod, testlib.WithService(recordEventsPodName)) + recordEventsPod := recordevents.DeployEventRecordOrFail(ctx, client, recordEventsPodName) // Create the subscriber, a Pod that mutates the event. transformerPod := resources.EventTransformationPod( diff --git a/test/conformance/helpers/tracing_test_helper.go b/test/conformance/helpers/tracing_test_helper.go index 67a7f3a5936..4811bda4aae 100644 --- a/test/conformance/helpers/tracing_test_helper.go +++ b/test/conformance/helpers/tracing_test_helper.go @@ -67,7 +67,7 @@ func tracingTest( expectedTestSpan, eventMatcher := setupInfrastructure(ctx, t, &channel, client, recordEventsPodName, true) // Start the event info store and assert the event was received correctly - targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName) + targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName, client.Namespace) if err != nil { t.Fatalf("Pod tracker failed: %v", err) } @@ -107,6 +107,6 @@ func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string { } } } - t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos) + t.Fatalf("FAIL: No traceid in %d messages: (%v)", len(evInfos), evInfos) return "" } diff --git a/test/e2e/helpers/trigger_no_broker_test_helper.go b/test/e2e/helpers/trigger_no_broker_test_helper.go index a514fb9c4e5..e25a90166cd 100644 --- a/test/e2e/helpers/trigger_no_broker_test_helper.go +++ b/test/e2e/helpers/trigger_no_broker_test_helper.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" testlib "knative.dev/eventing/test/lib" + "knative.dev/eventing/test/lib/recordevents" "knative.dev/eventing/test/lib/resources" ) @@ -41,8 +42,8 @@ func TestTriggerNoBroker(ctx context.Context, t *testing.T, channel string, brok brokerName := strings.ToLower(channel) subscriberName := "dumper-empty" - eventRecordPod := resources.EventRecordPod(subscriberName) - client.CreatePodOrFail(eventRecordPod, testlib.WithService(subscriberName)) + recordevents.DeployEventRecordOrFail(context.TODO(), client, subscriberName) + client.CreateTriggerOrFailV1Beta1("testtrigger", resources.WithSubscriberServiceRefForTriggerV1Beta1(subscriberName), resources.WithBrokerV1Beta1(brokerName), diff --git a/test/lib/client.go b/test/lib/client.go index 1eaef6ececd..d1c5782cc0f 100644 --- a/test/lib/client.go +++ b/test/lib/client.go @@ -48,6 +48,8 @@ type Client struct { Dynamic dynamic.Interface Config *rest.Config + EventListener *EventListener + Namespace string T *testing.T Tracker *Tracker @@ -93,6 +95,10 @@ func NewClient(configPath string, clusterName string, namespace string, t *testi client.T = t client.Tracker = NewTracker(t, client.Dynamic) + // Start informer + client.EventListener = NewEventListener(client.Kube.Kube, client.Namespace, client.T.Logf) + client.Cleanup(client.EventListener.Stop) + client.tracingEnv, err = getTracingConfig(client.Kube.Kube) if err != nil { return nil, err diff --git a/test/lib/k8s_events.go b/test/lib/k8s_events.go new file mode 100644 index 00000000000..1188d5c234f --- /dev/null +++ b/test/lib/k8s_events.go @@ -0,0 +1,84 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package lib + +import ( + "context" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +// EventHandler is the callback type for the EventListener +type EventHandler func(event *corev1.Event) + +// EventListener is a type that broadcasts new k8s events +type EventListener struct { + cancel context.CancelFunc + + lock sync.Mutex + handlers []EventHandler +} + +// NewEventListener creates a new event listener +func NewEventListener(client kubernetes.Interface, namespace string, logf func(string, ...interface{})) *EventListener { + ctx, cancelCtx := context.WithCancel(context.Background()) + informerFactory := informers.NewSharedInformerFactoryWithOptions( + client, + 0, + informers.WithNamespace(namespace), + ) + eventsInformer := informerFactory.Core().V1().Events().Informer() + + el := EventListener{ + cancel: cancelCtx, + } + + eventsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + el.handle(obj.(*corev1.Event)) + }, + }) + + go func() { + eventsInformer.Run(ctx.Done()) + logf("EventListener stopped") + }() + + return &el +} + +func (el *EventListener) handle(event *corev1.Event) { + el.lock.Lock() + defer el.lock.Unlock() + for _, handler := range el.handlers { + handler(event) + } +} + +func (el *EventListener) AddHandler(handler EventHandler) { + el.lock.Lock() + defer el.lock.Unlock() + el.handlers = append(el.handlers, handler) +} + +func (el *EventListener) Stop() { + el.cancel() +} diff --git a/test/lib/recordevents/event_info.go b/test/lib/recordevents/event_info.go index dc0ffd1f0b6..d9dbbdae819 100644 --- a/test/lib/recordevents/event_info.go +++ b/test/lib/recordevents/event_info.go @@ -17,56 +17,31 @@ limitations under the License. package recordevents import ( - "context" - "encoding/json" "fmt" - "io/ioutil" - "math/rand" - "net/http" "strings" "time" cloudevents "github.com/cloudevents/sdk-go/v2" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "knative.dev/pkg/test/monitoring" - - "knative.dev/pkg/test/logging" - - testlib "knative.dev/eventing/test/lib" ) -// Port for the recordevents pod REST listener -const RecordEventsPort = 8392 - -// HTTP path for the GetMinMax REST call -const GetMinMaxPath = "/minmax" - -// HTTP path for the GetEntry REST call -const GetEntryPath = "/entry/" - -// HTTP path for the TrimThrough REST call -const TrimThroughPath = "/trimthrough/" - -// On-wire json rest api format for recordevents GetMinMax calls -// sennt to the recordevents pod. -type MinMaxResponse struct { - MinAvail int - MaxSeen int -} +const ( + // EventReason is the Kubernetes event reason used for observed events. + CloudEventObservedReason = "CloudEventObserved" +) // Structure to hold information about an event seen by recordevents pod. type EventInfo struct { // Set if the http request received by the pod couldn't be decoded or // didn't pass validation - Error string + Error string `json:"error,omitempty"` // Event received if the cloudevent received by the pod passed validation - Event *cloudevents.Event + Event *cloudevents.Event `json:"event,omitempty"` // HTTPHeaders of the connection that delivered the event - HTTPHeaders map[string][]string + HTTPHeaders map[string][]string `json:"httpHeaders,omitempty"` + Origin string `json:"origin,omitempty"` + Observer string `json:"observer,omitempty"` + Time time.Time `json:"time,omitempty"` + Sequence uint64 `json:"sequence"` } // Pretty print the event. Meant for debugging. This formats the validation error @@ -95,189 +70,3 @@ func (s *SearchedInfo) String() string { } return sb.String() } - -// Connection state for a REST connection to a pod -type eventGetter struct { - podName string - podNamespace string - podPort int - kubeClientset kubernetes.Interface - logf logging.FormatLogger - - host string - port int - forwardPID int -} - -// Creates a forwarded port to the specified recordevents pod and waits until -// it can successfully talk to the REST API. Times out after timeoutEvRetry -func newEventGetter(podName string, client *testlib.Client, logf logging.FormatLogger) (eventGetterInterface, error) { - egi := &eventGetter{podName: podName, podNamespace: client.Namespace, - kubeClientset: client.Kube.Kube, podPort: RecordEventsPort, logf: logf} - err := egi.forwardPort() - if err != nil { - return nil, err - } - - err = egi.waitTillUp() - if err != nil { - return nil, err - } - return egi, nil -} - -// Get information about the provided podName. Uses list (rather than get) and -// returns a pod list for compatibility with the monitoring.PortForward -// interface -func (eg *eventGetter) getRunningPodInfo(podName, namespace string) (*v1.PodList, error) { - pods, err := eg.kubeClientset.CoreV1().Pods(namespace).List(context.Background(), - metav1.ListOptions{FieldSelector: fmt.Sprintf("metadata.name=%s", podName)}) - if err == nil && len(pods.Items) != 1 { - err = fmt.Errorf("no %s Pod found on the cluster", podName) - } else if pods.Items[0].Status.Phase != corev1.PodRunning { - err = fmt.Errorf("pod %s in state %s, wanted Running", podName, - pods.Items[0].Status.Phase) - } - - return pods, err -} - -// Try to forward the pod port to a local port somewhere in the range 30000-60000. -// keeps retrying with random ports in that range, timing out after timeoutEvRetry -func (eg *eventGetter) forwardPort() error { - portRand := rand.New(rand.NewSource(time.Now().UnixNano())) - portMin := 30000 - portMax := 60000 - var internalErr error - - wait.PollImmediate(minEvRetryInterval, timeoutEvRetry, func() (bool, error) { - localPort := portMin + portRand.Intn(portMax-portMin) - if err := monitoring.CheckPortAvailability(localPort); err != nil { - internalErr = err - return false, nil - } - pods, err := eg.getRunningPodInfo(eg.podName, eg.podNamespace) - if err != nil { - internalErr = err - return false, nil - } - - pid, err := monitoring.PortForward(eg.logf, pods, localPort, eg.podPort, eg.podNamespace) - if err != nil { - internalErr = err - return false, nil - } - internalErr = nil - - eg.forwardPID = pid - eg.port = localPort - eg.host = "localhost" - return true, nil - }) - if internalErr != nil { - return fmt.Errorf("timeout forwarding port: %v", internalErr) - } - return nil -} - -// Return the min available, max seen by the recordevents pod. -// maxRet is the largest event that has ever been seen (whether it's been trimmed -// or not). minRet is the smallest event still available via Get, or 1+maxRet if -// no events are available. maxRet starts at 0 when no events have been seen. -func (eg *eventGetter) getMinMax() (minRet int, maxRet int, errRet error) { - resp, err := http.Get(fmt.Sprintf("http://%s:%d%s", eg.host, eg.port, GetMinMaxPath)) - if err != nil { - return -1, -1, fmt.Errorf("http get error: %v", err) - } - defer resp.Body.Close() - bodyContents, err := ioutil.ReadAll(resp.Body) - if err != nil { - return -1, -1, fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return -1, -1, fmt.Errorf("error %d reading GetMinMax response", resp.StatusCode) - } - minMaxResponse := MinMaxResponse{} - err = json.Unmarshal(bodyContents, &minMaxResponse) - if err != nil { - return -1, -1, fmt.Errorf("error unmarshalling response %w", err) - } - if minMaxResponse.MinAvail == 0 { - return -1, -1, fmt.Errorf("invalid decoded json: %+v", minMaxResponse) - } - - return minMaxResponse.MinAvail, minMaxResponse.MaxSeen, nil -} - -// Return the event with the provided sequence number. Returns the appropriate -// EventInfo or an error if no such event is known, or the event has already -// been trimmed. -func (eg *eventGetter) getEntry(seqno int) (EventInfo, error) { - resp, err := http.Get(fmt.Sprintf("http://%s:%d%s/%d", eg.host, eg.port, GetEntryPath, seqno)) - if err != nil { - return EventInfo{}, fmt.Errorf("http get err %v", err) - } - defer resp.Body.Close() - bodyContents, err := ioutil.ReadAll(resp.Body) - if err != nil { - return EventInfo{}, fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return EventInfo{}, fmt.Errorf("error %d reading GetEntry response", resp.StatusCode) - } - entryResponse := EventInfo{} - err = json.Unmarshal(bodyContents, &entryResponse) - if err != nil { - return EventInfo{}, fmt.Errorf("error unmarshalling response %w", err) - } - if len(entryResponse.Error) == 0 && entryResponse.Event == nil { - return EventInfo{}, fmt.Errorf("invalid decoded json: %+v", entryResponse) - } - - return entryResponse, nil -} - -// Trim the events up to and including seqno from the recordevents pod. -// Returns an error if a nonsensical seqno is passed in, but does not return -// error for trimming already trimmed regions. -func (eg *eventGetter) trimThrough(seqno int) error { - resp, err := http.Post(fmt.Sprintf("http://%s:%d%s/%d", eg.host, eg.port, TrimThroughPath, seqno), "", nil) - if err != nil { - return fmt.Errorf("http post err %v", err) - } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) - if err != nil { - return fmt.Errorf("error reading response body %w", err) - } - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("error %d reading TrimThrough response: %s", resp.StatusCode, string(body)) - } - - return nil -} - -// Clean up the getter by tearing down the port forward. -func (eg *eventGetter) cleanup() { - pid := eg.forwardPID - eg.forwardPID = 0 - if pid != 0 { - monitoring.Cleanup(pid) - } -} - -// Wait (up to timeoutEvRetry) for the pod to RestAPI to answer request. -func (eg *eventGetter) waitTillUp() error { - var internalErr error - wait.PollImmediate(minEvRetryInterval, timeoutEvRetry, func() (bool, error) { - _, _, internalErr = eg.getMinMax() - if internalErr != nil { - return false, nil - } - return true, nil - }) - if internalErr != nil { - return fmt.Errorf("timeout waiting for recordevents pod to come up: %v", internalErr) - } - return nil -} diff --git a/test/lib/recordevents/event_info_store.go b/test/lib/recordevents/event_info_store.go index 52fcbae3cdb..ba80a19c255 100644 --- a/test/lib/recordevents/event_info_store.go +++ b/test/lib/recordevents/event_info_store.go @@ -18,6 +18,7 @@ package recordevents import ( "context" + "encoding/json" "fmt" "strconv" "strings" @@ -27,6 +28,7 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" + rbacv1 "k8s.io/api/rbac/v1" "k8s.io/apimachinery/pkg/util/wait" pkgTest "knative.dev/pkg/test" @@ -36,169 +38,106 @@ import ( const ( // The interval and timeout used for checking events - minEvRetryInterval = 4 * time.Second - timeoutEvRetry = 4 * time.Minute + retryInterval = 4 * time.Second + retryTimeout = 4 * time.Minute ) -// Stateful store of events received by the recordevents pod it is pointed at. -// This pulls events from the pod during any Find or Wait call, storing them -// locally and triming them from the remote pod store. -type EventInfoStore struct { - tb testing.TB - - podName string - getter eventGetterInterface - - lock sync.Mutex - allEvents []EventInfo - firstID int - closeCh chan struct{} - doRefresh chan chan error - timeout time.Duration - retryInterval time.Duration -} - -// Functions used for getting data from the REST api of the recordevents pod. -// The interface exists for use with unit tests of this module. -type eventGetterInterface interface { - getMinMax() (minRet int, maxRet int, errRet error) - getEntry(seqno int) (EventInfo, error) - trimThrough(seqno int) error - cleanup() -} - -// Internal function to create an event store. This is called directly by unit tests of -// this module. -func newTestableEventInfoStore(egi eventGetterInterface, retryInterval time.Duration, - timeout time.Duration) *EventInfoStore { - if timeout == -1 { - timeout = timeoutEvRetry - } - if retryInterval == -1 { - retryInterval = minEvRetryInterval - } - ei := &EventInfoStore{getter: egi, firstID: 1, timeout: timeout, retryInterval: retryInterval} - ei.start() - return ei -} +type EventRecordOption = func(*corev1.Pod, *testlib.Client) error -// Creates an EventInfoStore that is used to iteratively download events recorded by the -// recordevents pod. Calling this forwards the recordevents port to the local machine -// and blocks waiting to connect to that pod. Fails if it cannot connect within -// the expected timeout (4 minutes currently) -func NewEventInfoStore(client *testlib.Client, podName string) (*EventInfoStore, error) { - egi, err := newEventGetter(podName, client, client.T.Logf) +func DeployEventRecordOrFail(ctx context.Context, client *testlib.Client, name string, options ...EventRecordOption) *corev1.Pod { + client.CreateServiceAccountOrFail(name) + client.CreateRoleOrFail(resources.Role(name, + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"pods"}, + Verbs: []string{"get"}, + }), + resources.WithRuleForRole(&rbacv1.PolicyRule{ + APIGroups: []string{""}, + Resources: []string{"events"}, + Verbs: []string{rbacv1.VerbAll}, + }), + )) + client.CreateRoleBindingOrFail(name, "Role", name, name, client.Namespace) + + eventRecordPod := EventRecordPod(name, name) + client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(name))...) + err := pkgTest.WaitForPodRunning(ctx, client.Kube, name, client.Namespace) if err != nil { - return nil, err + client.T.Fatalf("Failed to start the recordevent pod '%s': %v", name, errors.WithStack(err)) } - ei := newTestableEventInfoStore(egi, -1, -1) - ei.podName = podName - ei.tb = client.T - client.Cleanup(ei.cleanup) - return ei, nil + client.WaitForServiceEndpointsOrFail(ctx, name, 1) + return eventRecordPod } -type EventRecordOption = func(*corev1.Pod, *testlib.Client) error - // Deploys a new recordevents pod and start the associated EventInfoStore func StartEventRecordOrFail(ctx context.Context, client *testlib.Client, podName string, options ...EventRecordOption) (*EventInfoStore, *corev1.Pod) { - eventRecordPod := resources.EventRecordPod(podName) - client.CreatePodOrFail(eventRecordPod, append(options, testlib.WithService(podName))...) - err := pkgTest.WaitForPodRunning(ctx, client.Kube, podName, client.Namespace) - if err != nil { - client.T.Fatalf("Failed to start the recordevent pod '%s': %v", podName, errors.WithStack(err)) - } - client.WaitForServiceEndpointsOrFail(ctx, podName, 1) + eventRecordPod := DeployEventRecordOrFail(ctx, client, podName, options...) - eventTracker, err := NewEventInfoStore(client, podName) + eventTracker, err := NewEventInfoStore(client, podName, client.Namespace) if err != nil { client.T.Fatalf("Failed to start the EventInfoStore associated to pod '%s': %v", podName, err) } return eventTracker, eventRecordPod } -// Starts the single threaded background goroutine used to update local state -// from the remote REST API. -func (ei *EventInfoStore) start() { - ei.closeCh = make(chan struct{}) - ei.doRefresh = make(chan chan error) - go func() { - for { - select { - case <-ei.closeCh: - ei.getter.cleanup() - return - case replyCh := <-ei.doRefresh: - replyCh <- ei.doRetrieveData() - } - } - }() +// Stateful store of events received by the recordevents pod it is pointed at. +// This pulls events from the pod during any Find or Wait call, storing them +// locally and triming them from the remote pod store. +type EventInfoStore struct { + tb testing.TB + + podName string + podNamespace string + + lock sync.Mutex + collected []EventInfo } -// The data update thread used by the single threaded background goroutine -// for updating data from the REST api. -func (ei *EventInfoStore) doRetrieveData() error { - min, max, err := ei.getter.getMinMax() - if err != nil { - return fmt.Errorf("error getting MinMax %v", err) - } - ei.lock.Lock() - curMin := ei.firstID - curMax := curMin + len(ei.allEvents) - 1 - ei.lock.Unlock() - if min == max+1 { - // Nothing to read or trim - return nil - } else { - if min > curMax+1 { - return fmt.Errorf("mismatched stored max/available min: %d, %d", curMax, min) - } - min = curMax + 1 - // We may have data to read, definitely have data to trim. +// Creates an EventInfoStore that is used to iteratively download events recorded by the +// recordevents pod. +func NewEventInfoStore(client *testlib.Client, podName string, podNamespace string) (*EventInfoStore, error) { + store := &EventInfoStore{ + tb: client.T, + podName: podName, + podNamespace: podNamespace, } - var newEvents []EventInfo - for i := min; i <= max; i++ { - e, err := ei.getter.getEntry(i) - if err != nil { - return fmt.Errorf("error calling getEntry of %d %v", i, err) - } - newEvents = append(newEvents, e) - } - ei.lock.Lock() - ei.allEvents = append(ei.allEvents, newEvents...) - ei.lock.Unlock() - err = ei.getter.trimThrough(max) - return err + client.EventListener.AddHandler(store.handle) + return store, nil } -// Clean up any background resources used by the store. Must be called exactly once after -// the last use. -func (ei *EventInfoStore) cleanup() { - close(ei.closeCh) +func (ei *EventInfoStore) getEventInfo() []EventInfo { + ei.lock.Lock() + defer ei.lock.Unlock() + return ei.collected } -//TODO remove it, this is not useful anymore -// Deprecated: you can remove the manual cleanup of the event getter, since now it's done at test tear down automatically -func (ei *EventInfoStore) Cleanup() {} - -// Called internally by functions wanting the current list of all -// known events. This calls for an update from the REST server and -// returns the summary of all locally and remotely known events. -// Returns an error in case of a connection or protocol error. -func (ei *EventInfoStore) refreshData() ([]EventInfo, error) { - var allEvents []EventInfo - replyCh := make(chan error) - ei.doRefresh <- replyCh - err := <-replyCh +func (ei *EventInfoStore) handle(event *corev1.Event) { + // Filter events + if !ei.isMyEvent(event) { + return + } + + eventInfo := EventInfo{} + err := json.Unmarshal([]byte(event.Message), &eventInfo) if err != nil { - return nil, err + ei.tb.Errorf("Received EventInfo that cannot be unmarshalled! %+v", err) + return } + ei.lock.Lock() - allEvents = append(allEvents, ei.allEvents...) - ei.lock.Unlock() - return allEvents, nil + defer ei.lock.Unlock() + ei.collected = append(ei.collected, eventInfo) +} + +func (ei *EventInfoStore) isMyEvent(event *corev1.Event) bool { + return event.Type == corev1.EventTypeNormal && + event.Reason == CloudEventObservedReason && + event.InvolvedObject.Kind == "Pod" && + event.InvolvedObject.Name == ei.podName && + event.InvolvedObject.Namespace == ei.podNamespace } // Find all events received by the recordevents pod that match the provided matchers, @@ -216,10 +155,7 @@ func (ei *EventInfoStore) Find(matchers ...EventInfoMatcher) ([]EventInfo, Searc lastEvents := []EventInfo{} var nonMatchingErrors []error - allEvents, err := ei.refreshData() - if err != nil { - return nil, sInfo, nonMatchingErrors, fmt.Errorf("error getting events %v", err) - } + allEvents := ei.getEventInfo() for i := range allEvents { if err := f(allEvents[i]); err == nil { allMatch = append(allMatch, allEvents[i]) @@ -290,7 +226,7 @@ func (ei *EventInfoStore) waitAtLeastNMatch(f EventInfoMatcher, min int) ([]Even var matchRet []EventInfo var internalErr error - wait.PollImmediate(ei.retryInterval, ei.timeout, func() (bool, error) { + wait.PollImmediate(retryInterval, retryTimeout, func() (bool, error) { allMatch, sInfo, matchErrs, err := ei.Find(f) if err != nil { internalErr = fmt.Errorf("FAIL MATCHING: unexpected error during find: %v", err) diff --git a/test/lib/recordevents/event_info_store_test.go b/test/lib/recordevents/event_info_store_test.go deleted file mode 100644 index 52f09c76fb8..00000000000 --- a/test/lib/recordevents/event_info_store_test.go +++ /dev/null @@ -1,279 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package recordevents - -import ( - "fmt" - "strconv" - "sync" - "testing" - "time" - - cloudevents "github.com/cloudevents/sdk-go/v2" -) - -type dummyEventGet struct { - lock sync.Mutex - - minSeq int - allEv []EventInfo - clean bool - tCalls int -} - -func newDummyEventGet() *dummyEventGet { - return &dummyEventGet{minSeq: 1} -} - -func (deg *dummyEventGet) getMinMax() (minRet int, maxRet int, errRet error) { - deg.lock.Lock() - defer deg.lock.Unlock() - if deg.clean { - panic("getminmax called on cleaned event getter") - } - min := deg.minSeq - max := min + len(deg.allEv) - 1 - return min, max, nil -} -func (deg *dummyEventGet) getEntry(seqno int) (EventInfo, error) { - deg.lock.Lock() - defer deg.lock.Unlock() - if deg.clean { - panic("getentry called on cleaned event getter") - } - min := deg.minSeq - max := min + len(deg.allEv) - 1 - if seqno >= min && seqno <= max { - return deg.allEv[seqno-deg.minSeq], nil - } else { - return EventInfo{}, fmt.Errorf("illegal get seqno: %d, range min = %d, max = %d", seqno, min, max) - } -} -func (deg *dummyEventGet) trimThrough(seqno int) error { - deg.lock.Lock() - defer deg.lock.Unlock() - deg.tCalls++ - if deg.clean { - panic("trim called on cleaned event getter") - } - min := deg.minSeq - max := min + len(deg.allEv) - 1 - if seqno < 0 { - return fmt.Errorf("Illegal negative seqno %d", seqno) - } else if seqno > max { - return fmt.Errorf("Illegal negative seqno %d > %d", seqno, max) - } else if seqno < min { - return nil - } - deg.allEv = deg.allEv[seqno-min+1:] - deg.minSeq = seqno + 1 - return nil -} -func (deg *dummyEventGet) trimCalls() int { - deg.lock.Lock() - defer deg.lock.Unlock() - return deg.tCalls -} -func (deg *dummyEventGet) cleanup() { - deg.lock.Lock() - defer deg.lock.Unlock() - if deg.clean { - panic("Unexpected clean") - } else { - deg.clean = true - } -} - -func makeEvents() []EventInfo { - var allEv []EventInfo - for i := 0; i < 30; i++ { - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - ce.SetSource("https://source.test.event.knative.dev/foo") - ce.SetID(strconv.FormatInt(int64(i), 10)) - allEv = append(allEv, EventInfo{Event: &ce}) - } - return allEv -} - -func checkEvIDEqual(t *testing.T, seen []EventInfo, expected []EventInfo) { - if len(seen) != len(expected) { - t.Fatalf("Seen ev list length %d does not match expected %d", len(seen), len(expected)) - - } - for i := range seen { - if seen[i].Event.ID() != expected[i].Event.ID() { - t.Fatalf("Id entry %d mismatch: seen %s != expected %s", - i, seen[i].Event.ID(), expected[i].Event.ID()) - } - } -} - -func (deg *dummyEventGet) setEv(firstID int, allEv []EventInfo) { - deg.lock.Lock() - defer deg.lock.Unlock() - deg.minSeq = firstID - deg.allEv = allEv -} - -// Test that Finds where the server gives sequential updates that -// don't overlap see the expected events. -func TestSequentialAndTrim(t *testing.T) { - totalEv := makeEvents() - expectedFull := makeEvents() - deg := newDummyEventGet() - subEv := totalEv[:10] - deg.setEv(1, subEv) - ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:10]) - min, _, _ := deg.getMinMax() - if min != 11 { - t.Fatalf("Expected trim to have moved min to %d, saw %d", 11, min) - } - - subEv = totalEv[10:19] - deg.setEv(11, subEv) - - allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:19]) - ei.cleanup() -} - -// Test that Finds where the server gives overlapping updates -// see the expected events (no double events) -func TestOverlap(t *testing.T) { - totalEv := makeEvents() - expectedFull := makeEvents() - deg := newDummyEventGet() - subEv := totalEv[:10] - deg.setEv(1, subEv) - ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:10]) - subEv = totalEv[6:19] - deg.setEv(7, subEv) - allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:19]) - ei.cleanup() -} - -// Test that we see an error if repeated Finds see a gap in the sequence -// space -func TestGap(t *testing.T) { - totalEv := makeEvents() - expectedFull := makeEvents() - deg := newDummyEventGet() - subEv := totalEv[:10] - deg.setEv(1, subEv) - ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:10]) - subEv = totalEv[11:19] - deg.setEv(12, subEv) - _, _, _, err = ei.Find(func(EventInfo) error { return nil }) - if err == nil { - t.Fatalf("Unexpected success from find") - } - ei.cleanup() -} - -// Test that two finds, with the second one having -// no new events, returns the right events. -func TestSequentialNoOp(t *testing.T) { - totalEv := makeEvents() - expectedFull := makeEvents() - deg := newDummyEventGet() - subEv := totalEv[:10] - deg.setEv(1, subEv) - ei := newTestableEventInfoStore(deg, -1, -1) - allData, _, _, err := ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:10]) - subEv = []EventInfo{} - deg.setEv(11, subEv) - allData, _, _, err = ei.Find(func(EventInfo) error { return nil }) - if err != nil { - t.Fatalf("Unexpected error from find: %v", err) - } - checkEvIDEqual(t, allData, expectedFull[:10]) - ei.cleanup() -} - -// Test that wait for N Works -func TestWaitForN(t *testing.T) { - deg := newDummyEventGet() - subEv := makeEvents()[:10] - deg.setEv(1, subEv) - ei := newTestableEventInfoStore(deg, time.Second/8, -1) - var wg sync.WaitGroup - wg.Add(1) - var waitErr error - var allMatch []EventInfo - go func() { - matchFunc := func(ev cloudevents.Event) error { - if ev.ID() == "3" { - return nil - } else { - return fmt.Errorf("mismatch %s %s", ev.ID(), "3") - } - } - - allMatch, waitErr = ei.waitAtLeastNMatch(MatchEvent(matchFunc), 2) - wg.Done() - }() - var tCalls int - for tCalls == 0 { - time.Sleep(time.Second / 100) - tCalls = deg.trimCalls() - } - subEv = makeEvents()[:10] - deg.setEv(11, subEv) - - wg.Wait() - if waitErr != nil { - t.Fatalf("Unexpected error from find: %v", waitErr) - } - if len(allMatch) != 2 { - t.Fatalf("Unexpected match length: %d != %d", len(allMatch), 2) - } - if allMatch[0].Event.ID() != "3" || allMatch[1].Event.ID() != "3" { - t.Fatalf("Unexpected IDs %s, %s, expected both %s", allMatch[0].Event.ID(), allMatch[1].Event.ID(), "3") - } - tCalls = deg.trimCalls() - if tCalls < 2 { - t.Fatalf("Expected at least %d trim calls, saw %d", 2, tCalls) - } - ei.cleanup() -} diff --git a/test/lib/recordevents/event_log.go b/test/lib/recordevents/event_log.go new file mode 100644 index 00000000000..62aba6113b4 --- /dev/null +++ b/test/lib/recordevents/event_log.go @@ -0,0 +1,33 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recordevents + +// EventLog is the contract for an event logger to vent an event. +type EventLog interface { + Vent(observed EventInfo) error +} + +type EventLogs []EventLog + +func (e EventLogs) Vent(observed EventInfo) error { + for _, el := range e { + if err := el.Vent(observed); err != nil { + return err + } + } + return nil +} diff --git a/test/lib/recordevents/observer/observer.go b/test/lib/recordevents/observer/observer.go new file mode 100644 index 00000000000..e9bd4224aac --- /dev/null +++ b/test/lib/recordevents/observer/observer.go @@ -0,0 +1,116 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package observer + +import ( + "context" + "net/http" + "sync/atomic" + "time" + + cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" + cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" + "github.com/kelseyhightower/envconfig" + "github.com/prometheus/common/log" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +// Observer is the entry point for sinking events into the event log. +type Observer struct { + // Name is the name of this Observer, used to filter if multiple observers. + Name string + // EventLogs is the list of EventLog implementors to vent observed events. + EventLogs recordevents.EventLogs + + seq uint64 +} + +// New returns an observer that will vent observations to the list of provided +// EventLog instances. It will listen on :8080. +func New(name string, eventLogs ...recordevents.EventLog) *Observer { + return &Observer{ + Name: name, + EventLogs: eventLogs, + } +} + +type envConfig struct { + // ObserverName is used to identify this instance of the observer. + ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"` +} + +func NewFromEnv(eventLogs ...recordevents.EventLog) *Observer { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatal("Failed to process env var", err) + } + + return New(env.ObserverName, eventLogs...) +} + +// Start will create the CloudEvents client and start listening for inbound +// HTTP requests. This is a is a blocking call. +func (o *Observer) Start(ctx context.Context, handlerFuncs ...func(handler http.Handler) http.Handler) error { + var handler http.Handler = o + + for _, dec := range handlerFuncs { + handler = dec(handler) + } + + server := &http.Server{Addr: ":8080", Handler: handler} + + go func() { + if err := server.ListenAndServe(); err != nil { + logging.FromContext(ctx).Fatal("Error while starting the HTTP server", err) + } + }() + + <-ctx.Done() + + logging.FromContext(ctx).Info("Closing the HTTP server") + + return server.Close() +} + +func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + m := cloudeventshttp.NewMessageFromHttpRequest(request) + defer m.Finish(nil) + + event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) + header := request.Header + + eventErrStr := "" + if eventErr != nil { + eventErrStr = eventErr.Error() + } + err := o.EventLogs.Vent(recordevents.EventInfo{ + Error: eventErrStr, + Event: event, + HTTPHeaders: header, + Origin: request.RemoteAddr, + Observer: o.Name, + Time: time.Now(), + Sequence: atomic.AddUint64(&o.seq, 1), + }) + if err != nil { + log.Warn("Error while venting the recorded event", err) + } + + writer.WriteHeader(http.StatusAccepted) +} diff --git a/test/lib/recordevents/recorder_vent/constructor.go b/test/lib/recordevents/recorder_vent/constructor.go new file mode 100644 index 00000000000..cc73a9671f0 --- /dev/null +++ b/test/lib/recordevents/recorder_vent/constructor.go @@ -0,0 +1,108 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recorder_vent + +import ( + "context" + "log" + "strings" + + "github.com/kelseyhightower/envconfig" + "knative.dev/pkg/system" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes/scheme" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/controller" + "knative.dev/pkg/logging" + + "knative.dev/eventing/test/lib/recordevents" +) + +type envConfig struct { + AgentName string `envconfig:"AGENT_NAME" default:"observer-default" required:"true"` + PodName string `envconfig:"POD_NAME" required:"true"` + Port int `envconfig:"PORT" default:"8080" required:"true"` +} + +func NewFromEnv(ctx context.Context) recordevents.EventLog { + var env envConfig + if err := envconfig.Process("", &env); err != nil { + log.Fatal("Failed to process env var", err) + } + + logging.FromContext(ctx).Infof("Environment configuration: %+v", env) + + return NewEventLog(ctx, env.AgentName, env.PodName) +} + +func NewEventLog(ctx context.Context, agentName string, podName string) recordevents.EventLog { + on, err := kubeclient.Get(ctx).CoreV1().Pods(system.Namespace()).Get(ctx, podName, metav1.GetOptions{}) + if err != nil { + logging.FromContext(ctx).Fatal("Error while trying to retrieve the pod", err) + } + + logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace) + + return &recorder{out: createRecorder(ctx, agentName), on: on} +} + +func createRecorder(ctx context.Context, agentName string) record.EventRecorder { + logger := logging.FromContext(ctx) + + recorder := controller.GetEventRecorder(ctx) + if recorder == nil { + // Create event broadcaster + logger.Debug("Creating event broadcaster") + eventBroadcaster := record.NewBroadcasterWithCorrelatorOptions(record.CorrelatorOptions{ + KeyFunc: func(event *corev1.Event) (aggregateKey string, localKey string) { + return strings.Join([]string{ + event.Source.Component, + event.Source.Host, + event.InvolvedObject.Kind, + event.InvolvedObject.Namespace, + event.InvolvedObject.Name, + string(event.InvolvedObject.UID), + event.InvolvedObject.APIVersion, + event.Type, + event.Reason, + }, ""), string(event.UID) + }, + }) + watches := []watch.Interface{ + eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), + eventBroadcaster.StartRecordingToSink( + &v1.EventSinkImpl{Interface: kubeclient.Get(ctx).CoreV1().Events(system.Namespace())}, + ), + } + recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName}) + go func() { + <-ctx.Done() + for _, w := range watches { + w.Stop() + } + logging.FromContext(ctx).Debug("Closed event-broadcaster") + }() + } + + return recorder +} diff --git a/test/lib/recordevents/recorder_vent/doc.go b/test/lib/recordevents/recorder_vent/doc.go new file mode 100644 index 00000000000..1a4bcc2907f --- /dev/null +++ b/test/lib/recordevents/recorder_vent/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package recorder_vent implements an recordevents.EventLog backed by Kubernetes +// Events using an event recorder. +package recorder_vent diff --git a/test/lib/recordevents/recorder_vent/recorder.go b/test/lib/recordevents/recorder_vent/recorder.go new file mode 100644 index 00000000000..9669741f9b6 --- /dev/null +++ b/test/lib/recordevents/recorder_vent/recorder.go @@ -0,0 +1,43 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recorder_vent + +import ( + "encoding/json" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + + "knative.dev/eventing/test/lib/recordevents" +) + +type recorder struct { + out record.EventRecorder + on runtime.Object +} + +func (r *recorder) Vent(observed recordevents.EventInfo) error { + b, err := json.Marshal(observed) + if err != nil { + return err + } + + r.out.Eventf(r.on, corev1.EventTypeNormal, recordevents.CloudEventObservedReason, "%s", string(b)) + + return nil +} diff --git a/test/lib/recordevents/resources.go b/test/lib/recordevents/resources.go new file mode 100644 index 00000000000..54726bedfdc --- /dev/null +++ b/test/lib/recordevents/resources.go @@ -0,0 +1,61 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package recordevents + +import ( + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "knative.dev/pkg/test" +) + +// EventRecordPod creates a Pod that stores received events for test retrieval. +func EventRecordPod(name string, serviceAccountName string) *corev1.Pod { + return recordEventsPod("recordevents", name, serviceAccountName) +} + +func recordEventsPod(imageName string, name string, serviceAccountName string) *corev1.Pod { + return &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{ + Name: imageName, + Image: test.ImagePath(imageName), + ImagePullPolicy: corev1.PullAlways, + Env: []corev1.EnvVar{{ + Name: "SYSTEM_NAMESPACE", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.namespace"}, + }, + }, { + Name: "OBSERVER", + Value: "recorder-" + name, + }, { + Name: "POD_NAME", + ValueFrom: &corev1.EnvVarSource{ + FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}, + }, + }}, + }}, + ServiceAccountName: serviceAccountName, + RestartPolicy: corev1.RestartPolicyAlways, + }, + } +} diff --git a/test/lib/resources/kube.go b/test/lib/resources/kube.go index 2b154fe2b20..bc415d10873 100644 --- a/test/lib/resources/kube.go +++ b/test/lib/resources/kube.go @@ -37,28 +37,6 @@ type PodOption func(*corev1.Pod) // Option enables further configuration of a Role. type RoleOption func(*rbacv1.Role) -// EventRecordPod creates a Pod that stores received events for test retrieval. -func EventRecordPod(name string) *corev1.Pod { - return eventLoggerPod("recordevents", name) -} - -func eventLoggerPod(imageName string, name string) *corev1.Pod { - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Labels: map[string]string{"e2etest": string(uuid.NewUUID())}, - }, - Spec: corev1.PodSpec{ - Containers: []corev1.Container{{ - Name: imageName, - Image: pkgTest.ImagePath(imageName), - ImagePullPolicy: corev1.PullAlways, - }}, - RestartPolicy: corev1.RestartPolicyAlways, - }, - } -} - // EventTransformationPod creates a Pod that transforms events received receiving as arg a cloudevents sdk2 Event func EventTransformationPod(name string, newEventType string, newEventSource string, newEventData []byte) *corev1.Pod { const imageName = "transformevents" diff --git a/test/test_images/recordevents/eventstore.go b/test/test_images/recordevents/eventstore.go deleted file mode 100644 index 470e5adc21d..00000000000 --- a/test/test_images/recordevents/eventstore.go +++ /dev/null @@ -1,191 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - https://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "encoding/json" - "fmt" - "sync" - - cloudevents "github.com/cloudevents/sdk-go/v2" - - "knative.dev/eventing/test/lib/recordevents" -) - -// Number of EventInfo per block -const evBlockSize = 100 - -// Block of stored EventInfo -type eventBlock struct { - // seqno of [0] evInfoBytes entry - firstIndex int - // offset inside block for newly appended event - firstOffsetFree int - // offset inside block of first non-trimmed event - firstValid int - // serialized EventInfo structures for each seqno. We enforce - // that there is always at least one block. - evInfoBytes [evBlockSize][]byte -} - -// All events currently seen -type eventStore struct { - // Blocks of events in increasing sequency number order - evBlocks []*eventBlock - evBlocksLock sync.Mutex -} - -// Create a new event store. -func newEventStore() *eventStore { - es := &eventStore{} - es.evBlocks = []*eventBlock{{}} - - // One block with no entries starting at sequence number 1 - es.evBlocks[0].firstIndex = 1 - es.evBlocks[0].firstOffsetFree = 0 - es.evBlocks[0].firstValid = 0 - return es -} - -// See if there's enough room to append to the current last block. If not, -// append an extra block. -func (es *eventStore) checkAppendBlock() { - if es.evBlocks[len(es.evBlocks)-1].firstOffsetFree == evBlockSize { - newEVBlock := &eventBlock{ - firstIndex: es.evBlocks[len(es.evBlocks)-1].firstIndex + evBlockSize, - } - es.evBlocks = append(es.evBlocks, newEVBlock) - } -} - -// Store the specified event. -func (es *eventStore) StoreEvent(event *cloudevents.Event, evErr error, httpHeaders map[string][]string) { - var evInfo recordevents.EventInfo - var err error - var evInfoBytes []byte - if evErr != nil { - evInfo.HTTPHeaders = httpHeaders - evInfo.Error = evErr.Error() - if evInfo.Error == "" { - evInfo.Error = "Unknown Incoming Error" - } - evInfoBytes, err = json.Marshal(&evInfo) - if err != nil { - panic(fmt.Errorf("unexpected marshal error (%v) (%+v)", err, evInfo)) - } - } else { - evInfo.Event = event - evInfo.HTTPHeaders = httpHeaders - evInfoBytes, err = json.Marshal(&evInfo) - - if err != nil { - evInfo.Event = nil - evInfo.Error = err.Error() - if evInfo.Error == "" { - evInfo.Error = "Unknown Error" - } - evInfoBytes, err = json.Marshal(&evInfo) - if err != nil { - panic(fmt.Errorf("unexpected marshal error (%v) (%+v)", err, evInfo)) - } - } - } - - es.evBlocksLock.Lock() - // Add a new block if we're out of space - es.checkAppendBlock() - - evBlock := es.evBlocks[len(es.evBlocks)-1] - if evBlock.firstOffsetFree < evBlockSize { - evBlock.evInfoBytes[evBlock.firstOffsetFree] = evInfoBytes - evBlock.firstOffsetFree++ - } - - es.evBlocksLock.Unlock() -} - -// Logically trim all events up to and include the provided -// sequence number. Returns error for patently incorrect -// values (negative sequence number or sequence number larger -// than the largest event seen). Trimming already trimmed -// regions is legal. -func (es *eventStore) TrimThrough(through int) error { - es.evBlocksLock.Lock() - defer es.evBlocksLock.Unlock() - minAvail, maxSeen := es.minMaxUnlocked() - if through > maxSeen { - return fmt.Errorf("invalid trim through %d, maxSeen %d", through, maxSeen) - } else if through < 0 { - return fmt.Errorf("invalid trim less than zero %d", through) - } else if through < minAvail { - return nil - } - // Completely remove blocks if they are full and all events in them are less than - // the specified value. - for len(es.evBlocks) > 1 && (es.evBlocks[0].firstIndex+evBlockSize-1) <= through { - es.evBlocks = es.evBlocks[1:] - } - // Logically trim the block split by through. - es.evBlocks[0].firstValid = through - es.evBlocks[0].firstIndex + 1 - return nil -} - -// return min/max untrimmed value when already holding the lock -func (es *eventStore) minMaxUnlocked() (minAvail int, maxSeen int) { - minBlock := es.evBlocks[0] - minAvail = minBlock.firstIndex + (minBlock.firstValid) - - maxBlock := es.evBlocks[len(es.evBlocks)-1] - maxSeen = maxBlock.firstIndex + maxBlock.firstOffsetFree - 1 - return minAvail, maxSeen -} - -// Returns min available value and max seen value for the store. -// min is the minimum value that can be retrieved via GetEntry, or -// if no values can be retrieved, min == max+1. Max starts at 0 -// when no events have been seen. -func (es *eventStore) MinMax() (minAvail int, maxSeen int) { - es.evBlocksLock.Lock() - minAvail, maxSeen = es.minMaxUnlocked() - - es.evBlocksLock.Unlock() - return minAvail, maxSeen -} - -// Get the already serialized EventInfo structure for the provided sequence -// number. -func (es *eventStore) GetEventInfoBytes(seq int) ([]byte, error) { - var evInfoBytes []byte - found := false - - es.evBlocksLock.Lock() - for _, block := range es.evBlocks { - if seq < block.firstIndex+block.firstValid { - break - } - if seq < block.firstIndex+block.firstOffsetFree { - found = true - evInfoBytes = block.evInfoBytes[seq-block.firstIndex] - break - } - } - es.evBlocksLock.Unlock() - if !found { - return evInfoBytes, fmt.Errorf("Invalid sequence number %d", seq) - } - return evInfoBytes, nil -} diff --git a/test/test_images/recordevents/eventstore_test.go b/test/test_images/recordevents/eventstore_test.go deleted file mode 100644 index 4642fe7b02a..00000000000 --- a/test/test_images/recordevents/eventstore_test.go +++ /dev/null @@ -1,339 +0,0 @@ -/* -Copyright 2020 The Knative Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "encoding/json" - "fmt" - "strconv" - "testing" - - cloudevents "github.com/cloudevents/sdk-go/v2" - - "knative.dev/eventing/test/lib/recordevents" -) - -func helperGetEvInfo(es *eventStore, entry int) (*recordevents.EventInfo, error) { - evInfoBytes, err := es.GetEventInfoBytes(entry) - if err != nil { - return nil, fmt.Errorf("error calling get on item %d: %v", entry, err) - } - if len(evInfoBytes) == 0 { - return nil, fmt.Errorf("empty info bytes") - } - - var evInfo recordevents.EventInfo - err = json.Unmarshal(evInfoBytes, &evInfo) - if err != nil { - return nil, fmt.Errorf("error unmarshalling stored JSON: %v", err) - } - return &evInfo, nil -} - -// Test that adding and getting a bunch of events stores them all -// and that retrieving the events retrieves the correct events. -func TestAddGetMany(t *testing.T) { - es := newEventStore() - - count := 10009 - for i := 0; i < count; i++ { - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - ce.SetSource("https://source.test.event.knative.dev/foo") - ce.SetID(strconv.FormatInt(int64(i), 10)) - es.StoreEvent(&ce, nil, nil) - minAvail, maxSeen := es.MinMax() - if minAvail != 1 { - t.Fatalf("Pass %d Bad min: %d, expected %d", i, minAvail, 1) - } - if maxSeen != i+1 { - t.Fatalf("Pass %d Bad max: %d, expected %d", i, maxSeen, i+1) - } - - } - for i := 1; i <= count; i++ { - evInfo, err := helperGetEvInfo(es, i) - if err != nil { - t.Fatalf("Count %d error: %v", count, err) - } - - if evInfo.Event == nil { - t.Fatalf("Unexpected empty event info event %d: %+v", i, evInfo) - } - if len(evInfo.Error) != 0 { - t.Fatalf("Unexpected error for stored event %d: %s", i, evInfo.Error) - } - - // Make sure it's the expected event - seenID := evInfo.Event.ID() - expectedID := strconv.FormatInt(int64(i-1), 10) - if seenID != expectedID { - t.Errorf("Incorrect id on retrieval: %s, expected %s", seenID, expectedID) - } - } - _, err := es.GetEventInfoBytes(count + 1) - if err == nil { - t.Errorf("Unexpected non-error return for getinfo of %d", count+1) - } - - _, err = es.GetEventInfoBytes(0) - if err == nil { - t.Errorf("Unexpected non-error return for getinfo of %d", 0) - } - -} - -func TestEmpty(t *testing.T) { - es := newEventStore() - min, max := es.MinMax() - if min != 1 { - t.Errorf("Invalid min: %d, expected %d", min, 1) - } - if max != 0 { - t.Errorf("Invalid max: %d, expected %d", max, 0) - } - - for i := -2; i < 2; i++ { - _, err := es.GetEventInfoBytes(0) - if err == nil { - t.Errorf("Unexpected non-error return for getinfo of %d", i) - } - } - -} - -func TestAddGetSingleValid(t *testing.T) { - expectedType := "knative.dev.test.event.a" - expectedSource := "https://source.test.event.knative.dev/foo" - expectedID := "111" - es := newEventStore() - - headers := make(map[string][]string) - headers["foo"] = []string{"bar", "baz"} - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType(expectedType) - ce.SetSource(expectedSource) - ce.SetID(expectedID) - es.StoreEvent(&ce, nil, headers) - minAvail, maxSeen := es.MinMax() - if minAvail != maxSeen { - t.Fatalf("Expected match, saw %d, %d", minAvail, maxSeen) - } - - evInfoBytes, err := es.GetEventInfoBytes(minAvail) - if err != nil { - t.Fatalf("Error calling get: %v", err) - } - var evInfo recordevents.EventInfo - err = json.Unmarshal(evInfoBytes, &evInfo) - if err != nil { - t.Fatalf("Error unmarshalling stored JSON: %v", err) - } - - if evInfo.Event == nil { - t.Fatalf("Unexpected empty event info event: %+v", evInfo) - } - if len(evInfo.Error) != 0 { - t.Fatalf("Unexpected error for stored event: %s", evInfo.Error) - } - if len(evInfo.HTTPHeaders) != 1 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if len(evInfo.HTTPHeaders["foo"]) != 2 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if evInfo.HTTPHeaders["foo"][0] != "bar" || evInfo.HTTPHeaders["foo"][1] != "baz" { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - seenID := evInfo.Event.ID() - if seenID != expectedID { - t.Errorf("Incorrect id on retrieval: %s, expected %s", seenID, expectedID) - } - seenSource := evInfo.Event.Source() - if seenSource != expectedSource { - t.Errorf("Incorrect source on retrieval: %s, expected %s", seenSource, expectedSource) - } - seenType := evInfo.Event.Type() - if seenType != expectedType { - t.Errorf("Incorrect type on retrieval: %s, expected %s", seenType, expectedType) - } -} - -func TestAddGetSingleInvalid(t *testing.T) { - es := newEventStore() - - headers := make(map[string][]string) - headers["foo"] = []string{"bar", "baz"} - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - // No source - ce.SetID("111") - es.StoreEvent(&ce, nil, headers) - minAvail, maxSeen := es.MinMax() - if minAvail != maxSeen { - t.Fatalf("Expected match, saw %d, %d", minAvail, maxSeen) - } - - evInfoBytes, err := es.GetEventInfoBytes(minAvail) - if err != nil { - t.Fatalf("Error calling get: %v", err) - } - var evInfo recordevents.EventInfo - err = json.Unmarshal(evInfoBytes, &evInfo) - if err != nil { - t.Fatalf("Error unmarshalling stored JSON: %v", err) - } - if evInfo.Event != nil { - t.Fatalf("Unexpected event info: %+v", evInfo) - } - if len(evInfo.Error) == 0 { - t.Fatalf("Unexpected empty error for stored event: %s", evInfo.Error) - } - if len(evInfo.HTTPHeaders) != 1 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if len(evInfo.HTTPHeaders["foo"]) != 2 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if evInfo.HTTPHeaders["foo"][0] != "bar" || evInfo.HTTPHeaders["foo"][1] != "baz" { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } -} - -func TestAddGetSingleInvalidError(t *testing.T) { - es := newEventStore() - - headers := make(map[string][]string) - headers["foo"] = []string{"bar", "baz"} - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - ce.SetID("111") - ce.SetSource("nnn") - es.StoreEvent(&ce, fmt.Errorf("Error passed in"), headers) - minAvail, maxSeen := es.MinMax() - if minAvail != maxSeen { - t.Fatalf("Expected match, saw %d, %d", minAvail, maxSeen) - } - - evInfoBytes, err := es.GetEventInfoBytes(minAvail) - if err != nil { - t.Fatalf("Error calling get: %v", err) - } - var evInfo recordevents.EventInfo - err = json.Unmarshal(evInfoBytes, &evInfo) - if err != nil { - t.Fatalf("Error unmarshalling stored JSON: %v", err) - } - if evInfo.Event != nil { - t.Fatalf("Unexpected event info: %+v", evInfo) - } - if len(evInfo.Error) == 0 { - t.Fatalf("Unexpected empty error for stored event: %s", evInfo.Error) - } - if len(evInfo.HTTPHeaders) != 1 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if len(evInfo.HTTPHeaders["foo"]) != 2 { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } - if evInfo.HTTPHeaders["foo"][0] != "bar" || evInfo.HTTPHeaders["foo"][1] != "baz" { - t.Fatalf("Unexpected header contents for stored event: %+v", evInfo.HTTPHeaders) - } -} - -func helperFillCount(es *eventStore, count int) { - for i := 0; i < count; i++ { - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - ce.SetSource("https://source.test.event.knative.dev/foo") - ce.SetID(strconv.FormatInt(int64(i), 10)) - es.StoreEvent(&ce, nil, nil) - } -} - -// Test that adding and getting a bunch of events stores them all -// and that retrieving the events retrieves the correct events. -func TestTrim(t *testing.T) { - count := evBlockSize + 10 - - validTrimPoints := []int{0, 1, 2, evBlockSize - 1, evBlockSize, evBlockSize + 1, evBlockSize + 2, count - 1, count} - invalidTrimPoints := []int{-2, -1, count + 1, count + evBlockSize} - - for _, testVal := range validTrimPoints { - es := newEventStore() - helperFillCount(es, count) - err := es.TrimThrough(testVal) - if err != nil { - t.Fatalf("Unexpected error trimming to %d: %v", testVal, err) - } - minAvail, maxSeen := es.MinMax() - if testVal == count { - if minAvail != maxSeen+1 { - t.Errorf("Incorrect minAvail (%d != %d+1) trimming to %d", minAvail, maxSeen, testVal) - } - } else if minAvail != testVal+1 { - t.Errorf("Incorrect minAvail %d trimming to %d", minAvail, testVal) - } - if maxSeen != count { - t.Errorf("Incorrect maxSeen %d trimming to %d", maxSeen, testVal) - } - if minAvail <= maxSeen { - evInfo, err := helperGetEvInfo(es, minAvail) - if err != nil { - t.Fatalf("Couldn't get min avail %d, trim %d error: %v", minAvail, testVal, err) - } - seenID := evInfo.Event.ID() - expectedID := strconv.FormatInt(int64(minAvail-1), 10) - if seenID != expectedID { - t.Fatalf("Expected ID %s, saw %s for ev %d, trim %d", expectedID, seenID, minAvail, testVal) - } - } - ce := cloudevents.NewEvent(cloudevents.VersionV1) - ce.SetType("knative.dev.test.event.a") - ce.SetSource("https://source.test.event.knative.dev/foo") - ce.SetID(strconv.FormatInt(int64(count), 10)) - es.StoreEvent(&ce, nil, nil) - addedMinAvail, addedMaxSeen := es.MinMax() - if addedMaxSeen != maxSeen+1 { - t.Fatalf("Add after trim resulted in bad maxSeen: expected %d, saw %d for trim %d", - maxSeen, addedMaxSeen, testVal) - } - if minAvail == -1 && addedMinAvail != addedMaxSeen { - t.Errorf("Add after full trim resulted in bad minAvail: expected %d, saw %d for trim %d", - addedMaxSeen, addedMinAvail, testVal) - } else if minAvail != -1 && minAvail != addedMinAvail { - t.Errorf("Add after partial trim resulted in bad minAvail: expected %d, saw %d for trim %d", - minAvail, addedMinAvail, testVal) - - } - - } - for _, testVal := range invalidTrimPoints { - es := newEventStore() - helperFillCount(es, count) - err := es.TrimThrough(testVal) - if err == nil { - t.Fatalf("Incorrect missing error trimming to %d", testVal) - } - minAvail, maxSeen := es.MinMax() - if minAvail != 1 { - t.Errorf("Incorrect minAvail %d trimming to %d", minAvail, testVal) - } - if maxSeen != count { - t.Errorf("Incorrect maxSeen %d trimming to %d", maxSeen, testVal) - } - } -} diff --git a/test/test_images/recordevents/main.go b/test/test_images/recordevents/main.go index 8dd74116338..51fde078e9b 100644 --- a/test/test_images/recordevents/main.go +++ b/test/test_images/recordevents/main.go @@ -17,164 +17,60 @@ limitations under the License. package main import ( - "context" - "encoding/json" - "fmt" "log" "net/http" "os" - "strconv" - "strings" - cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding" - cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http" - "go.uber.org/zap" + "k8s.io/client-go/rest" + "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/logging" + _ "knative.dev/pkg/system/testing" "knative.dev/eventing/pkg/kncloudevents" - testlib "knative.dev/eventing/test/lib" "knative.dev/eventing/test/lib/dropevents" - "knative.dev/eventing/test/lib/recordevents" + "knative.dev/eventing/test/lib/recordevents/observer" + "knative.dev/eventing/test/lib/recordevents/recorder_vent" "knative.dev/eventing/test/test_images" ) -type eventRecorder struct { - es *eventStore -} - -func newEventRecorder() *eventRecorder { - return &eventRecorder{es: newEventStore()} -} - -// Start the recordevents REST api server -func (er *eventRecorder) StartServer(port int) { - http.HandleFunc(recordevents.GetMinMaxPath, er.handleMinMax) - http.HandleFunc(recordevents.GetEntryPath, er.handleGetEntry) - http.HandleFunc(recordevents.TrimThroughPath, er.handleTrim) - go http.ListenAndServe(fmt.Sprintf(":%d", port), nil) -} - -// HTTP handler for GetMinMax requests -func (er *eventRecorder) handleMinMax(w http.ResponseWriter, r *http.Request) { - minAvail, maxSeen := er.es.MinMax() - minMax := recordevents.MinMaxResponse{ - MinAvail: minAvail, - MaxSeen: maxSeen, - } - respBytes, err := json.Marshal(minMax) - if err != nil { - log.Panicf("Internal error: json marshal shouldn't fail: (%v) (%+v)", err, minMax) - } - - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) - w.Write(respBytes) -} - -// HTTP handler for TrimThrough requests -func (er *eventRecorder) handleTrim(w http.ResponseWriter, r *http.Request) { - // If we extend this much at all we should vendor a better mux(gorilla, etc) - path := strings.TrimLeft(r.URL.Path, "/") - getPrefix := strings.TrimLeft(recordevents.TrimThroughPath, "/") - suffix := strings.TrimLeft(strings.TrimPrefix(path, getPrefix), "/") - - seqNum, err := strconv.ParseInt(suffix, 10, 32) - if err != nil { - http.Error(w, "Can't parse event sequence number in request", http.StatusBadRequest) - return - } - - err = er.es.TrimThrough(int(seqNum)) - if err != nil { - http.Error(w, "Invalid sequence number in request to trim", http.StatusNotFound) - return - } - - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) -} - -// HTTP handler for GetEntry requests -func (er *eventRecorder) handleGetEntry(w http.ResponseWriter, r *http.Request) { - // If we extend this much at all we should vendor a better mux(gorilla, etc) - path := strings.TrimLeft(r.URL.Path, "/") - getPrefix := strings.TrimLeft(recordevents.GetEntryPath, "/") - suffix := strings.TrimLeft(strings.TrimPrefix(path, getPrefix), "/") - - seqNum, err := strconv.ParseInt(suffix, 10, 32) +func main() { + cfg, err := rest.InClusterConfig() if err != nil { - http.Error(w, "Can't parse event sequence number in request", http.StatusBadRequest) - return + log.Fatal("Error while reading the cfg", err) } + //nolint // nil ctx is fine here, look at the code of EnableInjectionOrDie + ctx := sharedmain.EnableInjectionOrDie(nil, cfg) - entryBytes, err := er.es.GetEventInfoBytes(int(seqNum)) - if err != nil { - http.Error(w, "Couldn't find requested event", http.StatusNotFound) - return + if err := test_images.ConfigureTracing(logging.FromContext(ctx), ""); err != nil { + logging.FromContext(ctx).Fatal("Unable to setup trace publishing", err) } - w.Header().Set("Content-Type", "text/json") - w.WriteHeader(http.StatusOK) - w.Write(entryBytes) -} - -func (er *eventRecorder) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - m := cloudeventshttp.NewMessageFromHttpRequest(request) - defer m.Finish(nil) - - event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m) - header := request.Header - - er.es.StoreEvent(event, eventErr, map[string][]string(header)) - - headerNameList := testlib.InterestingHeaders() - for _, headerName := range headerNameList { - if headerValue := header.Get(headerName); headerValue != "" { - log.Printf("Header %s: %s\n", headerName, headerValue) - } - } - - if eventErr != nil { - log.Printf("error receiving the event: %v", eventErr) - } else { - valErr := event.Validate() - if valErr == nil { - log.Printf("eventdetails:\n%s", event.String()) - } else { - log.Printf("error validating the event: %v", valErr) - } - } - - writer.WriteHeader(http.StatusAccepted) -} - -func main() { - er := newEventRecorder() - er.StartServer(recordevents.RecordEventsPort) - - logger, _ := zap.NewDevelopment() - if err := test_images.ConfigureTracing(logger.Sugar(), ""); err != nil { - log.Fatalf("Unable to setup trace publishing: %v", err) - } + obs := observer.NewFromEnv( + recorder_vent.NewFromEnv(ctx), + ) algorithm, ok := os.LookupEnv(dropevents.SkipAlgorithmKey) - handler := kncloudevents.CreateHandler(er) if ok { skipper := dropevents.SkipperAlgorithm(algorithm) counter := dropevents.CounterHandler{ Skipper: skipper, } - next := handler - handler = http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - if counter.Skip() { - writer.WriteHeader(http.StatusConflict) - return - } - next.ServeHTTP(writer, request) + err = obs.Start(ctx, kncloudevents.CreateHandler, func(handler http.Handler) http.Handler { + return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { + if counter.Skip() { + writer.WriteHeader(http.StatusConflict) + return + } + handler.ServeHTTP(writer, request) + }) }) + } else { + err = obs.Start(ctx, kncloudevents.CreateHandler) } - err := http.ListenAndServe(":8080", handler) if err != nil { - panic(err) + logging.FromContext(ctx).Fatal("Error during start", err) } + + logging.FromContext(ctx).Info("Closing the recordevents process") } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4d11980730a..e5574149f3e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -254,6 +254,7 @@ github.com/prometheus/client_golang/prometheus/promhttp # github.com/prometheus/client_model v0.2.0 github.com/prometheus/client_model/go # github.com/prometheus/common v0.9.1 +## explicit github.com/prometheus/common/expfmt github.com/prometheus/common/internal/bitbucket.org/ww/goautoneg github.com/prometheus/common/log