Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 19 additions & 18 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,16 @@ import (
"testing"
"time"

ce "github.com/cloudevents/sdk-go"
ce2 "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"github.com/openzipkin/zipkin-go/model"
"go.opentelemetry.io/otel/api/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/pkg/test/zipkin"

tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
)
Expand Down Expand Up @@ -170,6 +168,7 @@ func setupChannelTracingWithReply(
loggerPodName string,
tc TracingTestCase,
) (tracinghelper.TestSpanTree, cetest.EventMatcher) {
eventSource := "sender"
// Create the Channels.
channelName := "ch"
client.CreateChannelOrFail(channelName, channel)
Expand All @@ -182,11 +181,12 @@ func setupChannelTracingWithReply(
client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName))

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{
EventContextV1: ce.EventContextV1{
Type: "mutated",
},
})
transformerPod := resources.EventTransformationPod(
"transformer",
"mutated",
eventSource,
nil,
)
client.CreatePodOrFail(transformerPod, lib.WithService(transformerPod.Name))

// Create the Subscription linking the Channel to the mutator.
Expand All @@ -210,18 +210,20 @@ func setupChannelTracingWithReply(

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := string(uuid.NewUUID())
body := fmt.Sprintf("TestChannelTracing %s", eventID)
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
cloudevents.WithID(eventID),
)
eventID := uuid.New().String()
event := ce2.NewEvent()
event.SetID(eventID)
event.SetSource(senderName)
event.SetType(lib.DefaultEventType)
body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID)
if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressableOrFail
sendEvent := client.SendEventToAddressable
if tc.IncomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressableOrFail
sendEvent = client.SendEventWithTracingToAddressable
}
sendEvent(senderName, channelName, channel, event)

Expand Down Expand Up @@ -332,7 +334,6 @@ func setupChannelTracingWithReply(
Children: []tracinghelper.TestSpanTree{expected},
}
}

matchFunc := func(ev ce2.Event) error {
if ev.Source() != senderName {
return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source())
Expand Down
37 changes: 37 additions & 0 deletions test/lib/event_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,23 @@ func (c *Client) SendEventToAddressable(
}
}

// SendEventToAddressable will send the given event to the given Addressable.
func (c *Client) SendEventWithTracingToAddressable(
senderName,
addressableName string,
typemeta *metav1.TypeMeta,
event cloudevents.Event,
option ...sender.EventSenderOption,
) {
uri, err := c.GetAddressableURI(addressableName, typemeta)
if err != nil {
c.T.Fatalf("Failed to get the URI for %v-%s", typemeta, addressableName)
}
if err = c.SendEventWithTracing(senderName, uri, event, option...); err != nil {
c.T.Fatalf("Failed to send event %v with tracing to %s: %v", event, uri, err)
}
}

// SendEvent will create a sender pod, which will send the given event to the given url.
func (c *Client) SendEvent(
senderName string,
Expand All @@ -60,3 +77,23 @@ func (c *Client) SendEvent(
}
return nil
}

// SendEvent will create a sender pod, which will send the given event to the given url.
func (c *Client) SendEventWithTracing(
senderName string,
uri string,
event cloudevents.Event,
option ...sender.EventSenderOption,
) error {
namespace := c.Namespace
pod, err := sender.EventSenderPod("event-sender", senderName, uri, event, option...)
sender.EnableTracing()
if err != nil {
return err
}
c.CreatePodOrFail(pod)
if err := pkgTest.WaitForPodRunning(c.Kube, senderName, namespace); err != nil {
return err
}
return nil
}