Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions test/base/resources/cloud_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@ package resources

// CloudEvent specifies the arguments for a CloudEvent used by the sendevents or transformevents image.
type CloudEvent struct {
ID string
Type string
Source string
Data string // must be in json format
Encoding string // binary or structured
ID string
Type string
Source string
Extensions map[string]interface{}
Data string // must be in json format
Encoding string // binary or structured
}

// CloudEventBaseData defines a simple struct that can be used as data of a CloudEvent.
Expand Down
21 changes: 19 additions & 2 deletions test/base/resources/eventing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package resources
// This file contains functions that construct Eventing resources.

import (
"fmt"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
eventingduckv1alpha1 "knative.dev/eventing/pkg/apis/duck/v1alpha1"
Expand Down Expand Up @@ -105,8 +106,8 @@ func Broker(name string, options ...BrokerOption) *eventingv1alpha1.Broker {
return broker
}

// WithTriggerFilter returns an option that adds a TriggerFilter for the given Trigger.
func WithTriggerFilter(eventSource, eventType string) TriggerOption {
// WithDeprecatedSourceAndTypeTriggerFilter returns an option that adds a TriggerFilter with DeprecatedSourceAndType for the given Trigger.
func WithDeprecatedSourceAndTypeTriggerFilter(eventSource, eventType string) TriggerOption {
return func(t *eventingv1alpha1.Trigger) {
triggerFilter := &eventingv1alpha1.TriggerFilter{
DeprecatedSourceAndType: &eventingv1alpha1.TriggerFilterSourceAndType{
Expand All @@ -118,6 +119,22 @@ func WithTriggerFilter(eventSource, eventType string) TriggerOption {
}
}

// WithAttributesTriggerFilter returns an option that adds a TriggerFilter with Attributes for the given Trigger.
func WithAttributesTriggerFilter(eventSource, eventType string, extensions map[string]interface{}) TriggerOption {
attrs := make(map[string]string)
attrs["type"] = eventType
attrs["source"] = eventSource
for k, v := range extensions {
attrs[k] = fmt.Sprintf("%v", v)
}
triggerFilterAttributes := eventingv1alpha1.TriggerFilterAttributes(attrs)
return func(t *eventingv1alpha1.Trigger) {
t.Spec.Filter = &eventingv1alpha1.TriggerFilter{
Attributes: &triggerFilterAttributes,
}
}
}

// WithBroker returns an option that adds a Broker for the given Trigger.
func WithBroker(brokerName string) TriggerOption {
return func(t *eventingv1alpha1.Trigger) {
Expand Down
14 changes: 12 additions & 2 deletions test/base/resources/kube.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package resources
// This file contains functions that construct common Kubernetes resources.

import (
"encoding/json"
"fmt"
"strconv"

corev1 "k8s.io/api/core/v1"
Expand All @@ -30,11 +32,17 @@ import (
)

// EventSenderPod creates a Pod that sends a single event to the given address.
func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod {
func EventSenderPod(name string, sink string, event *CloudEvent) (*corev1.Pod, error) {
const imageName = "sendevents"
if event.Encoding == "" {
event.Encoding = CloudEventEncodingBinary
}
eventExtensionsBytes, error := json.Marshal(event.Extensions)
eventExtensions := string(eventExtensionsBytes)
if error != nil {
return nil, fmt.Errorf("encountered error when we marshall cloud event extensions %v", error)
}

return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Expand All @@ -51,6 +59,8 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod {
event.Type,
"-event-source",
event.Source,
"-event-extensions",
eventExtensions,
"-event-data",
event.Data,
"-event-encoding",
Expand All @@ -62,7 +72,7 @@ func EventSenderPod(name string, sink string, event *CloudEvent) *corev1.Pod {
//TODO restart on failure?
RestartPolicy: corev1.RestartPolicyNever,
},
}
}, nil
}

// EventLoggerPod creates a Pod that logs events received.
Expand Down
5 changes: 4 additions & 1 deletion test/common/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,10 @@ func (client *Client) sendFakeEventToAddress(
event *resources.CloudEvent,
) error {
namespace := client.Namespace
pod := resources.EventSenderPod(senderName, uri, event)
pod, err := resources.EventSenderPod(senderName, uri, event)
if err != nil {
return err
}
client.CreatePodOrFail(pod)
if err := pkgTest.WaitForPodRunning(client.Kube, senderName, namespace); err != nil {
return err
Expand Down
31 changes: 30 additions & 1 deletion test/common/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ limitations under the License.
package common

import (
"encoding/json"
"regexp"
"strings"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)

Expand Down Expand Up @@ -92,14 +95,40 @@ func (client *Client) FindAnyLogContents(podName string, contents []string) (boo
if err != nil {
return false, err
}
eventContentsSet, err := parseEventContentsFromPodLogs(string(logs))
if err != nil {
return false, err
}
for _, content := range contents {
if strings.Contains(string(logs), content) {
if eventContentsSet.Has(content) {
return true, nil
}
}
return false, nil
}

// parseEventContentsFromPodLogs extracts the contents of events from a Pod logs
// Example log entry: 2019/08/21 22:46:38 {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"}
// Use regex to get the event content with json format: {"msg":"Body-type1-source1--extname1-extval1-extname2-extvalue2","sequence":"1"}
// Get the eventContent with key "msg"
// Returns a set with all unique event contents
func parseEventContentsFromPodLogs(logs string) (sets.String, error) {
re := regexp.MustCompile(`{.+}`)
matches := re.FindAllString(logs, -1)
eventContentsSet := sets.String{}
for _, match := range matches {
var matchedLogs map[string]string
err := json.Unmarshal([]byte(match), &matchedLogs)
if err != nil {
return nil, err
} else {
eventContent := matchedLogs["msg"]
eventContentsSet.Insert(eventContent)
}
}
return eventContentsSet, nil
}

// getContainerName gets name of the first container of the given pod.
// Now our logger pod only contains one single container, and is only used for receiving events and validation.
func (client *Client) getContainerName(podName, namespace string) (string, error) {
Expand Down
Loading