From 6f3c5896cf6e8f452f2efe0db1cd49727d019e0d Mon Sep 17 00:00:00 2001 From: Austin Lin Date: Sun, 28 Jan 2018 03:07:42 -0800 Subject: [PATCH 1/5] Add support for Twirp as a Transport Twirp (https://github.com/twitchtv/twirp) is an RPC system that works over HTTP/1.1 using either JSON or Protobuf. It's leverages protobuf for generating servers and clients. --- transport/twirp/client.go | 103 +++++++++++++++++++++ transport/twirp/doc.go | 2 + transport/twirp/encode_decode.go | 29 ++++++ transport/twirp/request_response_funcs.go | 41 +++++++++ transport/twirp/server.go | 105 ++++++++++++++++++++++ 5 files changed, 280 insertions(+) create mode 100644 transport/twirp/client.go create mode 100644 transport/twirp/doc.go create mode 100644 transport/twirp/encode_decode.go create mode 100644 transport/twirp/request_response_funcs.go create mode 100644 transport/twirp/server.go diff --git a/transport/twirp/client.go b/transport/twirp/client.go new file mode 100644 index 000000000..4b3d7067f --- /dev/null +++ b/transport/twirp/client.go @@ -0,0 +1,103 @@ +package twirp + +import ( + "context" + + "github.com/go-kit/kit/endpoint" +) + +// rpcFn is a simple type to represent an RPC method on the Twirp client. +type rpcFn func(context.Context, interface{}) (interface{}, error) + +// Client wraps a Twirp client and provides a method that implements endpoint.Endpoint. +type Client struct { + rpcFn rpcFn + enc EncodeRequestFunc + dec DecodeResponseFunc + before []ClientRequestFunc + after []ClientResponseFunc +} + +// NewClient constructs a usable Client for a single remote method. +func NewClient( + rpcFn rpcFn, + enc EncodeRequestFunc, + dec DecodeResponseFunc, + options ...ClientOption, +) *Client { + c := &Client{ + rpcFn: rpcFn, + enc: enc, + dec: dec, + before: []ClientRequestFunc{}, + after: []ClientResponseFunc{}, + } + for _, option := range options { + option(c) + } + return c +} + +// ClientOption sets an optional parameter for clients. +type ClientOption func(*Client) + +// ClientBefore sets the ClientRequestFunc that are applied to the outgoing HTTP +// request before it's invoked. +func ClientBefore(before ...ClientRequestFunc) ClientOption { + return func(c *Client) { c.before = append(c.before, before...) } +} + +// ClientAfter sets the ClientResponseFuncs applied to the incoming HTTP +// request prior to it being decoded. This is useful for obtaining anything off +// of the response and adding onto the context prior to decoding. +func ClientAfter(after ...ClientResponseFunc) ClientOption { + return func(c *Client) { c.after = append(c.after, after...) } +} + +// Endpoint returns a usable endpoint that invokes the remote endpoint. +func (c Client) Endpoint() endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Encode + var ( + req interface{} + err error + ) + req, err = c.enc(ctx, request) + if err != nil { + return nil, err + } + + // Process ClientRequestFunctions + for _, f := range c.before { + ctx, err = f(ctx) + if err != nil { + return nil, err + } + } + + // Call the actual RPC method + resp, err := c.rpcFn(ctx, req) + if err != nil { + return nil, err + } + + // Process ClientResponseFunctions + for _, f := range c.after { + ctx, err = f(ctx) + if err != nil { + return nil, err + } + } + + // Decode + response, err := c.dec(ctx, resp) + if err != nil { + return nil, err + } + + return response, nil + } +} diff --git a/transport/twirp/doc.go b/transport/twirp/doc.go new file mode 100644 index 000000000..21388dd04 --- /dev/null +++ b/transport/twirp/doc.go @@ -0,0 +1,2 @@ +// Package twirp provides a general purpose Twirp binding for endpoints. +package twirp diff --git a/transport/twirp/encode_decode.go b/transport/twirp/encode_decode.go new file mode 100644 index 000000000..d3d213166 --- /dev/null +++ b/transport/twirp/encode_decode.go @@ -0,0 +1,29 @@ +package twirp + +import ( + "context" +) + +// DecodeRequestFunc extracts a user-domain request object from a Twirp request. +// It's designed to be used in Twirp servers, for server-side endpoints. One +// straightforward DecodeRequestFunc could be something that decodes from the +// Twirp request message to the concrete request type. +type DecodeRequestFunc func(context.Context, interface{}) (request interface{}, err error) + +// EncodeRequestFunc encodes the passed request object into the Twirp request +// object. It's designed to be used in Twirp clients, for client-side endpoints. +// One straightforward EncodeRequestFunc could something that encodes the object +// directly to the Twirp request message. +type EncodeRequestFunc func(context.Context, interface{}) (request interface{}, err error) + +// EncodeResponseFunc encodes the passed response object to the Twirp response +// message. It's designed to be used in Twirp servers, for server-side endpoints. +// One straightforward EncodeResponseFunc could be something that encodes the +// object directly to the Twirp response message. +type EncodeResponseFunc func(context.Context, interface{}) (response interface{}, err error) + +// DecodeResponseFunc extracts a user-domain response object from a Twirp +// response object. It's designed to be used in Twirp clients, for client-side +// endpoints. One straightforward DecodeResponseFunc could be something that +// decodes from the Twirp response message to the concrete response type. +type DecodeResponseFunc func(context.Context, interface{}) (response interface{}, err error) diff --git a/transport/twirp/request_response_funcs.go b/transport/twirp/request_response_funcs.go new file mode 100644 index 000000000..7d80abbea --- /dev/null +++ b/transport/twirp/request_response_funcs.go @@ -0,0 +1,41 @@ +package twirp + +import ( + "context" + "github.com/twitchtv/twirp" + "net/http" +) + +// ClientRequestFunc may modify the context. ClientRequestFuncs are executed +// after creating the request but prior to sending the Twirp request to +// the server. +type ClientRequestFunc func(context.Context) (context.Context, error) + +// ServerRequestFunc may take information from the context. ServerRequestFuncs are +// executed prior to invoking the endpoint. +type ServerRequestFunc func(context.Context) context.Context + +// ServerResponseFunc may modify the context. ServerResponseFuncs are only executed in +// servers, after invoking the endpoint but prior to writing a response. +type ServerResponseFunc func(context.Context) (context.Context, error) + +// ClientResponseFunc may take information from the context. ClientResponseFuncs are only executed in +// clients, after a request has been made, but prior to it being decoded. +type ClientResponseFunc func(context.Context) (context.Context, error) + +// SetResponseHeader returns a ServerResponseFunc that sets the given header. +func SetResponseHeader(key, val string) ServerResponseFunc { + return func(ctx context.Context) (context.Context, error) { + err := twirp.SetHTTPResponseHeader(ctx, key, val) + return ctx, err + } +} + +// SetRequestHeader returns a RequestFunc that sets the given header. +func SetRequestHeader(key, val string) ClientRequestFunc { + h := &http.Header{} + h.Set(key, val) + return func(ctx context.Context) (context.Context, error) { + return twirp.WithHTTPRequestHeaders(ctx, *h) + } +} diff --git a/transport/twirp/server.go b/transport/twirp/server.go new file mode 100644 index 000000000..8269baa65 --- /dev/null +++ b/transport/twirp/server.go @@ -0,0 +1,105 @@ +package twirp + +import ( + "context" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" +) + +// Handler which should be called from the Twirp binding of the service +// implementation. The incoming request parameter, and returned response +// parameter, are both Twirp types, not user-domain. +type Handler interface { + ServeTwirp(ctx context.Context, request interface{}) (context.Context, interface{}, error) +} + +// Server wraps an endpoint and implements Twirp Handler. +type Server struct { + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + logger log.Logger +} + +// NewServer constructs a new server, which implements wraps the provided +// endpoint and implements the Handler interface. Consumers should write +// bindings that adapt the concrete Twirp methods from their compiled protobuf +// definitions to individual handlers. Request and response objects are from the +// caller business domain, not Twirp request and reply types. +func NewServer( + e endpoint.Endpoint, + dec DecodeRequestFunc, + enc EncodeResponseFunc, + options ...ServerOption, +) *Server { + s := &Server{ + e: e, + dec: dec, + enc: enc, + logger: log.NewNopLogger(), + } + for _, option := range options { + option(s) + } + return s +} + +// ServerOption sets an optional parameter for servers. +type ServerOption func(*Server) + +// ServerBefore functions are executed on the HTTP request object before the +// request is decoded. +func ServerBefore(before ...ServerRequestFunc) ServerOption { + return func(s *Server) { s.before = append(s.before, before...) } +} + +// ServerAfter functions are executed on the HTTP response writer after the +// endpoint is invoked, but before anything is written to the client. +func ServerAfter(after ...ServerResponseFunc) ServerOption { + return func(s *Server) { s.after = append(s.after, after...) } +} + +// ServerErrorLogger is used to log non-terminal errors. By default, no errors +// are logged. +func ServerErrorLogger(logger log.Logger) ServerOption { + return func(s *Server) { s.logger = logger } +} + +// ServeTwirp implements the Handler interface. +func (s Server) ServeTwirp(ctx context.Context, req interface{}) (context.Context, interface{}, error) { + + // Process ServerRequestFunctions + for _, f := range s.before { + ctx = f(ctx) + } + request, err := s.dec(ctx, req) + if err != nil { + s.logger.Log("err", err) + return ctx, nil, err + } + + response, err := s.e(ctx, request) + if err != nil { + s.logger.Log("err", err) + return ctx, nil, err + } + + // Process ServerResponseFunctions + for _, f := range s.after { + ctx, err = f(ctx) + if err != nil { + return ctx, nil, err + } + } + + twirpResp, err := s.enc(ctx, response) + if err != nil { + s.logger.Log("err", err) + return ctx, nil, err + } + + return ctx, twirpResp, nil +} From b05c58a17ccbd8f8c5342d54f54ec60a31e81374 Mon Sep 17 00:00:00 2001 From: Austin Lin Date: Mon, 29 Jan 2018 20:59:08 -0800 Subject: [PATCH 2/5] Use endpoint.Endpoint rather than a custom type --- transport/twirp/client.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/transport/twirp/client.go b/transport/twirp/client.go index 4b3d7067f..b7c89e603 100644 --- a/transport/twirp/client.go +++ b/transport/twirp/client.go @@ -6,21 +6,18 @@ import ( "github.com/go-kit/kit/endpoint" ) -// rpcFn is a simple type to represent an RPC method on the Twirp client. -type rpcFn func(context.Context, interface{}) (interface{}, error) - // Client wraps a Twirp client and provides a method that implements endpoint.Endpoint. type Client struct { - rpcFn rpcFn - enc EncodeRequestFunc - dec DecodeResponseFunc - before []ClientRequestFunc - after []ClientResponseFunc + rpcFn endpoint.Endpoint + enc EncodeRequestFunc + dec DecodeResponseFunc + before []ClientRequestFunc + after []ClientResponseFunc } // NewClient constructs a usable Client for a single remote method. func NewClient( - rpcFn rpcFn, + rpcFn endpoint.Endpoint, enc EncodeRequestFunc, dec DecodeResponseFunc, options ...ClientOption, From f2441469e3982e727d51679243b46e34fed14bbb Mon Sep 17 00:00:00 2001 From: Austin Lin Date: Mon, 29 Jan 2018 21:05:08 -0800 Subject: [PATCH 3/5] Add finalizers --- transport/twirp/client.go | 28 +++++++++++++++++++++++++--- transport/twirp/server.go | 25 +++++++++++++++++++------ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/transport/twirp/client.go b/transport/twirp/client.go index b7c89e603..045404a55 100644 --- a/transport/twirp/client.go +++ b/transport/twirp/client.go @@ -13,6 +13,7 @@ type Client struct { dec DecodeResponseFunc before []ClientRequestFunc after []ClientResponseFunc + finalizer ClientFinalizerFunc } // NewClient constructs a usable Client for a single remote method. @@ -38,30 +39,44 @@ func NewClient( // ClientOption sets an optional parameter for clients. type ClientOption func(*Client) -// ClientBefore sets the ClientRequestFunc that are applied to the outgoing HTTP +// ClientBefore sets the ClientRequestFunc that are applied to the outgoing // request before it's invoked. func ClientBefore(before ...ClientRequestFunc) ClientOption { return func(c *Client) { c.before = append(c.before, before...) } } -// ClientAfter sets the ClientResponseFuncs applied to the incoming HTTP +// ClientAfter sets the ClientResponseFuncs applied to the incoming // request prior to it being decoded. This is useful for obtaining anything off // of the response and adding onto the context prior to decoding. func ClientAfter(after ...ClientResponseFunc) ClientOption { return func(c *Client) { c.after = append(c.after, after...) } } +// ClientFinalizer is executed at the end of every request. +// By default, no finalizer is registered. +func ClientFinalizer(f ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = f } +} + // Endpoint returns a usable endpoint that invokes the remote endpoint. func (c Client) Endpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { ctx, cancel := context.WithCancel(ctx) defer cancel() - // Encode var ( req interface{} err error ) + + // Process ClientFinalizers + if c.finalizer != nil { + defer func() { + c.finalizer(ctx, err) + }() + } + + // Encode req, err = c.enc(ctx, request) if err != nil { return nil, err @@ -98,3 +113,10 @@ func (c Client) Endpoint() endpoint.Endpoint { return response, nil } } + +// ClientFinalizerFunc can be used to perform work at the end of a client +// request, after the response is returned. The principal +// intended use is for error logging. Note: err may be nil. +// There maybe also no additional response parameters depending on when +// an error occurs. +type ClientFinalizerFunc func(ctx context.Context, err error) diff --git a/transport/twirp/server.go b/transport/twirp/server.go index 8269baa65..5ae3708ab 100644 --- a/transport/twirp/server.go +++ b/transport/twirp/server.go @@ -16,12 +16,13 @@ type Handler interface { // Server wraps an endpoint and implements Twirp Handler. type Server struct { - e endpoint.Endpoint - dec DecodeRequestFunc - enc EncodeResponseFunc - before []ServerRequestFunc - after []ServerResponseFunc - logger log.Logger + e endpoint.Endpoint + dec DecodeRequestFunc + enc EncodeResponseFunc + before []ServerRequestFunc + after []ServerResponseFunc + finalizer ServerFinalizerFunc + logger log.Logger } // NewServer constructs a new server, which implements wraps the provided @@ -71,6 +72,13 @@ func ServerErrorLogger(logger log.Logger) ServerOption { // ServeTwirp implements the Handler interface. func (s Server) ServeTwirp(ctx context.Context, req interface{}) (context.Context, interface{}, error) { + // Process ServerFinalizerFunctions + if s.finalizer != nil { + defer func() { + s.finalizer(ctx, req) + }() + } + // Process ServerRequestFunctions for _, f := range s.before { ctx = f(ctx) @@ -103,3 +111,8 @@ func (s Server) ServeTwirp(ctx context.Context, req interface{}) (context.Contex return ctx, twirpResp, nil } + +// ServerFinalizerFunc can be used to perform work at the end of a +// request, after the response has been written to the client. The principal +// intended use is for request logging. +type ServerFinalizerFunc func(ctx context.Context, req interface{}) From f450c09e52615fb5d16202cfd29cdbc8536ada0e Mon Sep 17 00:00:00 2001 From: Austin Lin Date: Sun, 11 Feb 2018 17:08:19 -0800 Subject: [PATCH 4/5] Make interacting with headers simpler --- transport/twirp/client.go | 21 ++++++++++-------- transport/twirp/request_response_funcs.go | 26 +++++++---------------- transport/twirp/server.go | 23 ++++++++++++++------ 3 files changed, 36 insertions(+), 34 deletions(-) diff --git a/transport/twirp/client.go b/transport/twirp/client.go index 045404a55..54ef1e0a5 100644 --- a/transport/twirp/client.go +++ b/transport/twirp/client.go @@ -2,8 +2,9 @@ package twirp import ( "context" - "github.com/go-kit/kit/endpoint" + "github.com/twitchtv/twirp" + "net/http" ) // Client wraps a Twirp client and provides a method that implements endpoint.Endpoint. @@ -82,12 +83,17 @@ func (c Client) Endpoint() endpoint.Endpoint { return nil, err } + // Create an empty http.Header to hold the headers that we will accumulate in before functions. + var reqHeader http.Header // Process ClientRequestFunctions for _, f := range c.before { - ctx, err = f(ctx) - if err != nil { - return nil, err - } + ctx = f(ctx, &reqHeader) + } + + // Tell twirp to use these headers in the request. + ctx, err = twirp.WithHTTPRequestHeaders(ctx, reqHeader) + if err != nil { + return nil, err } // Call the actual RPC method @@ -98,10 +104,7 @@ func (c Client) Endpoint() endpoint.Endpoint { // Process ClientResponseFunctions for _, f := range c.after { - ctx, err = f(ctx) - if err != nil { - return nil, err - } + ctx = f(ctx) } // Decode diff --git a/transport/twirp/request_response_funcs.go b/transport/twirp/request_response_funcs.go index 7d80abbea..2c2ebe8f9 100644 --- a/transport/twirp/request_response_funcs.go +++ b/transport/twirp/request_response_funcs.go @@ -2,40 +2,30 @@ package twirp import ( "context" - "github.com/twitchtv/twirp" "net/http" ) // ClientRequestFunc may modify the context. ClientRequestFuncs are executed // after creating the request but prior to sending the Twirp request to // the server. -type ClientRequestFunc func(context.Context) (context.Context, error) +type ClientRequestFunc func(context.Context, *http.Header) context.Context // ServerRequestFunc may take information from the context. ServerRequestFuncs are // executed prior to invoking the endpoint. -type ServerRequestFunc func(context.Context) context.Context +type ServerRequestFunc func(context.Context, http.Header) context.Context // ServerResponseFunc may modify the context. ServerResponseFuncs are only executed in // servers, after invoking the endpoint but prior to writing a response. -type ServerResponseFunc func(context.Context) (context.Context, error) +type ServerResponseFunc func(context.Context) context.Context // ClientResponseFunc may take information from the context. ClientResponseFuncs are only executed in // clients, after a request has been made, but prior to it being decoded. -type ClientResponseFunc func(context.Context) (context.Context, error) +type ClientResponseFunc func(context.Context) context.Context -// SetResponseHeader returns a ServerResponseFunc that sets the given header. -func SetResponseHeader(key, val string) ServerResponseFunc { - return func(ctx context.Context) (context.Context, error) { - err := twirp.SetHTTPResponseHeader(ctx, key, val) - return ctx, err - } -} - -// SetRequestHeader returns a RequestFunc that sets the given header. +// SetRequestHeader returns a RequestFunc that sets the given header. It uses the standard net/http/header Add function and will append the specified value if others already exist. func SetRequestHeader(key, val string) ClientRequestFunc { - h := &http.Header{} - h.Set(key, val) - return func(ctx context.Context) (context.Context, error) { - return twirp.WithHTTPRequestHeaders(ctx, *h) + return func(ctx context.Context, header *http.Header) context.Context { + header.Add(key, val) + return ctx } } diff --git a/transport/twirp/server.go b/transport/twirp/server.go index 5ae3708ab..1cfadad8c 100644 --- a/transport/twirp/server.go +++ b/transport/twirp/server.go @@ -3,8 +3,11 @@ package twirp import ( "context" + "errors" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" + "github.com/twitchtv/twirp" + "net/http" ) // Handler which should be called from the Twirp binding of the service @@ -78,10 +81,20 @@ func (s Server) ServeTwirp(ctx context.Context, req interface{}) (context.Contex s.finalizer(ctx, req) }() } - + // Extract the headers from the ctx + var ( + reqHeader http.Header + ok bool + ) + reqHeader, ok = twirp.HTTPRequestHeaders(ctx) + if !ok { + err := errors.New("error extracting http headers from Twirp Context (twirptransport.HTTPRequestHeaders)") + s.logger.Log("err", err) + return ctx, nil, err + } // Process ServerRequestFunctions for _, f := range s.before { - ctx = f(ctx) + ctx = f(ctx, reqHeader) } request, err := s.dec(ctx, req) if err != nil { @@ -97,12 +110,8 @@ func (s Server) ServeTwirp(ctx context.Context, req interface{}) (context.Contex // Process ServerResponseFunctions for _, f := range s.after { - ctx, err = f(ctx) - if err != nil { - return ctx, nil, err - } + ctx = f(ctx) } - twirpResp, err := s.enc(ctx, response) if err != nil { s.logger.Log("err", err) From 36868d3b32a694d9c109080f843b9a8c8426c4db Mon Sep 17 00:00:00 2001 From: Austin Lin Date: Sun, 11 Feb 2018 17:57:43 -0800 Subject: [PATCH 5/5] Use reflection to handle client method calling --- transport/twirp/client.go | 28 +++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/transport/twirp/client.go b/transport/twirp/client.go index 54ef1e0a5..7ffb7e7ab 100644 --- a/transport/twirp/client.go +++ b/transport/twirp/client.go @@ -2,14 +2,17 @@ package twirp import ( "context" + "fmt" "github.com/go-kit/kit/endpoint" "github.com/twitchtv/twirp" "net/http" + "reflect" ) // Client wraps a Twirp client and provides a method that implements endpoint.Endpoint. type Client struct { - rpcFn endpoint.Endpoint + client interface{} + method string enc EncodeRequestFunc dec DecodeResponseFunc before []ClientRequestFunc @@ -19,13 +22,15 @@ type Client struct { // NewClient constructs a usable Client for a single remote method. func NewClient( - rpcFn endpoint.Endpoint, + client interface{}, + method string, enc EncodeRequestFunc, dec DecodeResponseFunc, options ...ClientOption, ) *Client { c := &Client{ - rpcFn: rpcFn, + client: client, + method: method, enc: enc, dec: dec, before: []ClientRequestFunc{}, @@ -96,8 +101,21 @@ func (c Client) Endpoint() endpoint.Endpoint { return nil, err } - // Call the actual RPC method - resp, err := c.rpcFn(ctx, req) + client := reflect.ValueOf(&c.client) + method := client.MethodByName(c.method) + if !method.IsValid() { + interfaceName := reflect.TypeOf(&c.client).Elem().Name() + return nil, fmt.Errorf("Invalid method specified: %s does not have method %s", interfaceName, c.method) + } + + args := make([]reflect.Value, 2) + args[0] = reflect.ValueOf(ctx) + args[1] = reflect.ValueOf(req) + + retVals := make([]reflect.Value, 2) + retVals = method.Call(args) + resp := retVals[0].Interface() + err = retVals[1].Interface().(error) if err != nil { return nil, err }