From 7742111742fc2ace46d6081bcd6b3997e053fcf3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justin=20Nu=C3=9F?= Date: Fri, 27 Apr 2018 20:43:05 +0200 Subject: [PATCH 1/2] Make sure connections are really closed after tests --- transport/nats/publisher_test.go | 47 ++++------- transport/nats/subscriber_test.go | 136 ++++++++++++++++-------------- 2 files changed, 87 insertions(+), 96 deletions(-) diff --git a/transport/nats/publisher_test.go b/transport/nats/publisher_test.go index 8468f1b2f..8ecd9e9a7 100644 --- a/transport/nats/publisher_test.go +++ b/transport/nats/publisher_test.go @@ -1,13 +1,13 @@ package nats_test import ( - "testing" "context" - "time" "strings" + "testing" + "time" - "github.com/nats-io/go-nats" natstransport "github.com/go-kit/kit/transport/nats" + "github.com/nats-io/go-nats" ) func TestPublisher(t *testing.T) { @@ -19,11 +19,8 @@ func TestPublisher(t *testing.T) { } ) - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil { @@ -66,11 +63,8 @@ func TestPublisherBefore(t *testing.T) { } ) - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, msg.Data); err != nil { @@ -117,11 +111,8 @@ func TestPublisherAfter(t *testing.T) { } ) - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil { @@ -167,11 +158,8 @@ func TestPublisherTimeout(t *testing.T) { } ) - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() ch := make(chan struct{}) defer close(ch) @@ -195,19 +183,14 @@ func TestPublisherTimeout(t *testing.T) { _, err = publisher.Endpoint()(context.Background(), struct{}{}) if err != context.DeadlineExceeded { t.Errorf("want %s, have %s", context.DeadlineExceeded, err) - } - } func TestEncodeJSONRequest(t *testing.T) { var data string - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { data = string(msg.Data) @@ -237,7 +220,9 @@ func TestEncodeJSONRequest(t *testing.T) { {1.2, "1.2"}, {true, "true"}, {"test", "\"test\""}, - {struct{ Foo string `json:"foo"` }{"foo"}, "{\"foo\":\"foo\"}"}, + {struct { + Foo string `json:"foo"` + }{"foo"}, "{\"foo\":\"foo\"}"}, } { if _, err := publisher(context.Background(), test.value); err != nil { t.Fatal(err) diff --git a/transport/nats/subscriber_test.go b/transport/nats/subscriber_test.go index a2ba160f3..ae4c69036 100644 --- a/transport/nats/subscriber_test.go +++ b/transport/nats/subscriber_test.go @@ -1,19 +1,19 @@ package nats_test import ( - "testing" "context" + "encoding/json" "errors" - "time" - "sync" "strings" - "encoding/json" + "sync" + "testing" + "time" - "github.com/nats-io/go-nats" "github.com/nats-io/gnatsd/server" + "github.com/nats-io/go-nats" - natstransport "github.com/go-kit/kit/transport/nats" "github.com/go-kit/kit/endpoint" + natstransport "github.com/go-kit/kit/transport/nats" ) type TestResponse struct { @@ -21,9 +21,13 @@ type TestResponse struct { Error string `json:"err"` } +var natsServer *server.Server + func init() { - opts := server.Options{Host: "localhost", Port: 4222} - natsServer := server.New(&opts) + natsServer = server.New(&server.Options{ + Host: "localhost", + Port: 4222, + }) go func() { natsServer.Start() @@ -34,12 +38,35 @@ func init() { } } -func TestSubscriberBadDecode(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) +func newNatsConn(t *testing.T) (*nats.Conn, func()) { + nc, err := nats.Connect("nats://"+natsServer.Addr().String(), nats.Name(t.Name())) if err != nil { - t.Fatal(err) + t.Fatalf("failed to connect to gnatsd server: %s", err) } - defer nc.Close() + + return nc, func() { + nc.Close() + + // Connections are closed asynchronously, so when the next test runs + // the previous connection could still be open and subscribed to a topic. + // To prevent this we wait for the server to remove the connection, before + // continuing with the next test. + for tries := 20; tries > 0; tries-- { + if natsServer.NumClients() == 0 { + break + } + + time.Sleep(5 * time.Millisecond) + } + + // Just fail instead of trying forever + t.Log("failed to close connection on the server") + } +} + +func TestSubscriberBadDecode(t *testing.T) { + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, @@ -56,11 +83,8 @@ func TestSubscriberBadDecode(t *testing.T) { } func TestSubscriberBadEndpoint(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") }, @@ -76,11 +100,8 @@ func TestSubscriberBadEndpoint(t *testing.T) { } func TestSubscriberBadEncode(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, @@ -96,11 +117,8 @@ func TestSubscriberBadEncode(t *testing.T) { } func TestSubscriberErrorEncoder(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() errTeapot := errors.New("teapot") code := func(err error) error { @@ -152,11 +170,8 @@ func TestSubscriberHappySubject(t *testing.T) { } func TestMultipleSubscriberBefore(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() var ( response = struct{ Body string }{"go eat a fly ugly\n"} @@ -216,11 +231,8 @@ func TestMultipleSubscriberBefore(t *testing.T) { } func TestMultipleSubscriberAfter(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() var ( response = struct{ Body string }{"go eat a fly ugly\n"} @@ -280,14 +292,15 @@ func TestMultipleSubscriberAfter(t *testing.T) { } func TestEncodeJSONResponse(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( - func(context.Context, interface{}) (interface{}, error) { return struct{ Foo string `json:"foo"` }{"bar"}, nil }, + func(context.Context, interface{}) (interface{}, error) { + return struct { + Foo string `json:"foo"` + }{"bar"}, nil + }, func(context.Context, *nats.Msg) (interface{}, error) { return struct{}{}, nil }, natstransport.EncodeJSONResponse, ) @@ -317,13 +330,12 @@ func (m responseError) Error() string { } func TestErrorEncoder(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() - errResp := struct{ Error string `json:"err"` }{"oh no"} + errResp := struct { + Error string `json:"err"` + }{"oh no"} handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return nil, responseError{msg: errResp.Error} @@ -355,11 +367,8 @@ func TestErrorEncoder(t *testing.T) { type noContentResponse struct{} func TestEncodeNoContent(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil }, @@ -384,11 +393,8 @@ func TestEncodeNoContent(t *testing.T) { } func TestNoOpRequestDecoder(t *testing.T) { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() handler := natstransport.NewSubscriber( func(ctx context.Context, request interface{}) (interface{}, error) { @@ -420,7 +426,10 @@ func TestNoOpRequestDecoder(t *testing.T) { func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) { var ( stepch = make(chan bool) - endpoint = func(context.Context, interface{}) (interface{}, error) { <-stepch; return struct{}{}, nil } + endpoint = func(context.Context, interface{}) (interface{}, error) { + <-stepch + return struct{}{}, nil + } response = make(chan *nats.Msg) handler = natstransport.NewSubscriber( endpoint, @@ -432,11 +441,8 @@ func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) { ) go func() { - nc, err := nats.Connect(nats.DefaultURL) - if err != nil { - t.Fatal(err) - } - defer nc.Close() + nc, closenc := newNatsConn(t) + defer closenc() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc)) if err != nil { From d3b0c3b0bd9f06d9c913be107c5806679bc9909f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Justin=20Nu=C3=9F?= Date: Fri, 27 Apr 2018 21:08:18 +0200 Subject: [PATCH 2/2] Check for subscriptions instead of connections --- transport/nats/publisher_test.go | 20 ++++---- transport/nats/subscriber_test.go | 80 +++++++++++++++---------------- 2 files changed, 49 insertions(+), 51 deletions(-) diff --git a/transport/nats/publisher_test.go b/transport/nats/publisher_test.go index 8ecd9e9a7..cbbe9d777 100644 --- a/transport/nats/publisher_test.go +++ b/transport/nats/publisher_test.go @@ -19,8 +19,8 @@ func TestPublisher(t *testing.T) { } ) - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil { @@ -63,8 +63,8 @@ func TestPublisherBefore(t *testing.T) { } ) - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, msg.Data); err != nil { @@ -111,8 +111,8 @@ func TestPublisherAfter(t *testing.T) { } ) - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { if err := nc.Publish(msg.Reply, []byte(testdata)); err != nil { @@ -158,8 +158,8 @@ func TestPublisherTimeout(t *testing.T) { } ) - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() ch := make(chan struct{}) defer close(ch) @@ -189,8 +189,8 @@ func TestPublisherTimeout(t *testing.T) { func TestEncodeJSONRequest(t *testing.T) { var data string - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", func(msg *nats.Msg) { data = string(msg.Data) diff --git a/transport/nats/subscriber_test.go b/transport/nats/subscriber_test.go index ae4c69036..4d44dbbd0 100644 --- a/transport/nats/subscriber_test.go +++ b/transport/nats/subscriber_test.go @@ -38,35 +38,33 @@ func init() { } } -func newNatsConn(t *testing.T) (*nats.Conn, func()) { +func newNatsConn(t *testing.T) *nats.Conn { + // Subscriptions and connections are closed asynchronously, so it's possible + // that there's still a subscription from an old connection that must be closed + // before the current test can be run. + for tries := 20; tries > 0; tries-- { + if natsServer.NumSubscriptions() == 0 { + break + } + + time.Sleep(5 * time.Millisecond) + } + + if n := natsServer.NumSubscriptions(); n > 0 { + t.Fatalf("found %d active subscriptions on the server", n) + } + nc, err := nats.Connect("nats://"+natsServer.Addr().String(), nats.Name(t.Name())) if err != nil { t.Fatalf("failed to connect to gnatsd server: %s", err) } - return nc, func() { - nc.Close() - - // Connections are closed asynchronously, so when the next test runs - // the previous connection could still be open and subscribed to a topic. - // To prevent this we wait for the server to remove the connection, before - // continuing with the next test. - for tries := 20; tries > 0; tries-- { - if natsServer.NumClients() == 0 { - break - } - - time.Sleep(5 * time.Millisecond) - } - - // Just fail instead of trying forever - t.Log("failed to close connection on the server") - } + return nc } func TestSubscriberBadDecode(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, @@ -83,8 +81,8 @@ func TestSubscriberBadDecode(t *testing.T) { } func TestSubscriberBadEndpoint(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("dang") }, @@ -100,8 +98,8 @@ func TestSubscriberBadEndpoint(t *testing.T) { } func TestSubscriberBadEncode(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, @@ -117,8 +115,8 @@ func TestSubscriberBadEncode(t *testing.T) { } func TestSubscriberErrorEncoder(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() errTeapot := errors.New("teapot") code := func(err error) error { @@ -170,8 +168,8 @@ func TestSubscriberHappySubject(t *testing.T) { } func TestMultipleSubscriberBefore(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() var ( response = struct{ Body string }{"go eat a fly ugly\n"} @@ -231,8 +229,8 @@ func TestMultipleSubscriberBefore(t *testing.T) { } func TestMultipleSubscriberAfter(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() var ( response = struct{ Body string }{"go eat a fly ugly\n"} @@ -292,8 +290,8 @@ func TestMultipleSubscriberAfter(t *testing.T) { } func TestEncodeJSONResponse(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { @@ -330,8 +328,8 @@ func (m responseError) Error() string { } func TestErrorEncoder(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() errResp := struct { Error string `json:"err"` @@ -367,8 +365,8 @@ func TestErrorEncoder(t *testing.T) { type noContentResponse struct{} func TestEncodeNoContent(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(context.Context, interface{}) (interface{}, error) { return noContentResponse{}, nil }, @@ -393,8 +391,8 @@ func TestEncodeNoContent(t *testing.T) { } func TestNoOpRequestDecoder(t *testing.T) { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() handler := natstransport.NewSubscriber( func(ctx context.Context, request interface{}) (interface{}, error) { @@ -441,8 +439,8 @@ func testSubscriber(t *testing.T) (step func(), resp <-chan *nats.Msg) { ) go func() { - nc, closenc := newNatsConn(t) - defer closenc() + nc := newNatsConn(t) + defer nc.Close() sub, err := nc.QueueSubscribe("natstransport.test", "natstransport", handler.ServeMsg(nc)) if err != nil {