Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 90 additions & 87 deletions transport/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,120 +13,61 @@ import (
"github.com/go-kit/kit/endpoint"
)

// HTTPClient is an interface that models *http.Client.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}

// Client wraps a URL and provides a method that implements endpoint.Endpoint.
type Client struct {
client HTTPClient
method string
tgt *url.URL
enc EncodeRequestFunc
dec DecodeResponseFunc
before []RequestFunc
after []ClientResponseFunc
finalizer []ClientFinalizerFunc
bufferedStream bool
}

// NewClient constructs a usable Client for a single remote method.
func NewClient(
// NewClientEndpoint returns a usable endpoint that invokes the remote endpoint.
func NewClientEndpoint(
method string,
tgt *url.URL,
enc EncodeRequestFunc,
enc EncodeClientRequestFunc,
dec DecodeResponseFunc,
options ...ClientOption,
) *Client {
c := &Client{
options ...ClientEndpointOption,
) endpoint.Endpoint {
opts := &clientEndpointOpts{
client: http.DefaultClient,
method: method,
tgt: tgt,
enc: enc,
dec: dec,
before: []RequestFunc{},
after: []ClientResponseFunc{},
bufferedStream: false,
}
for _, option := range options {
option(c)
option(opts)
}
return c
}

// ClientOption sets an optional parameter for clients.
type ClientOption func(*Client)

// SetClient sets the underlying HTTP client used for requests.
// By default, http.DefaultClient is used.
func SetClient(client HTTPClient) ClientOption {
return func(c *Client) { c.client = client }
}

// ClientBefore sets the RequestFuncs that are applied to the outgoing HTTP
// request before it's invoked.
func ClientBefore(before ...RequestFunc) ClientOption {
return func(c *Client) { c.before = append(c.before, before...) }
}

// ClientAfter sets the ClientResponseFuncs applied to the incoming HTTP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func ClientAfter(after ...ClientResponseFunc) ClientOption {
return func(c *Client) { c.after = append(c.after, after...) }
}

// ClientFinalizer is executed at the end of every HTTP request.
// By default, no finalizer is registered.
func ClientFinalizer(f ...ClientFinalizerFunc) ClientOption {
return func(s *Client) { s.finalizer = append(s.finalizer, f...) }
}

// BufferedStream sets whether the Response.Body is left open, allowing it
// to be read from later. Useful for transporting a file as a buffered stream.
// That body has to be Closed to propery end the request.
func BufferedStream(buffered bool) ClientOption {
return func(c *Client) { c.bufferedStream = buffered }
}

// Endpoint returns a usable endpoint that invokes the remote endpoint.
func (c Client) Endpoint() endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
ctx, cancel := context.WithCancel(ctx)

var (
resp *http.Response
err error
)
if c.finalizer != nil {
if opts.finalizer != nil {
defer func() {
if resp != nil {
ctx = context.WithValue(ctx, ContextKeyResponseHeaders, resp.Header)
ctx = context.WithValue(ctx, ContextKeyResponseSize, resp.ContentLength)
}
for _, f := range c.finalizer {
for _, f := range opts.finalizer {
f(ctx, err)
}
}()
}

req, err := http.NewRequest(c.method, c.tgt.String(), nil)
req, err := enc(ctx, method, tgt.String(), request)
if err != nil {
cancel()
return nil, err
}

if err = c.enc(ctx, req, request); err != nil {
cancel()
return nil, err
// If user didn't bother to create a req object we have to set a default value for it ourselves.
if req == nil {
req, err = http.NewRequest(method, tgt.String(), nil)
if err != nil {
cancel()
return nil, err
}
}

for _, f := range c.before {
for _, f := range opts.before {
ctx = f(ctx, req)
}

resp, err = c.client.Do(req.WithContext(ctx))
resp, err = opts.client.Do(req.WithContext(ctx))

if err != nil {
cancel()
Expand All @@ -135,18 +76,18 @@ func (c Client) Endpoint() endpoint.Endpoint {

// If we expect a buffered stream, we don't cancel the context when the endpoint returns.
// Instead, we should call the cancel func when closing the response body.
if c.bufferedStream {
if opts.bufferedStream {
resp.Body = bodyWithCancel{ReadCloser: resp.Body, cancel: cancel}
} else {
defer resp.Body.Close()
defer cancel()
}

for _, f := range c.after {
for _, f := range opts.after {
ctx = f(ctx, resp)
}

response, err := c.dec(ctx, resp)
response, err := dec(ctx, resp)
if err != nil {
return nil, err
}
Expand All @@ -155,6 +96,54 @@ func (c Client) Endpoint() endpoint.Endpoint {
}
}

// ClientEndpointOption sets an optional parameter for client endpoint.
type ClientEndpointOption func(*clientEndpointOpts)

type clientEndpointOpts struct {
client HTTPClient
before []RequestFunc
after []ClientResponseFunc
finalizer []ClientFinalizerFunc
bufferedStream bool
}

// ClientEndpointSetClient sets the underlying HTTP client used for requests.
// By default, http.DefaultClient is used.
func ClientEndpointSetClient(client HTTPClient) ClientEndpointOption {
return func(e *clientEndpointOpts) { e.client = client }
}

// ClientEndpointBefore sets the RequestFuncs that are applied to the outgoing HTTP
// request before it's invoked.
func ClientEndpointBefore(before ...RequestFunc) ClientEndpointOption {
return func(e *clientEndpointOpts) { e.before = append(e.before, before...) }
}

// ClientEndpointAfter sets the ClientResponseFuncs applied to the incoming HTTP
// request prior to it being decoded. This is useful for obtaining anything off
// of the response and adding onto the context prior to decoding.
func ClientEndpointAfter(after ...ClientResponseFunc) ClientEndpointOption {
return func(e *clientEndpointOpts) { e.after = append(e.after, after...) }
}

// ClientEndpointFinalizer is executed at the end of every HTTP request.
// By default, no finalizer is registered.
func ClientEndpointFinalizer(f ...ClientFinalizerFunc) ClientEndpointOption {
return func(e *clientEndpointOpts) { e.finalizer = append(e.finalizer, f...) }
}

// ClientEndpointBufferedStream sets whether the Response.Body is left open, allowing it
// to be read from later. Useful for transporting a file as a buffered stream.
// That body has to be Closed to propery end the request.
func ClientEndpointBufferedStream(buffered bool) ClientEndpointOption {
return func(e *clientEndpointOpts) { e.bufferedStream = buffered }
}

// HTTPClient is an interface that models *http.Client.
type HTTPClient interface {
Do(req *http.Request) (*http.Response, error)
}

// bodyWithCancel is a wrapper for an io.ReadCloser with also a
// cancel function which is called when the Close is used
type bodyWithCancel struct {
Expand All @@ -177,11 +166,15 @@ func (bwc bodyWithCancel) Close() error {
// depending on when an error occurs.
type ClientFinalizerFunc func(ctx context.Context, err error)

// EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a
// EncodeJSONClientRequest is an EncodeRequestFunc that serializes the request as a
// JSON object to the Request body. Many JSON-over-HTTP services can use it as
// a sensible default. If the request implements Headerer, the provided headers
// will be applied to the request.
func EncodeJSONRequest(c context.Context, r *http.Request, request interface{}) error {
func EncodeJSONClientRequest(c context.Context, method, url string, request interface{}) (*http.Request, error) {
r, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
r.Header.Set("Content-Type", "application/json; charset=utf-8")
if headerer, ok := request.(Headerer); ok {
for k := range headerer.Headers() {
Expand All @@ -190,13 +183,20 @@ func EncodeJSONRequest(c context.Context, r *http.Request, request interface{})
}
var b bytes.Buffer
r.Body = ioutil.NopCloser(&b)
return json.NewEncoder(&b).Encode(request)
if err := json.NewEncoder(&b).Encode(request); err != nil {
return nil, err
}
return r, nil
}

// EncodeXMLRequest is an EncodeRequestFunc that serializes the request as a
// EncodeXMLClientRequest is an EncodeRequestFunc that serializes the request as a
// XML object to the Request body. If the request implements Headerer,
// the provided headers will be applied to the request.
func EncodeXMLRequest(c context.Context, r *http.Request, request interface{}) error {
func EncodeXMLClientRequest(c context.Context, method, url string, request interface{}) (*http.Request, error) {
r, err := http.NewRequest(method, url, nil)
if err != nil {
return nil, err
}
r.Header.Set("Content-Type", "text/xml; charset=utf-8")
if headerer, ok := request.(Headerer); ok {
for k := range headerer.Headers() {
Expand All @@ -205,5 +205,8 @@ func EncodeXMLRequest(c context.Context, r *http.Request, request interface{}) e
}
var b bytes.Buffer
r.Body = ioutil.NopCloser(&b)
return xml.NewEncoder(&b).Encode(request)
if err := xml.NewEncoder(&b).Encode(request); err != nil {
return nil, err
}
return r, nil
}
Loading