Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 48 additions & 59 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,22 @@ import (
"context"
"fmt"
"net/http"
"strings"
"testing"
"time"

ce "github.com/cloudevents/sdk-go"
ce2 "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"github.com/openzipkin/zipkin-go/model"
"go.opentelemetry.io/otel/api/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"knative.dev/pkg/test/zipkin"

tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
)

// SetupInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the
Expand Down Expand Up @@ -95,7 +93,7 @@ func tracingTest(
tc TracingTestCase,
) {
const (
loggerPodName = "logger"
recordEventsPodName = "recordevents"
)

client := lib.Setup(t, true, setupClient)
Expand All @@ -105,42 +103,39 @@ func tracingTest(
// TestMain.
tracinghelper.Setup(t, client)

expected, mustMatch := setupInfrastructure(t, &channel, client, loggerPodName, tc)
matches := assertEventMatch(t, client, loggerPodName, mustMatch)
// Setup the test infrastructure
expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, tc)

// Start the event info store and assert the event was received correctly
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer targetTracker.Cleanup()
matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher))

// Match the trace
traceID := getTraceIDHeader(t, matches)
trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool {
tree, err := tracinghelper.GetTraceTree(trace)
if err != nil {
return false
}
// Do not pass t to prevent unnecessary log output.
return len(expected.MatchesSubtree(nil, tree)) > 0
return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0
})
if err != nil {
t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace))
tree, err := tracinghelper.GetTraceTree(trace)
if err != nil {
t.Fatal(err)
}
if len(expected.MatchesSubtree(t, tree)) == 0 {
t.Fatalf("No matching subtree. want: %v got: %v", expected, tree)
if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 {
t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree)
}
}
}

// assertEventMatch verifies that recorder pod contains at least one event that
// matches mustMatch. It is used to show that the expected event was sent to
// the logger Pod. It returns a list of the matching events.
func assertEventMatch(t *testing.T, client *lib.Client, recorderPodName string, mustMatch cetest.EventMatcher) []recordevents.EventInfo {
targetTracker, err := recordevents.NewEventInfoStore(client, recorderPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer targetTracker.Cleanup()
return targetTracker.AssertAtLeast(1, recordevents.MatchEvent(mustMatch))
}

// getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the
// first matching event, but registers a fatal error if more than one traceid header is seen
// in that message.
Expand All @@ -167,9 +162,10 @@ func setupChannelTracingWithReply(
t *testing.T,
channel *metav1.TypeMeta,
client *lib.Client,
loggerPodName string,
recordEventsPodName string,
tc TracingTestCase,
) (tracinghelper.TestSpanTree, cetest.EventMatcher) {
eventSource := "sender"
// Create the Channels.
channelName := "ch"
client.CreateChannelOrFail(channelName, channel)
Expand All @@ -178,15 +174,16 @@ func setupChannelTracingWithReply(
client.CreateChannelOrFail(replyChannelName, channel)

// Create the 'sink', a LogEvents Pod and a K8s Service that points to it.
loggerPod := resources.EventRecordPod(loggerPodName)
client.CreatePodOrFail(loggerPod, lib.WithService(loggerPodName))
recordEventsPod := resources.EventRecordPod(recordEventsPodName)
client.CreatePodOrFail(recordEventsPod, lib.WithService(recordEventsPodName))

// Create the subscriber, a Pod that mutates the event.
transformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{
EventContextV1: ce.EventContextV1{
Type: "mutated",
},
})
transformerPod := resources.EventTransformationPod(
"transformer",
"mutated",
eventSource,
nil,
)
client.CreatePodOrFail(transformerPod, lib.WithService(transformerPod.Name))

// Create the Subscription linking the Channel to the mutator.
Expand All @@ -202,28 +199,30 @@ func setupChannelTracingWithReply(
"reply-sub",
replyChannelName,
channel,
resources.WithSubscriberForSubscription(loggerPodName),
resources.WithSubscriberForSubscription(recordEventsPodName),
)

// Wait for all test resources to be ready, so that we can start sending events.
client.WaitForAllTestResourcesReadyOrFail()

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := string(uuid.NewUUID())
body := fmt.Sprintf("TestChannelTracing %s", eventID)
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
cloudevents.WithID(eventID),
)
eventID := uuid.New().String()
event := ce2.NewEvent()
event.SetID(eventID)
event.SetSource(senderName)
event.SetType(lib.DefaultEventType)
body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID)
if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressableOrFail
if tc.IncomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressableOrFail
client.SendEventToAddressable(senderName, channelName, channel, event, sender.EnableTracing())
} else {
client.SendEventToAddressable(senderName, channelName, channel, event)
}
sendEvent(senderName, channelName, channel, event)

