Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 34 additions & 59 deletions test/conformance/helpers/broker_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ package helpers

import (
"fmt"
"strings"
"testing"

ce "github.com/cloudevents/sdk-go"
ce2 "github.com/cloudevents/sdk-go/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/openzipkin/zipkin-go/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"

"knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/pkg/utils"
tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/eventing/test/lib"
"knative.dev/eventing/test/lib/cloudevents"
"knative.dev/eventing/test/lib/recordevents"
"knative.dev/eventing/test/lib/resources"
"knative.dev/eventing/test/lib/resources/sender"
)

// BrokerTracingTestHelperWithChannelTestRunner runs the Broker tracing tests for all Channels in
Expand All @@ -45,44 +43,31 @@ func BrokerTracingTestHelperWithChannelTestRunner(
setupClient lib.SetupClientOption,
) {
channelTestRunner.RunTests(t, lib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) {
BrokerTracingTestHelper(t, brokerClass, channel, setupClient)
tracingTest(t, setupClient, setupBrokerTracing(brokerClass), channel)
})
}

// BrokerTracingTestHelper runs the Broker tracing test using the given TypeMeta.
func BrokerTracingTestHelper(t *testing.T, brokerClass string, channel metav1.TypeMeta, setupClient lib.SetupClientOption) {
testCases := map[string]TracingTestCase{
"includes incoming trace id": {
IncomingTraceId: true,
},
}

for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
tracingTest(t, setupClient, setupBrokerTracing(brokerClass), channel, tc)
})
}
}

