diff --git a/tavern/internal/http/stream/gcp_coldstart_test.go b/tavern/internal/http/stream/gcp_coldstart_test.go index 45722466e..acfc3eb88 100644 --- a/tavern/internal/http/stream/gcp_coldstart_test.go +++ b/tavern/internal/http/stream/gcp_coldstart_test.go @@ -2,6 +2,7 @@ package stream_test import ( "context" + "fmt" "testing" "time" @@ -16,18 +17,19 @@ func TestPreventPubSubColdStarts_ValidInterval(t *testing.T) { defer cancel() // Create a mock topic and subscription. - topic, err := pubsub.OpenTopic(ctx, "mem://valid") + topicName := fmt.Sprintf("mem://valid-%d", time.Now().UnixNano()) + topic, err := pubsub.OpenTopic(ctx, topicName) if err != nil { t.Fatalf("Failed to open topic: %v", err) } defer topic.Shutdown(ctx) - sub, err := pubsub.OpenSubscription(ctx, "mem://valid") + sub, err := pubsub.OpenSubscription(ctx, topicName) if err != nil { t.Fatalf("Failed to open subscription: %v", err) } defer sub.Shutdown(ctx) - go stream.PreventPubSubColdStarts(ctx, 50*time.Millisecond, "mem://valid", "mem://valid") + go stream.PreventPubSubColdStarts(ctx, 50*time.Millisecond, topicName, topicName) // Expect to receive a message msg, err := sub.Receive(ctx) @@ -43,18 +45,19 @@ func TestPreventPubSubColdStarts_ZeroInterval(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() - topic, err := pubsub.OpenTopic(ctx, "mem://zero") + topicName := fmt.Sprintf("mem://zero-%d", time.Now().UnixNano()) + topic, err := pubsub.OpenTopic(ctx, topicName) if err != nil { t.Fatalf("Failed to open topic: %v", err) } defer topic.Shutdown(ctx) - sub, err := pubsub.OpenSubscription(ctx, "mem://zero") + sub, err := pubsub.OpenSubscription(ctx, topicName) if err != nil { t.Fatalf("Failed to open subscription: %v", err) } defer sub.Shutdown(ctx) - go stream.PreventPubSubColdStarts(ctx, 0, "mem://zero", "mem://zero") + go stream.PreventPubSubColdStarts(ctx, 0, topicName, topicName) // Expect to not receive a message and for the context to timeout _, err = sub.Receive(ctx) @@ -66,18 +69,19 @@ func TestPreventPubSubColdStarts_SubMillisecondInterval(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - topic, err := pubsub.OpenTopic(ctx, "mem://sub") + topicName := fmt.Sprintf("mem://sub-%d", time.Now().UnixNano()) + topic, err := pubsub.OpenTopic(ctx, topicName) if err != nil { t.Fatalf("Failed to open topic: %v", err) } defer topic.Shutdown(ctx) - sub, err := pubsub.OpenSubscription(ctx, "mem://sub") + sub, err := pubsub.OpenSubscription(ctx, topicName) if err != nil { t.Fatalf("Failed to open subscription: %v", err) } defer sub.Shutdown(ctx) - go stream.PreventPubSubColdStarts(ctx, 1*time.Microsecond, "mem://sub", "mem://sub") + go stream.PreventPubSubColdStarts(ctx, 1*time.Microsecond, topicName, topicName) // Expect to receive a message msg, err := sub.Receive(ctx) diff --git a/tavern/internal/http/stream/mux_test.go b/tavern/internal/http/stream/mux_test.go index 38be5b97a..de466ed4f 100644 --- a/tavern/internal/http/stream/mux_test.go +++ b/tavern/internal/http/stream/mux_test.go @@ -2,6 +2,7 @@ package stream_test import ( "context" + "fmt" "testing" "time" @@ -17,10 +18,11 @@ func TestMux(t *testing.T) { defer cancel() // Setup Topic and Subscription - topic, err := pubsub.OpenTopic(ctx, "mem://mux-test") + topicName := fmt.Sprintf("mem://mux-test-%d", time.Now().UnixNano()) + topic, err := pubsub.OpenTopic(ctx, topicName) require.NoError(t, err) defer topic.Shutdown(ctx) - sub, err := pubsub.OpenSubscription(ctx, "mem://mux-test") + sub, err := pubsub.OpenSubscription(ctx, topicName) require.NoError(t, err) defer sub.Shutdown(ctx) @@ -37,9 +39,6 @@ func TestMux(t *testing.T) { mux.Register(stream2) defer mux.Unregister(stream2) - // Give the mux a moment to register the streams - time.Sleep(50 * time.Millisecond) - // Send a message for stream1 err = topic.Send(ctx, &pubsub.Message{ Body: []byte("hello stream 1"), diff --git a/tavern/internal/http/stream/stream_test.go b/tavern/internal/http/stream/stream_test.go index 7a837e0ee..3d6fb02cd 100644 --- a/tavern/internal/http/stream/stream_test.go +++ b/tavern/internal/http/stream/stream_test.go @@ -17,10 +17,11 @@ func TestStream_SendMessage(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - topic, err := pubsub.OpenTopic(ctx, "mem://stream-test-send") + topicName := fmt.Sprintf("mem://stream-test-send-%d", time.Now().UnixNano()) + topic, err := pubsub.OpenTopic(ctx, topicName) require.NoError(t, err) defer topic.Shutdown(ctx) - sub, err := pubsub.OpenSubscription(ctx, "mem://stream-test-send") + sub, err := pubsub.OpenSubscription(ctx, topicName) require.NoError(t, err) defer sub.Shutdown(ctx) diff --git a/tavern/internal/http/stream/websocket_test.go b/tavern/internal/http/stream/websocket_test.go index 8e67d5421..d3cc7770f 100644 --- a/tavern/internal/http/stream/websocket_test.go +++ b/tavern/internal/http/stream/websocket_test.go @@ -2,6 +2,7 @@ package stream_test import ( "context" + "fmt" "net/http/httptest" "strconv" "strings" @@ -29,18 +30,20 @@ func TestNewShellHandler(t *testing.T) { defer cancel() // Topic for messages going TO the websocket (server -> shell) - outputTopic, err := pubsub.OpenTopic(ctx, "mem://websocket-output") + outputTopicName := fmt.Sprintf("mem://websocket-output-%d", time.Now().UnixNano()) + outputTopic, err := pubsub.OpenTopic(ctx, outputTopicName) require.NoError(t, err) defer outputTopic.Shutdown(ctx) - outputSub, err := pubsub.OpenSubscription(ctx, "mem://websocket-output") + outputSub, err := pubsub.OpenSubscription(ctx, outputTopicName) require.NoError(t, err) defer outputSub.Shutdown(ctx) // Topic for messages coming FROM the websocket (shell -> server) - inputTopic, err := pubsub.OpenTopic(ctx, "mem://websocket-input") + inputTopicName := fmt.Sprintf("mem://websocket-input-%d", time.Now().UnixNano()) + inputTopic, err := pubsub.OpenTopic(ctx, inputTopicName) require.NoError(t, err) defer inputTopic.Shutdown(ctx) - inputSub, err := pubsub.OpenSubscription(ctx, "mem://websocket-input") + inputSub, err := pubsub.OpenSubscription(ctx, inputTopicName) require.NoError(t, err) defer inputSub.Shutdown(ctx) diff --git a/tavern/internal/redirectors/grpc/grpc_test.go b/tavern/internal/redirectors/grpc/grpc_test.go index 5cc768189..6eb690d6c 100644 --- a/tavern/internal/redirectors/grpc/grpc_test.go +++ b/tavern/internal/redirectors/grpc/grpc_test.go @@ -155,8 +155,8 @@ func TestRedirector_ContextCancellation(t *testing.T) { serverErr <- redirector.Redirect(ctx, addr, upstreamConn) }() - // Wait a moment for the server to start listening. - time.Sleep(100 * time.Millisecond) + // Wait for the server to start listening. + waitForServer(t, addr) // Cancel the context, which should trigger GracefulStop. cancel() @@ -215,3 +215,17 @@ func TestRedirector_UpstreamFailure(t *testing.T) { require.True(t, ok, "error should be a gRPC status error") require.Equal(t, codes.Unavailable, s.Code(), "error code should be Unavailable") } + +func waitForServer(t *testing.T, addr string) { + t.Helper() + deadline := time.Now().Add(5 * time.Second) + for time.Now().Before(deadline) { + conn, err := net.DialTimeout("tcp", addr, 100*time.Millisecond) + if err == nil { + conn.Close() + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("server did not start listening on %s", addr) +}