// We expect the following spans:
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
Expand Down Expand Up @@ -293,7 +292,7 @@ func setupChannelTracingWithReply(
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Client,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace),
"/",
),
),
Expand All @@ -303,10 +302,10 @@ func setupChannelTracingWithReply(
Span: tracinghelper.MatchHTTPSpanNoReply(
model.Server,
tracinghelper.WithHTTPHostAndPath(
fmt.Sprintf("%s.%s.svc.cluster.local", loggerPod.Name, client.Namespace),
fmt.Sprintf("%s.%s.svc.cluster.local", recordEventsPod.Name, client.Namespace),
"/",
),
tracinghelper.WithLocalEndpointServiceName(loggerPod.Name),
tracinghelper.WithLocalEndpointServiceName(recordEventsPod.Name),
),
},
},
Expand All @@ -333,19 +332,9 @@ func setupChannelTracingWithReply(
}
}

matchFunc := func(ev ce2.Event) error {
if ev.Source() != senderName {
return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source())
}
if ev.ID() != eventID {
return fmt.Errorf("expected id %s, saw %s", eventID, ev.ID())
}
db := ev.Data()
if !strings.Contains(string(db), body) {
return fmt.Errorf("expected substring %s in %s", body, string(db))
}
return nil
}

return expected, matchFunc
return expected, cetest.AllOf(
cetest.HasSource(senderName),
cetest.HasId(eventID),
recordevents.DataContains(body),
)
}
27 changes: 22 additions & 5 deletions test/test_images/event-sender/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"flag"
"fmt"
"log"
nethttp "net/http"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/plugin/ochttp/propagation/tracecontext"
"go.uber.org/zap"

"knative.dev/eventing/pkg/tracing"
Expand Down Expand Up @@ -98,23 +101,37 @@ func main() {
log.Fatalf("unsupported encoding option: %q\n", eventEncoding)
}

t, err := cloudevents.NewHTTP(cloudevents.WithTarget(sink))
t, err := cloudevents.NewHTTP(
cloudevents.WithTarget(sink),
cloudevents.WithRoundTripper(&ochttp.Transport{
Base: nethttp.DefaultTransport,
Propagation: &tracecontext.HTTPFormat{},
}),
)
if err != nil {
log.Fatalf("failed to create transport, %v", err)
}

var c cloudevents.Client
if addTracing {
log.Println("Adding tracing")
logger, _ := zap.NewDevelopment()
if err := tracing.SetupStaticPublishing(logger.Sugar(), "", tracing.AlwaysSample); err != nil {
log.Fatalf("Unable to setup trace publishing: %v", err)
}

c, err = cloudevents.NewClientObserved(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
cloudevents.WithTracePropagation,
)
} else {
c, err = cloudevents.NewClient(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
)
}

c, err := cloudevents.NewClient(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
)
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion test/test_images/transformevents/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,10 @@ func main() {
log.Fatalf("failed to create transport, %v", err)
}

c, err := cloudevents.NewClient(t,
c, err := cloudevents.NewClientObserved(t,
cloudevents.WithTimeNow(),
cloudevents.WithUUIDs(),
cloudevents.WithTracePropagation,
)
if err != nil {
log.Fatalf("failed to create client, %v", err)
Expand Down