From c8e2326f2c5980a296c77f3913eef98fcc73231e Mon Sep 17 00:00:00 2001 From: xescugc Date: Thu, 18 Oct 2018 20:16:49 +0200 Subject: [PATCH 1/4] transport/http/client_test: Modify the test to make it fail With this modifications we can trigger the error that we are searching, 'context canceled' --- transport/http/client_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/http/client_test.go b/transport/http/client_test.go index e31d201cc..ebfaae5a1 100644 --- a/transport/http/client_test.go +++ b/transport/http/client_test.go @@ -99,7 +99,7 @@ func TestHTTPClient(t *testing.T) { func TestHTTPClientBufferedStream(t *testing.T) { var ( - testbody = "testbody" + testbody = string(make([]byte, 6000)) encode = func(context.Context, *http.Request, interface{}) error { return nil } decode = func(_ context.Context, r *http.Response) (interface{}, error) { return TestResponse{r.Body, ""}, nil @@ -129,6 +129,7 @@ func TestHTTPClientBufferedStream(t *testing.T) { if !ok { t.Fatal("response should be TestResponse") } + time.Sleep(time.Second * 1) // Check that response body was NOT closed b := make([]byte, len(testbody)) From 1fd618d8999dc97a4020b7eed42afbb7c0ef9524 Mon Sep 17 00:00:00 2001 From: xescugc Date: Thu, 18 Oct 2018 20:22:11 +0200 Subject: [PATCH 2/4] transport/http/client: Add the 'bodyWithCancel' to wrap the Response.Body It adds the context.CancelFunc to the io.ReadCloser.Close function so bouth are called together --- transport/http/client.go | 24 ++++++++++++++++++++++-- transport/http/client_test.go | 1 + 2 files changed, 23 insertions(+), 2 deletions(-) diff --git a/transport/http/client.go b/transport/http/client.go index eca566300..0980cfcc0 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "encoding/xml" + "io" "io/ioutil" "net/http" "net/url" @@ -92,7 +93,6 @@ func BufferedStream(buffered bool) ClientOption { func (c Client) Endpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { ctx, cancel := context.WithCancel(ctx) - defer cancel() var ( resp *http.Response @@ -112,10 +112,12 @@ func (c Client) Endpoint() endpoint.Endpoint { req, err := http.NewRequest(c.method, c.tgt.String(), nil) if err != nil { + cancel() return nil, err } if err = c.enc(ctx, req, request); err != nil { + cancel() return nil, err } @@ -126,11 +128,15 @@ func (c Client) Endpoint() endpoint.Endpoint { resp, err = c.client.Do(req.WithContext(ctx)) if err != nil { + cancel() return nil, err } - if !c.bufferedStream { + if c.bufferedStream { + resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel} + } else { defer resp.Body.Close() + defer cancel() } for _, f := range c.after { @@ -146,6 +152,20 @@ func (c Client) Endpoint() endpoint.Endpoint { } } +// bodyWithCancel is a wrapper for an io.ReadCloser with also a +// cancel function which is called when the Close is used +type bodyWithCancel struct { + io.ReadCloser + + cancel context.CancelFunc +} + +func (bwc bodyWithCancel) Close() error { + bwc.ReadCloser.Close() + bwc.cancel() + return nil +} + // ClientFinalizerFunc can be used to perform work at the end of a client HTTP // request, after the response is returned. The principal // intended use is for error logging. Additional response parameters are diff --git a/transport/http/client_test.go b/transport/http/client_test.go index ebfaae5a1..ee4384063 100644 --- a/transport/http/client_test.go +++ b/transport/http/client_test.go @@ -129,6 +129,7 @@ func TestHTTPClientBufferedStream(t *testing.T) { if !ok { t.Fatal("response should be TestResponse") } + defer response.Body.Close() time.Sleep(time.Second * 1) // Check that response body was NOT closed From d5a7f52b3394aa0c1f3ebd1b0d167d3068bb0562 Mon Sep 17 00:00:00 2001 From: xescugc Date: Sat, 20 Oct 2018 20:29:08 +0200 Subject: [PATCH 3/4] transport/http/client: Add more documentation to clarify the changes Also abstracted some logic on the test to make it more clear and also added more docuemntation. Added more documentation on the definition of 'BufferedStream' to clarify that the Body has to be closed manually to properly close the response. --- transport/http/client.go | 3 +++ transport/http/client_test.go | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/transport/http/client.go b/transport/http/client.go index 0980cfcc0..f65723611 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -85,6 +85,7 @@ func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { // BufferedStream sets whether the Response.Body is left open, allowing it // to be read from later. Useful for transporting a file as a buffered stream. +// That body has to be Closed to propery end the request func BufferedStream(buffered bool) ClientOption { return func(c *Client) { c.bufferedStream = buffered } } @@ -132,6 +133,8 @@ func (c Client) Endpoint() endpoint.Endpoint { return nil, err } + // If we expect a buffered stream, we don't cancel the context when the endpoint returns. + // Instead, we should call the cancel func when closing the response body. if c.bufferedStream { resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel} } else { diff --git a/transport/http/client_test.go b/transport/http/client_test.go index ee4384063..9ec1a6caf 100644 --- a/transport/http/client_test.go +++ b/transport/http/client_test.go @@ -98,8 +98,12 @@ func TestHTTPClient(t *testing.T) { } func TestHTTPClientBufferedStream(t *testing.T) { + // bodysize has a size big enought to make the resopnse.Body not an instant read + // so if the response is cancelled it wount be all readed and the test would fail + // The 6000 has not a particular meaning, it big enough to fulfill the usecase. + const bodysize = 6000 var ( - testbody = string(make([]byte, 6000)) + testbody = string(make([]byte, bodysize)) encode = func(context.Context, *http.Request, interface{}) error { return nil } decode = func(_ context.Context, r *http.Response) (interface{}, error) { return TestResponse{r.Body, ""}, nil @@ -130,6 +134,7 @@ func TestHTTPClientBufferedStream(t *testing.T) { t.Fatal("response should be TestResponse") } defer response.Body.Close() + // Faking work time.Sleep(time.Second * 1) // Check that response body was NOT closed From f37c50ffdec600daec38b6f6c5ecac9c73f2fdb9 Mon Sep 17 00:00:00 2001 From: xescugc Date: Sat, 20 Oct 2018 21:15:42 +0200 Subject: [PATCH 4/4] transport/http/client: Add period at the end of the doc of 'BufferedStream' --- transport/http/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http/client.go b/transport/http/client.go index f65723611..4cd8f27a8 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -85,7 +85,7 @@ func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption { // BufferedStream sets whether the Response.Body is left open, allowing it // to be read from later. Useful for transporting a file as a buffered stream. -// That body has to be Closed to propery end the request +// That body has to be Closed to propery end the request. func BufferedStream(buffered bool) ClientOption { return func(c *Client) { c.bufferedStream = buffered } }