// setupBrokerTracing is the general setup for TestBrokerTracing. It creates the following:
// 1. Broker.
// 2. Trigger on 'foo' events -> K8s Service -> transformer Pod (which replies with a 'bar' event).
// 3. Trigger on 'bar' events -> K8s Service -> eventdetails Pod.
// 4. Sender Pod which sends a 'foo' event.
// It returns a string that is expected to be sent by the SendEvents Pod and should be present in
// the LogEvents Pod logs.
func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
func setupBrokerTracing(brokerClass string) SetupTracingTestInfrastructureFunc {
const (
etTransformer = "transformer"
etLogger = "logger"
defaultCMPName = "eventing"
etTransformer = "transformer"
etLogger = "logger"
senderName = "sender"
eventID = "event-1"
eventBody = `{"msg":"TestBrokerTracing event-1"}`
)
return func(
t *testing.T,
channel *metav1.TypeMeta,
client *lib.Client,
loggerPodName string,
tc TracingTestCase,
senderPublishTrace bool,
) (tracinghelper.TestSpanTree, cetest.EventMatcher) {
// Create a configmap used by the broker.
client.CreateBrokerConfigMapOrFail("br", channel)
Expand All @@ -107,11 +92,12 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {

// Create a transformer (EventTransfrmer) Pod that replies with the same event as the input,
// except the reply's event's type is changed to bar.
eventTransformerPod := resources.DeprecatedEventTransformationPod("transformer", &cloudevents.CloudEvent{
EventContextV1: ce.EventContextV1{
Type: etLogger,
},
})
eventTransformerPod := resources.EventTransformationPod(
"transformer",
etLogger,
senderName,
[]byte(eventBody),
)
client.CreatePodOrFail(eventTransformerPod, lib.WithService(eventTransformerPod.Name))

// Create a Trigger that receives events (type=foo) and sends them to the transformer Pod.
Expand All @@ -126,22 +112,20 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
client.WaitForAllTestResourcesReadyOrFail()

// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := string(uuid.NewUUID())
body := fmt.Sprintf("TestBrokerTracing %s", eventID)
event := cloudevents.New(
fmt.Sprintf(`{"msg":%q}`, body),
cloudevents.WithSource(senderName),
cloudevents.WithID(eventID),
cloudevents.WithType(etTransformer),
)
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetSource(senderName)
event.SetType(etTransformer)
if err := event.SetData(cloudevents.ApplicationJSON, []byte(eventBody)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
sendEvent := client.SendFakeEventToAddressableOrFail
if tc.IncomingTraceId {
sendEvent = client.SendFakeEventWithTracingToAddressableOrFail
if senderPublishTrace {
client.SendEventToAddressable(senderName, broker.Name, lib.BrokerTypeMeta, event, sender.EnableTracing())
} else {
client.SendEventToAddressable(senderName, broker.Name, lib.BrokerTypeMeta, event)
}
sendEvent(senderName, broker.Name, lib.BrokerTypeMeta, event)

domain := utils.GetClusterDomainName()

Expand Down Expand Up @@ -207,7 +191,7 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
},
}

if tc.IncomingTraceId {
if senderPublishTrace {
expected = tracinghelper.TestSpanTree{
Note: "1. Send pod sends event to the Broker Ingress (only if the sending pod generates a span).",
Span: tracinghelper.MatchHTTPSpanNoReply(
Expand All @@ -217,21 +201,12 @@ func setupBrokerTracing(brokerClass string) SetupInfrastructureFunc {
Children: []tracinghelper.TestSpanTree{expected},
}
}
matchFunc := func(ev ce2.Event) error {
if ev.Source() != senderName {
return fmt.Errorf("expected source %s, saw %s", senderName, ev.Source())
}
if ev.ID() != eventID {
return fmt.Errorf("expected id %s, saw %s", eventID, ev.ID())
}
db := ev.Data()
if !strings.Contains(string(db), body) {
return fmt.Errorf("expected substring %s in data %s", body, string(db))
}
return nil
}

return expected, matchFunc
return expected, cetest.AllOf(
cetest.HasSource(senderName),
cetest.HasId(eventID),
recordevents.DataContains(eventBody),
)
}
}

Expand Down
121 changes: 7 additions & 114 deletions test/conformance/helpers/channel_tracing_test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,14 @@ limitations under the License.
package helpers

import (
"context"
"fmt"
"net/http"
"testing"
"time"

ce2 "github.com/cloudevents/sdk-go/v2"
cloudevents "github.com/cloudevents/sdk-go/v2"
cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"
"github.com/openzipkin/zipkin-go/model"
"go.opentelemetry.io/otel/api/trace"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/test/zipkin"

tracinghelper "knative.dev/eventing/test/conformance/helpers/tracing"
"knative.dev/eventing/test/lib"
Expand All @@ -38,26 +33,6 @@ import (
"knative.dev/eventing/test/lib/resources/sender"
)

// SetupInfrastructureFunc sets up the infrastructure for running tracing tests. It returns the
// expected trace as well as a string that is expected to be in the logger Pod's logs.
type SetupInfrastructureFunc func(
t *testing.T,
channel *metav1.TypeMeta,
client *lib.Client,
loggerPodName string,
tc TracingTestCase,
) (tracinghelper.TestSpanTree, cetest.EventMatcher)

// TracingTestCase is the test case information for tracing tests.
type TracingTestCase struct {
// IncomingTraceId controls whether the original request is sent to the Broker/Channel already
// has a trace ID associated with it by the sender.
IncomingTraceId bool
// Istio controls whether the Pods being created for the test (sender, transformer, logger,
// etc.) have Istio sidecars. It does not affect the Channel Pods.
Istio bool
}

// ChannelTracingTestHelperWithChannelTestRunner runs the Channel tracing tests for all Channels in
// the ChannelTestRunner.
func ChannelTracingTestHelperWithChannelTestRunner(
Expand All @@ -66,92 +41,10 @@ func ChannelTracingTestHelperWithChannelTestRunner(
setupClient lib.SetupClientOption,
) {
channelTestRunner.RunTests(t, lib.FeatureBasic, func(t *testing.T, channel metav1.TypeMeta) {
ChannelTracingTestHelper(t, channel, setupClient)
tracingTest(t, setupClient, setupChannelTracingWithReply, channel)
})
}

// ChannelTracingTestHelper runs the Channel tracing test using the given TypeMeta.
func ChannelTracingTestHelper(t *testing.T, channel metav1.TypeMeta, setupClient lib.SetupClientOption) {
testCases := map[string]TracingTestCase{
"includes incoming trace id": {
IncomingTraceId: true,
},
}

for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
tracingTest(t, setupClient, setupChannelTracingWithReply, channel, tc)
})
}
}

func tracingTest(
t *testing.T,
setupClient lib.SetupClientOption,
setupInfrastructure SetupInfrastructureFunc,
channel metav1.TypeMeta,
tc TracingTestCase,
) {
const (
recordEventsPodName = "recordevents"
)

client := lib.Setup(t, true, setupClient)
defer lib.TearDown(client)

// Do NOT call zipkin.CleanupZipkinTracingSetup. That will be called exactly once in
// TestMain.
tracinghelper.Setup(t, client)

// Setup the test infrastructure
expectedTestSpan, eventMatcher := setupInfrastructure(t, &channel, client, recordEventsPodName, tc)

// Start the event info store and assert the event was received correctly
targetTracker, err := recordevents.NewEventInfoStore(client, recordEventsPodName)
if err != nil {
t.Fatalf("Pod tracker failed: %v", err)
}
defer targetTracker.Cleanup()
matches := targetTracker.AssertAtLeast(1, recordevents.MatchEvent(eventMatcher))

// Match the trace
traceID := getTraceIDHeader(t, matches)
trace, err := zipkin.JSONTracePred(traceID, 5*time.Minute, func(trace []model.SpanModel) bool {
tree, err := tracinghelper.GetTraceTree(trace)
if err != nil {
return false
}
// Do not pass t to prevent unnecessary log output.
return len(expectedTestSpan.MatchesSubtree(nil, tree)) > 0
})
if err != nil {
t.Logf("Unable to get trace %q: %v. Trace so far %+v", traceID, err, tracinghelper.PrettyPrintTrace(trace))
tree, err := tracinghelper.GetTraceTree(trace)
if err != nil {
t.Fatal(err)
}
if len(expectedTestSpan.MatchesSubtree(t, tree)) == 0 {
t.Fatalf("No matching subtree. want: %v got: %v", expectedTestSpan, tree)
}
}
}

// getTraceIDHeader gets the TraceID from the passed in events. It returns the header from the
// first matching event, but registers a fatal error if more than one traceid header is seen
// in that message.
func getTraceIDHeader(t *testing.T, evInfos []recordevents.EventInfo) string {
for i := range evInfos {
if nil != evInfos[i].HTTPHeaders {
sc := trace.RemoteSpanContextFromContext(trace.DefaultHTTPPropagator().Extract(context.TODO(), http.Header(evInfos[i].HTTPHeaders)))
if sc.HasTraceID() {
return sc.TraceIDString()
}
}
}
t.Fatalf("FAIL: No traceid in %d messages: (%s)", len(evInfos), evInfos)
return ""
}

// setupChannelTracing is the general setup for TestChannelTracing. It creates the following:
// SendEvents (Pod) -> Channel -> Subscription -> K8s Service -> Mutate (Pod)
// v
Expand All @@ -163,7 +56,7 @@ func setupChannelTracingWithReply(
channel *metav1.TypeMeta,
client *lib.Client,
recordEventsPodName string,
tc TracingTestCase,
senderPublishTrace bool,
) (tracinghelper.TestSpanTree, cetest.EventMatcher) {
eventSource := "sender"
// Create the Channels.
Expand Down Expand Up @@ -208,17 +101,17 @@ func setupChannelTracingWithReply(
// Everything is setup to receive an event. Generate a CloudEvent.
senderName := "sender"
eventID := uuid.New().String()
event := ce2.NewEvent()
event := cloudevents.NewEvent()
event.SetID(eventID)
event.SetSource(senderName)
event.SetType(lib.DefaultEventType)
body := fmt.Sprintf(`{"msg":"TestChannelTracing %s"}`, eventID)
if err := event.SetData(ce2.ApplicationJSON, []byte(body)); err != nil {
if err := event.SetData(cloudevents.ApplicationJSON, []byte(body)); err != nil {
t.Fatalf("Cannot set the payload of the event: %s", err.Error())
}

// Send the CloudEvent (either with or without tracing inside the SendEvents Pod).
if tc.IncomingTraceId {
if senderPublishTrace {
client.SendEventToAddressable(senderName, channelName, channel, event, sender.EnableTracing())
} else {
client.SendEventToAddressable(senderName, channelName, channel, event)
Expand Down Expand Up @@ -317,7 +210,7 @@ func setupChannelTracingWithReply(
},
}

if tc.IncomingTraceId {
if senderPublishTrace {
expected = tracinghelper.TestSpanTree{
// 1. Sending pod sends event to Channel (only if the sending pod generates a span).
Span: tracinghelper.MatchHTTPSpanNoReply(
Expand Down
Loading