From 673600c300987975dcbf1481c5240a7432e56fae Mon Sep 17 00:00:00 2001 From: Blain Smith Date: Tue, 7 Feb 2017 10:51:00 -0500 Subject: [PATCH 01/40] first pass at JSON RPC HTTP transport --- transport/http/jsonrpc/encode_decode.go | 31 +++ transport/http/jsonrpc/error.go | 72 +++++++ .../http/jsonrpc/request_response_types.go | 28 +++ transport/http/jsonrpc/server.go | 202 ++++++++++++++++++ 4 files changed, 333 insertions(+) create mode 100644 transport/http/jsonrpc/encode_decode.go create mode 100644 transport/http/jsonrpc/error.go create mode 100644 transport/http/jsonrpc/request_response_types.go create mode 100644 transport/http/jsonrpc/server.go diff --git a/transport/http/jsonrpc/encode_decode.go b/transport/http/jsonrpc/encode_decode.go new file mode 100644 index 000000000..cda66dbff --- /dev/null +++ b/transport/http/jsonrpc/encode_decode.go @@ -0,0 +1,31 @@ +package jsonrpc + +import ( + "encoding/json" + + "golang.org/x/net/context" +) + +// DecodeRequestFunc extracts a user-domain request object from an HTTP +// request object. It's designed to be used in HTTP servers, for server-side +// endpoints. One straightforward DecodeRequestFunc could be something that +// JSON decodes from the request body to the concrete response type. +type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error) + +// EncodeRequestFunc encodes the passed request object into the HTTP request +// object. It's designed to be used in HTTP clients, for client-side +// endpoints. One straightforward EncodeRequestFunc could something that JSON +// encodes the object directly to the request body. +// type EncodeRequestFunc func(context.Context, *http.Request, interface{}) error + +// EncodeResponseFunc encodes the passed response object to the HTTP response +// writer. It's designed to be used in HTTP servers, for server-side +// endpoints. One straightforward EncodeResponseFunc could be something that +// JSON encodes the object directly to the response body. +type EncodeResponseFunc func(context.Context, interface{}) (response json.RawMessage, err error) + +// DecodeResponseFunc extracts a user-domain response object from an HTTP +// response object. It's designed to be used in HTTP clients, for client-side +// endpoints. One straightforward DecodeResponseFunc could be something that +// JSON decodes from the response body to the concrete response type. +// type DecodeResponseFunc func(context.Context, *http.Response) (response interface{}, err error) diff --git a/transport/http/jsonrpc/error.go b/transport/http/jsonrpc/error.go new file mode 100644 index 000000000..c1d42b34d --- /dev/null +++ b/transport/http/jsonrpc/error.go @@ -0,0 +1,72 @@ +package jsonrpc + +// Error defines a JSON RPC error that can be returned +// in a Response from the spec +// http://www.jsonrpc.org/specification#error_object +type Error struct { + Code int `json:"code"` + Message string `json:"message"` + Data interface{} `json:"data,omitempty"` +} + +const ( + // ParseError defines invalid JSON was received by the server. + // An error occurred on the server while parsing the JSON text. + ParseError int = -32700 + + // InvalidRequestError defines the JSON sent is not a valid Request object. + InvalidRequestError int = -32600 + + // MethodNotFoundError defines the method does not exist / is not available. + MethodNotFoundError int = -32601 + + // InvalidParamsError defines invalid method parameter(s). + InvalidParamsError int = -32602 + + // InternalError defines a server error + InternalError int = -32603 +) + +var errorMessage = map[int]string{ + ParseError: "An error occurred on the server while parsing the JSON text.", + InvalidRequestError: "The JSON sent is not a valid Request object.", + MethodNotFoundError: "The method does not exist / is not available.", + InvalidParamsError: "Invalid method parameter(s).", + InternalError: "Internal JSON-RPC error.", +} + +// ErrorMessage returns a message for the JSON RPC error code. It returns the empty +// string if the code is unknown. +func ErrorMessage(code int) string { + return errorMessage[code] +} + +type parseError struct{} + +func (e *parseError) ErrorCode() int { + return ParseError +} + +type invalidRequestError struct{} + +func (e *invalidRequestError) ErrorCode() int { + return InvalidRequestError +} + +type methodNotFoundError struct{} + +func (e *methodNotFoundError) ErrorCode() int { + return MethodNotFoundError +} + +type invalidParamsError struct{} + +func (e *invalidParamsError) ErrorCode() int { + return InvalidParamsError +} + +type internalError struct{} + +func (e *internalError) ErrorCode() int { + return InternalError +} diff --git a/transport/http/jsonrpc/request_response_types.go b/transport/http/jsonrpc/request_response_types.go new file mode 100644 index 000000000..d69f8589a --- /dev/null +++ b/transport/http/jsonrpc/request_response_types.go @@ -0,0 +1,28 @@ +package jsonrpc + +import "encoding/json" + +// Request defines a JSON RPC request from the spec +// http://www.jsonrpc.org/specification#request_object +type Request struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` +} + +// Response defines a JSON RPC response from the spec +// http://www.jsonrpc.org/specification#response_object +type Response struct { + JSONRPC string `json:"jsonrpc"` + Result interface{} `json:"result,omitempty"` + Error Error `json:"error,omitemty"` +} + +const ( + // Version defines the version of the JSON RPC implementation + Version string = "2.0" + + // ContentType defines the content type to be served. + ContentType string = "application/json; charset=utf-8" +) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go new file mode 100644 index 000000000..0de3798d0 --- /dev/null +++ b/transport/http/jsonrpc/server.go @@ -0,0 +1,202 @@ +package jsonrpc + +import ( + "context" + "encoding/json" + "io" + "net/http" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + httptransport "github.com/go-kit/kit/transport/http" +) + +// Server wraps an endpoint and implements http.Handler. +type Server struct { + ctx context.Context + ecm EndpointCodecMap + before []httptransport.RequestFunc + after []httptransport.ServerResponseFunc + finalizer httptransport.ServerFinalizerFunc + logger log.Logger +} + +// NewServer constructs a new server, which implements http.Server. +func NewServer( + ctx context.Context, + ecm EndpointCodecMap, + options ...ServerOption, +) *Server { + s := &Server{ + ctx: ctx, + ecm: ecm, + logger: log.NewNopLogger(), + } + for _, option := range options { + option(s) + } + return s +} + +// EndpointCodec defines and Endpoint and its associated codecs +type EndpointCodec struct { + Endpoint endpoint.Endpoint + Decode DecodeRequestFunc + Encode EncodeResponseFunc +} + +// EndpointCodecMap maps the Request.Method to the proper EndpointCodec +type EndpointCodecMap map[string]EndpointCodec + +// ServerOption sets an optional parameter for servers. +type ServerOption func(*Server) + +// ServerBefore functions are executed on the HTTP request object before the +// request is decoded. +func ServerBefore(before ...httptransport.RequestFunc) ServerOption { + return func(s *Server) { s.before = before } +} + +// ServerAfter functions are executed on the HTTP response writer after the +// endpoint is invoked, but before anything is written to the client. +func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption { + return func(s *Server) { s.after = after } +} + +// ServerErrorLogger is used to log non-terminal errors. By default, no errors +// are logged. This is intended as a diagnostic measure. Finer-grained control +// of error handling, including logging in more detail, should be performed in a +// custom ServerErrorEncoder or ServerFinalizer, both of which have access to +// the context. +func ServerErrorLogger(logger log.Logger) ServerOption { + return func(s *Server) { s.logger = logger } +} + +// ServerFinalizer is executed at the end of every HTTP request. +// By default, no finalizer is registered. +func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption { + return func(s *Server) { s.finalizer = f } +} + +// ServeHTTP implements http.Handler. +func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") + w.WriteHeader(http.StatusMethodNotAllowed) + io.WriteString(w, "405 must POST\n") + return + } + ctx := s.ctx + + if s.finalizer != nil { + iw := &interceptingWriter{w, http.StatusOK} + defer func() { s.finalizer(ctx, iw.code, r) }() + w = iw + } + + for _, f := range s.before { + ctx = f(ctx, r) + } + + // Decode the body into an object + var req Request + err := json.NewDecoder(r.Body).Decode(&req) + if err != nil { + s.logger.Log("err", err) + rpcErrorEncoder(ctx, err, w) + return + } + + // Get the endpoint and codecs from the map using the method + // defined in the JSON object + ecm := s.ecm[req.Method] + + // TODO: Need to handle unregistered methods + + // Decode the JSON "params" + reqParams, err := ecm.Decode(ctx, req.Params) + if err != nil { + s.logger.Log("err", err) + rpcErrorEncoder(ctx, err, w) + return + } + + // Call the Endpoint with the params + response, err := ecm.Endpoint(ctx, reqParams) + if err != nil { + s.logger.Log("err", err) + rpcErrorEncoder(ctx, err, w) + return + } + + for _, f := range s.after { + ctx = f(ctx, w) + } + + res := Response{ + Error: Error{}, + } + + // Encode the response from the Endpoint + resParams, err := ecm.Encode(ctx, response) + if err != nil { + s.logger.Log("err", err) + rpcErrorEncoder(ctx, err, w) + return + } + + res.Result = resParams + + json.NewEncoder(w).Encode(res) +} + +// ErrorEncoder writes the error to the ResponseWriter, by default a +// content type of text/plain, a body of the plain text of the error, and a +// status code of 500. If the error implements Headerer, the provided headers +// will be applied to the response. If the error implements json.Marshaler, and +// the marshaling succeeds, a content type of application/json and the JSON +// encoded form of the error will be used. If the error implements StatusCoder, +// the provided StatusCode will be used instead of 500. +func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { + body := []byte(err.Error()) + + w.Header().Set("Content-Type", ContentType) + if headerer, ok := err.(httptransport.Headerer); ok { + for k := range headerer.Headers() { + w.Header().Set(k, headerer.Headers().Get(k)) + } + } + + e := Error{ + Code: InternalError, + Message: string(body), + } + if sc, ok := err.(ErrorCoder); ok { + e.Code = sc.ErrorCode() + } + + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(Response{ + JSONRPC: Version, + Error: e, + }) +} + +// ErrorCoder is checked by DefaultErrorEncoder. If an error value implements +// ErrorCoder, the Error will be used when encoding the error. By default, +// InternalError (-32603) is used. +type ErrorCoder interface { + ErrorCode() int +} + +type interceptingWriter struct { + http.ResponseWriter + code int +} + +// WriteHeader may not be explicitly called, so care must be taken to +// initialize w.code to its default value of http.StatusOK. +func (w *interceptingWriter) WriteHeader(code int) { + w.code = code + w.ResponseWriter.WriteHeader(code) +} From 53b2faa6ded84f9362ec2c8faeb9be3b81d3910e Mon Sep 17 00:00:00 2001 From: Blain Smith Date: Tue, 7 Feb 2017 10:51:30 -0500 Subject: [PATCH 02/40] example implementation of JSON RPC over HTTP --- examples/addsvc/transport_http.go | 40 +++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/examples/addsvc/transport_http.go b/examples/addsvc/transport_http.go index 75a9b839b..f49d9e026 100644 --- a/examples/addsvc/transport_http.go +++ b/examples/addsvc/transport_http.go @@ -16,6 +16,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/tracing/opentracing" httptransport "github.com/go-kit/kit/transport/http" + httpjsonrpctransport "github.com/go-kit/kit/transport/http/jsonrpc" ) // MakeHTTPHandler returns a handler that makes a set of endpoints available @@ -38,6 +39,25 @@ func MakeHTTPHandler(endpoints Endpoints, tracer stdopentracing.Tracer, logger l EncodeHTTPGenericResponse, append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., )) + + s := httpjsonrpctransport.NewServer( + ctx, + httpjsonrpctransport.EndpointCodecMap{ + "sum": httpjsonrpctransport.EndpointCodec{ + Endpoint: endpoints.SumEndpoint, + Decode: DecodeRPCHTTPConcatRequest, + Encode: EncodeRPCHTTPGenericResponse, + }, + "concat": httpjsonrpctransport.EndpointCodec{ + Endpoint: endpoints.SumEndpoint, + Decode: DecodeRPCHTTPConcatRequest, + Encode: EncodeRPCHTTPGenericResponse, + }, + }, + ) + + m.Handle("/rpc", s) + return m } @@ -128,3 +148,23 @@ func EncodeHTTPGenericRequest(_ context.Context, r *http.Request, request interf func EncodeHTTPGenericResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { return json.NewEncoder(w).Encode(response) } + +// DecodeRPCHTTPSumRequest ... +func DecodeRPCHTTPSumRequest(_ context.Context, params json.RawMessage) (interface{}, error) { + var req sumRequest + err := json.Unmarshal(params, &req) + return req, err +} + +// DecodeRPCHTTPConcatRequest ... +func DecodeRPCHTTPConcatRequest(_ context.Context, params json.RawMessage) (interface{}, error) { + var req concatRequest + err := json.Unmarshal(params, req) + return req, err +} + +// EncodeRPCHTTPGenericResponse ... +func EncodeRPCHTTPGenericResponse(_ context.Context, params interface{}) (json.RawMessage, error) { + res, err := json.Marshal(params) + return res, err +} From 7cfc4f12c813d913fafa148ac97ea9056c9ee58b Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 13 Jul 2017 15:15:09 +0100 Subject: [PATCH 03/40] Add ID type for JSON RPC Request, with tests. --- .../http/jsonrpc/request_response_types.go | 44 ++++++- .../jsonrpc/request_response_types_test.go | 107 ++++++++++++++++++ 2 files changed, 150 insertions(+), 1 deletion(-) create mode 100644 transport/http/jsonrpc/request_response_types_test.go diff --git a/transport/http/jsonrpc/request_response_types.go b/transport/http/jsonrpc/request_response_types.go index d69f8589a..90c9edb65 100644 --- a/transport/http/jsonrpc/request_response_types.go +++ b/transport/http/jsonrpc/request_response_types.go @@ -8,7 +8,49 @@ type Request struct { JSONRPC string `json:"jsonrpc"` Method string `json:"method"` Params json.RawMessage `json:"params"` - ID interface{} `json:"id"` + ID *RequestID `json:"id"` +} + +// RequestID defines a request ID that can be string, number, or null. +// An identifier established by the Client that MUST contain a String, +// Number, or NULL value if included. +// If it is not included it is assumed to be a notification. +// The value SHOULD normally not be Null and +// Numbers SHOULD NOT contain fractional parts. +type RequestID struct { + intValue int + intError error + floatValue float32 + floatError error + stringValue string + stringError error +} + +// UnmarshalJSON satisfies json.Unmarshaler +func (id *RequestID) UnmarshalJSON(b []byte) error { + id.intError = json.Unmarshal(b, &id.intValue) + id.floatError = json.Unmarshal(b, &id.floatValue) + id.stringError = json.Unmarshal(b, &id.stringValue) + + return nil +} + +// Int returns the ID as an integer value. +// An error is returned if the ID can't be treated as an int. +func (id *RequestID) Int() (int, error) { + return id.intValue, id.intError +} + +// Float32 returns the ID as a float value. +// An error is returned if the ID can't be treated as an float. +func (id *RequestID) Float32() (float32, error) { + return id.floatValue, id.floatError +} + +// String returns the ID as a string value. +// An error is returned if the ID can't be treated as an string. +func (id *RequestID) String() (string, error) { + return id.stringValue, id.stringError } // Response defines a JSON RPC response from the spec diff --git a/transport/http/jsonrpc/request_response_types_test.go b/transport/http/jsonrpc/request_response_types_test.go new file mode 100644 index 000000000..ef3e4b4c2 --- /dev/null +++ b/transport/http/jsonrpc/request_response_types_test.go @@ -0,0 +1,107 @@ +package jsonrpc + +import ( + "encoding/json" + "fmt" + "testing" +) + +func TestCanUnMarshalID(t *testing.T) { + cases := []struct { + JSON string + expType string + expValue interface{} + }{ + {`12345`, "int", 12345}, + {`12345.6`, "float", 12345.6}, + {`"stringaling"`, "string", "stringaling"}, + } + + for _, c := range cases { + r := Request{} + JSON := fmt.Sprintf(`{"id":%s}`, c.JSON) + + var foo interface{} + _ = json.Unmarshal([]byte(JSON), &foo) + fmt.Printf("foo = %t %+v\n", foo, foo) + + err := json.Unmarshal([]byte(JSON), &r) + if err != nil { + t.Fatalf("Unexpected error unmarshaling JSON into request: %s\n", err) + } + id := r.ID + + switch c.expType { + case "int": + want := c.expValue.(int) + got, err := id.Int() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Fatalf("'%s' Int(): want %d, got %d.", c.JSON, want, got) + } + + // Allow an int ID to be interpreted as a float. + wantf := float32(c.expValue.(int)) + gotf, err := id.Float32() + if gotf != wantf { + t.Fatalf("'%s' Int value as Float32(): want %f, got %f.", c.JSON, wantf, gotf) + } + + _, err = id.String() + if err == nil { + t.Fatal("Expected String() to error for int value. Didn't.") + } + case "string": + want := c.expValue.(string) + got, err := id.String() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Fatalf("'%s' String(): want %s, got %s.", c.JSON, want, got) + } + + _, err = id.Int() + if err == nil { + t.Fatal("Expected Int() to error for string value. Didn't.") + } + _, err = id.Float32() + if err == nil { + t.Fatal("Expected Float32() to error for string value. Didn't.") + } + case "float32": + want := c.expValue.(float32) + got, err := id.Float32() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Fatalf("'%s' Float32(): want %f, got %f.", c.JSON, want, got) + } + + _, err = id.String() + if err == nil { + t.Fatal("Expected String() to error for float value. Didn't.") + } + _, err = id.Int() + if err == nil { + t.Fatal("Expected Int() to error for float value. Didn't.") + } + } + } +} + +func TestCanUnmarshalNullID(t *testing.T) { + r := Request{} + JSON := `{"id":null}` + err := json.Unmarshal([]byte(JSON), &r) + if err != nil { + t.Fatalf("Unexpected error unmarshaling JSON into request: %s\n", err) + } + + if r.ID != nil { + t.Fatalf("Expected ID to be nil, got %+v.\n", r.ID) + } +} From ed2fbcf79c895cac371b839ae21fc95b144d1368 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 14 Jul 2017 10:26:25 +0100 Subject: [PATCH 04/40] Add basic server testing for JSON RPC. Add basic server tests, following example from http transport. Switch Response.Error to pointer, to make absence clearer. --- transport/http/jsonrpc/encode_decode.go | 2 +- .../http/jsonrpc/request_response_types.go | 2 +- transport/http/jsonrpc/server.go | 6 +- transport/http/jsonrpc/server_test.go | 141 ++++++++++++++++++ 4 files changed, 145 insertions(+), 6 deletions(-) create mode 100644 transport/http/jsonrpc/server_test.go diff --git a/transport/http/jsonrpc/encode_decode.go b/transport/http/jsonrpc/encode_decode.go index cda66dbff..7c64e6ea2 100644 --- a/transport/http/jsonrpc/encode_decode.go +++ b/transport/http/jsonrpc/encode_decode.go @@ -3,7 +3,7 @@ package jsonrpc import ( "encoding/json" - "golang.org/x/net/context" + "context" ) // DecodeRequestFunc extracts a user-domain request object from an HTTP diff --git a/transport/http/jsonrpc/request_response_types.go b/transport/http/jsonrpc/request_response_types.go index 90c9edb65..474158e63 100644 --- a/transport/http/jsonrpc/request_response_types.go +++ b/transport/http/jsonrpc/request_response_types.go @@ -58,7 +58,7 @@ func (id *RequestID) String() (string, error) { type Response struct { JSONRPC string `json:"jsonrpc"` Result interface{} `json:"result,omitempty"` - Error Error `json:"error,omitemty"` + Error *Error `json:"error,omitemty"` } const ( diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 0de3798d0..7cf1f80f6 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -133,9 +133,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx = f(ctx, w) } - res := Response{ - Error: Error{}, - } + res := Response{} // Encode the response from the Endpoint resParams, err := ecm.Encode(ctx, response) @@ -178,7 +176,7 @@ func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { w.WriteHeader(http.StatusOK) json.NewEncoder(w).Encode(Response{ JSONRPC: Version, - Error: e, + Error: &e, }) } diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go new file mode 100644 index 000000000..046c74428 --- /dev/null +++ b/transport/http/jsonrpc/server_test.go @@ -0,0 +1,141 @@ +package jsonrpc_test + +import ( + "context" + "encoding/json" + "errors" + "io" + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/rossmcf/kit/transport/http/jsonrpc" +) + +func addBody() io.Reader { + return strings.NewReader(`{"jsonrpc": "2.0", "method": "add", "params": [3, 2], "id": 1}`) +} + +func expectErrorCode(t *testing.T, want int, body []byte) { + var r jsonrpc.Response + err := json.Unmarshal(body, &r) + if err != nil { + t.Fatalf("Cant' decode response. err=%s, body=%s", err, body) + } + if r.Error == nil { + t.Fatalf("Expected error on response. Got none: %s", body) + } + if have := r.Error.Code; want != have { + t.Fatalf("Unexpected error code. Want %d, have %d: %s", want, have, body) + } +} + +func TestServerBadDecode(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, errors.New("oof") }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, nil }, + }, + } + handler := jsonrpc.NewServer(context.TODO(), ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", addBody()) + buf, _ := ioutil.ReadAll(resp.Body) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d: %s", want, have, buf) + } + expectErrorCode(t, jsonrpc.InternalError, buf) +} + +func TestServerBadEndpoint(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("oof") }, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, nil }, + }, + } + handler := jsonrpc.NewServer(context.TODO(), ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", addBody()) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + buf, _ := ioutil.ReadAll(resp.Body) + expectErrorCode(t, jsonrpc.InternalError, buf) +} + +func TestServerBadEncode(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + }, + } + handler := jsonrpc.NewServer(context.TODO(), ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", addBody()) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + buf, _ := ioutil.ReadAll(resp.Body) + expectErrorCode(t, jsonrpc.InternalError, buf) +} + +func TestServerHappyPath(t *testing.T) { + step, response := testServer(t) + step() + resp := <-response + defer resp.Body.Close() // nolint + buf, _ := ioutil.ReadAll(resp.Body) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d (%s)", want, have, buf) + } + var r jsonrpc.Response + err := json.Unmarshal(buf, &r) + if err != nil { + t.Fatalf("Cant' decode response. err=%s, body=%s", err, buf) + } + if r.Error != nil { + t.Fatalf("Unxpected error on response: %s", buf) + } +} + +func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { + var ( + stepch = make(chan bool) + endpoint = func(ctx context.Context, request interface{}) (response interface{}, err error) { + <-stepch + return struct{}{}, nil + } + response = make(chan *http.Response) + ctx = context.TODO() + ecm = jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: endpoint, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte("[]"), nil }, + }, + } + handler = jsonrpc.NewServer(ctx, ecm) + ) + go func() { + server := httptest.NewServer(handler) + defer server.Close() + rb := strings.NewReader(`{"jsonrpc": "2.0", "method": "add", "params": [3, 2], "id": 1}`) + resp, err := http.Post(server.URL, "application/json", rb) + if err != nil { + t.Error(err) + return + } + response <- resp + }() + return func() { stepch <- true }, response +} From 4adaf395c0d345f493dd38ba0b16542e1f8b4f63 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 14 Jul 2017 10:41:45 +0100 Subject: [PATCH 05/40] Handle unregistered JSON RPC methods. --- transport/http/jsonrpc/error.go | 10 +++++++--- transport/http/jsonrpc/server.go | 15 +++++++++------ transport/http/jsonrpc/server_test.go | 13 +++++++++++++ 3 files changed, 29 insertions(+), 9 deletions(-) diff --git a/transport/http/jsonrpc/error.go b/transport/http/jsonrpc/error.go index c1d42b34d..783c35917 100644 --- a/transport/http/jsonrpc/error.go +++ b/transport/http/jsonrpc/error.go @@ -53,15 +53,19 @@ func (e *invalidRequestError) ErrorCode() int { return InvalidRequestError } -type methodNotFoundError struct{} +type methodNotFoundError string -func (e *methodNotFoundError) ErrorCode() int { +func (e methodNotFoundError) Error() string { + return string(e) +} + +func (e methodNotFoundError) ErrorCode() int { return MethodNotFoundError } type invalidParamsError struct{} -func (e *invalidParamsError) ErrorCode() int { +func (e invalidParamsError) ErrorCode() int { return InvalidParamsError } diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 7cf1f80f6..c6bd242a6 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -3,6 +3,7 @@ package jsonrpc import ( "context" "encoding/json" + "fmt" "io" "net/http" @@ -109,9 +110,13 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // Get the endpoint and codecs from the map using the method // defined in the JSON object - ecm := s.ecm[req.Method] - - // TODO: Need to handle unregistered methods + ecm, ok := s.ecm[req.Method] + if !ok { + err := methodNotFoundError(fmt.Sprintf("Method %s was not found.", req.Method)) + s.logger.Log("err", err) + rpcErrorEncoder(ctx, err, w) + return + } // Decode the JSON "params" reqParams, err := ecm.Decode(ctx, req.Params) @@ -156,8 +161,6 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { // encoded form of the error will be used. If the error implements StatusCoder, // the provided StatusCode will be used instead of 500. func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { - body := []byte(err.Error()) - w.Header().Set("Content-Type", ContentType) if headerer, ok := err.(httptransport.Headerer); ok { for k := range headerer.Headers() { @@ -167,7 +170,7 @@ func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { e := Error{ Code: InternalError, - Message: string(body), + Message: err.Error(), } if sc, ok := err.(ErrorCoder); ok { e.Code = sc.ErrorCode() diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index 046c74428..01e8cd5b5 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -89,6 +89,19 @@ func TestServerBadEncode(t *testing.T) { expectErrorCode(t, jsonrpc.InternalError, buf) } +func TestServerUnregisteredMethod(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{} + handler := jsonrpc.NewServer(context.TODO(), ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", addBody()) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + buf, _ := ioutil.ReadAll(resp.Body) + expectErrorCode(t, jsonrpc.MethodNotFoundError, buf) +} + func TestServerHappyPath(t *testing.T) { step, response := testServer(t) step() From ea68d6ad264d857dabeda3d88a5eef01ad5193a5 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 14 Jul 2017 10:46:07 +0100 Subject: [PATCH 06/40] Package tidy-up. --- transport/http/jsonrpc/request_response_types_test.go | 8 +++++--- transport/http/jsonrpc/server_test.go | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/transport/http/jsonrpc/request_response_types_test.go b/transport/http/jsonrpc/request_response_types_test.go index ef3e4b4c2..d39f5e957 100644 --- a/transport/http/jsonrpc/request_response_types_test.go +++ b/transport/http/jsonrpc/request_response_types_test.go @@ -1,9 +1,11 @@ -package jsonrpc +package jsonrpc_test import ( "encoding/json" "fmt" "testing" + + "github.com/go-kit/kit/transport/http/jsonrpc" ) func TestCanUnMarshalID(t *testing.T) { @@ -18,7 +20,7 @@ func TestCanUnMarshalID(t *testing.T) { } for _, c := range cases { - r := Request{} + r := jsonrpc.Request{} JSON := fmt.Sprintf(`{"id":%s}`, c.JSON) var foo interface{} @@ -94,7 +96,7 @@ func TestCanUnMarshalID(t *testing.T) { } func TestCanUnmarshalNullID(t *testing.T) { - r := Request{} + r := jsonrpc.Request{} JSON := `{"id":null}` err := json.Unmarshal([]byte(JSON), &r) if err != nil { diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index 01e8cd5b5..e351de21a 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -11,7 +11,7 @@ import ( "strings" "testing" - "github.com/rossmcf/kit/transport/http/jsonrpc" + "github.com/go-kit/kit/transport/http/jsonrpc" ) func addBody() io.Reader { From 111107ec5237b6846e73833968c4b6caddad06e9 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 14 Jul 2017 12:23:04 +0100 Subject: [PATCH 07/40] Test ServerBefore / ServerAfter for JSON RPC. --- transport/http/jsonrpc/server.go | 4 +- transport/http/jsonrpc/server_test.go | 94 +++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index c6bd242a6..e2d2eedab 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -55,13 +55,13 @@ type ServerOption func(*Server) // ServerBefore functions are executed on the HTTP request object before the // request is decoded. func ServerBefore(before ...httptransport.RequestFunc) ServerOption { - return func(s *Server) { s.before = before } + return func(s *Server) { s.before = append(s.before, before...) } } // ServerAfter functions are executed on the HTTP response writer after the // endpoint is invoked, but before anything is written to the client. func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption { - return func(s *Server) { s.after = after } + return func(s *Server) { s.after = append(s.after, after...) } } // ServerErrorLogger is used to log non-terminal errors. By default, no errors diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index e351de21a..99f1d2850 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -10,7 +10,9 @@ import ( "net/http/httptest" "strings" "testing" + "time" + "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/transport/http/jsonrpc" ) @@ -89,6 +91,24 @@ func TestServerBadEncode(t *testing.T) { expectErrorCode(t, jsonrpc.InternalError, buf) } +func TestServerErrorEncoder(t *testing.T) { + //ecm := jsonrpc.EndpointCodecMap{ + //"add": jsonrpc.EndpointCodec{ + //Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + //Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + //Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + //}, + //} + //handler := jsonrpc.NewServer(context.TODO(), ecm) + //server := httptest.NewServer(handler) + //defer server.Close() + //resp, _ := http.Post(server.URL, "application/json", addBody()) + //if want, have := http.StatusOK, resp.StatusCode; want != have { + //t.Errorf("want %d, have %d", want, have) + //} + //buf, _ := ioutil.ReadAll(resp.Body) + //expectErrorCode(t, jsonrpc.InternalError, buf) +} func TestServerUnregisteredMethod(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{} handler := jsonrpc.NewServer(context.TODO(), ecm) @@ -121,6 +141,80 @@ func TestServerHappyPath(t *testing.T) { } } +func TestMultipleServerBefore(t *testing.T) { + var done = make(chan struct{}) + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: endpoint.Nop, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + }, + } + handler := jsonrpc.NewServer( + context.TODO(), + ecm, + jsonrpc.ServerBefore(func(ctx context.Context, r *http.Request) context.Context { + ctx = context.WithValue(ctx, "one", 1) + + return ctx + }), + jsonrpc.ServerBefore(func(ctx context.Context, r *http.Request) context.Context { + if _, ok := ctx.Value("one").(int); !ok { + t.Error("Value was not set properly when multiple ServerBefores are used") + } + + close(done) + return ctx + }), + ) + server := httptest.NewServer(handler) + defer server.Close() + http.Post(server.URL, "application/json", addBody()) // nolint + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for finalizer") + } +} + +func TestMultipleServerAfter(t *testing.T) { + var done = make(chan struct{}) + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: endpoint.Nop, + Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + }, + } + handler := jsonrpc.NewServer( + context.TODO(), + ecm, + jsonrpc.ServerAfter(func(ctx context.Context, w http.ResponseWriter) context.Context { + ctx = context.WithValue(ctx, "one", 1) + + return ctx + }), + jsonrpc.ServerAfter(func(ctx context.Context, w http.ResponseWriter) context.Context { + if _, ok := ctx.Value("one").(int); !ok { + t.Error("Value was not set properly when multiple ServerAfters are used") + } + + close(done) + return ctx + }), + ) + server := httptest.NewServer(handler) + defer server.Close() + http.Post(server.URL, "application/json", addBody()) // nolint + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for finalizer") + } +} + func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { var ( stepch = make(chan bool) From 0090caa263fa9e1c16de80aded430dc927b5de51 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 17 Jul 2017 10:40:18 +0100 Subject: [PATCH 08/40] More JSON RPC tests. --- transport/http/jsonrpc/server.go | 55 ++++++++++------- transport/http/jsonrpc/server_test.go | 86 ++++++++++++++++++--------- 2 files changed, 89 insertions(+), 52 deletions(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index e2d2eedab..655078d20 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -14,12 +14,13 @@ import ( // Server wraps an endpoint and implements http.Handler. type Server struct { - ctx context.Context - ecm EndpointCodecMap - before []httptransport.RequestFunc - after []httptransport.ServerResponseFunc - finalizer httptransport.ServerFinalizerFunc - logger log.Logger + ctx context.Context + ecm EndpointCodecMap + before []httptransport.RequestFunc + after []httptransport.ServerResponseFunc + errorEncoder httptransport.ErrorEncoder + finalizer httptransport.ServerFinalizerFunc + logger log.Logger } // NewServer constructs a new server, which implements http.Server. @@ -29,9 +30,10 @@ func NewServer( options ...ServerOption, ) *Server { s := &Server{ - ctx: ctx, - ecm: ecm, - logger: log.NewNopLogger(), + ctx: ctx, + ecm: ecm, + errorEncoder: DefaultErrorEncoder, + logger: log.NewNopLogger(), } for _, option := range options { option(s) @@ -64,6 +66,14 @@ func ServerAfter(after ...httptransport.ServerResponseFunc) ServerOption { return func(s *Server) { s.after = append(s.after, after...) } } +// ServerErrorEncoder is used to encode errors to the http.ResponseWriter +// whenever they're encountered in the processing of a request. Clients can +// use this to provide custom error formatting and response codes. By default, +// errors will be written with the DefaultErrorEncoder. +func ServerErrorEncoder(ee httptransport.ErrorEncoder) ServerOption { + return func(s *Server) { s.errorEncoder = ee } +} + // ServerErrorLogger is used to log non-terminal errors. By default, no errors // are logged. This is intended as a diagnostic measure. Finer-grained control // of error handling, including logging in more detail, should be performed in a @@ -104,7 +114,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { err := json.NewDecoder(r.Body).Decode(&req) if err != nil { s.logger.Log("err", err) - rpcErrorEncoder(ctx, err, w) + s.errorEncoder(ctx, err, w) return } @@ -114,15 +124,15 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if !ok { err := methodNotFoundError(fmt.Sprintf("Method %s was not found.", req.Method)) s.logger.Log("err", err) - rpcErrorEncoder(ctx, err, w) + s.errorEncoder(ctx, err, w) return } - // Decode the JSON "params" + // Decode the JSON "params" reqParams, err := ecm.Decode(ctx, req.Params) if err != nil { s.logger.Log("err", err) - rpcErrorEncoder(ctx, err, w) + s.errorEncoder(ctx, err, w) return } @@ -130,7 +140,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { response, err := ecm.Endpoint(ctx, reqParams) if err != nil { s.logger.Log("err", err) - rpcErrorEncoder(ctx, err, w) + s.errorEncoder(ctx, err, w) return } @@ -144,7 +154,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { resParams, err := ecm.Encode(ctx, response) if err != nil { s.logger.Log("err", err) - rpcErrorEncoder(ctx, err, w) + s.errorEncoder(ctx, err, w) return } @@ -153,14 +163,13 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(res) } -// ErrorEncoder writes the error to the ResponseWriter, by default a -// content type of text/plain, a body of the plain text of the error, and a -// status code of 500. If the error implements Headerer, the provided headers -// will be applied to the response. If the error implements json.Marshaler, and -// the marshaling succeeds, a content type of application/json and the JSON -// encoded form of the error will be used. If the error implements StatusCoder, -// the provided StatusCode will be used instead of 500. -func rpcErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { +// DefaultErrorEncoder writes the error to the ResponseWriter, +// as a json-rpc error response, with an InternalError status code. +// The Error() string of the error will be used as the response error message. +// If the error implements ErrorCoder, the provided code will be set on the +// response error. +// If the error implements Headerer, the given headers will be set. +func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { w.Header().Set("Content-Type", ContentType) if headerer, ok := err.(httptransport.Headerer); ok { for k := range headerer.Headers() { diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index 99f1d2850..e163effc5 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -34,15 +34,30 @@ func expectErrorCode(t *testing.T, want int, body []byte) { } } +func nopDecoder(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil } +func nopEncoder(context.Context, interface{}) (json.RawMessage, error) { return []byte("[]"), nil } + +type mockLogger struct { + Called bool + LastArgs []interface{} +} + +func (l *mockLogger) Log(keyvals ...interface{}) error { + l.Called = true + l.LastArgs = append(l.LastArgs, keyvals) + return nil +} + func TestServerBadDecode(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ - Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, + Endpoint: endpoint.Nop, Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, errors.New("oof") }, - Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, nil }, + Encode: nopEncoder, }, } - handler := jsonrpc.NewServer(context.TODO(), ecm) + logger := mockLogger{} + handler := jsonrpc.NewServer(context.TODO(), ecm, jsonrpc.ServerErrorLogger(&logger)) server := httptest.NewServer(handler) defer server.Close() resp, _ := http.Post(server.URL, "application/json", addBody()) @@ -51,14 +66,17 @@ func TestServerBadDecode(t *testing.T) { t.Errorf("want %d, have %d: %s", want, have, buf) } expectErrorCode(t, jsonrpc.InternalError, buf) + if !logger.Called { + t.Fatal("Expected logger to be called with error. Wasn't.") + } } func TestServerBadEndpoint(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errors.New("oof") }, - Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, - Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, nil }, + Decode: nopDecoder, + Encode: nopEncoder, }, } handler := jsonrpc.NewServer(context.TODO(), ecm) @@ -75,8 +93,8 @@ func TestServerBadEndpoint(t *testing.T) { func TestServerBadEncode(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ - Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, - Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, + Endpoint: endpoint.Nop, + Decode: nopDecoder, Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, }, } @@ -92,23 +110,33 @@ func TestServerBadEncode(t *testing.T) { } func TestServerErrorEncoder(t *testing.T) { - //ecm := jsonrpc.EndpointCodecMap{ - //"add": jsonrpc.EndpointCodec{ - //Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, nil }, - //Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, - //Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, - //}, - //} - //handler := jsonrpc.NewServer(context.TODO(), ecm) - //server := httptest.NewServer(handler) - //defer server.Close() - //resp, _ := http.Post(server.URL, "application/json", addBody()) - //if want, have := http.StatusOK, resp.StatusCode; want != have { - //t.Errorf("want %d, have %d", want, have) - //} - //buf, _ := ioutil.ReadAll(resp.Body) - //expectErrorCode(t, jsonrpc.InternalError, buf) + errTeapot := errors.New("teapot") + code := func(err error) int { + if err == errTeapot { + return http.StatusTeapot + } + return http.StatusInternalServerError + } + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: func(context.Context, interface{}) (interface{}, error) { return struct{}{}, errTeapot }, + Decode: nopDecoder, + Encode: nopEncoder, + }, + } + handler := jsonrpc.NewServer( + context.TODO(), + ecm, + jsonrpc.ServerErrorEncoder(func(_ context.Context, err error, w http.ResponseWriter) { w.WriteHeader(code(err)) }), + ) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", addBody()) + if want, have := http.StatusTeapot, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } } + func TestServerUnregisteredMethod(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{} handler := jsonrpc.NewServer(context.TODO(), ecm) @@ -146,8 +174,8 @@ func TestMultipleServerBefore(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ Endpoint: endpoint.Nop, - Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, - Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + Decode: nopDecoder, + Encode: nopEncoder, }, } handler := jsonrpc.NewServer( @@ -183,8 +211,8 @@ func TestMultipleServerAfter(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ Endpoint: endpoint.Nop, - Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, - Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, + Decode: nopDecoder, + Encode: nopEncoder, }, } handler := jsonrpc.NewServer( @@ -227,8 +255,8 @@ func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { ecm = jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ Endpoint: endpoint, - Decode: func(context.Context, json.RawMessage) (interface{}, error) { return struct{}{}, nil }, - Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte("[]"), nil }, + Decode: nopDecoder, + Encode: nopEncoder, }, } handler = jsonrpc.NewServer(ctx, ecm) From 40485ea4ed9494d29f0f1c9c080cf291a3397224 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 17 Jul 2017 11:23:56 +0100 Subject: [PATCH 09/40] Remove JSON RPC from addsvc example, pending full JSON RPC example. --- examples/addsvc/transport_http.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/examples/addsvc/transport_http.go b/examples/addsvc/transport_http.go index f49d9e026..3f92839ad 100644 --- a/examples/addsvc/transport_http.go +++ b/examples/addsvc/transport_http.go @@ -16,7 +16,6 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/tracing/opentracing" httptransport "github.com/go-kit/kit/transport/http" - httpjsonrpctransport "github.com/go-kit/kit/transport/http/jsonrpc" ) // MakeHTTPHandler returns a handler that makes a set of endpoints available @@ -39,25 +38,6 @@ func MakeHTTPHandler(endpoints Endpoints, tracer stdopentracing.Tracer, logger l EncodeHTTPGenericResponse, append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., )) - - s := httpjsonrpctransport.NewServer( - ctx, - httpjsonrpctransport.EndpointCodecMap{ - "sum": httpjsonrpctransport.EndpointCodec{ - Endpoint: endpoints.SumEndpoint, - Decode: DecodeRPCHTTPConcatRequest, - Encode: EncodeRPCHTTPGenericResponse, - }, - "concat": httpjsonrpctransport.EndpointCodec{ - Endpoint: endpoints.SumEndpoint, - Decode: DecodeRPCHTTPConcatRequest, - Encode: EncodeRPCHTTPGenericResponse, - }, - }, - ) - - m.Handle("/rpc", s) - return m } From 08a25d81f57a34dc75fa5ec7d61addbce552e3dc Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 17 Jul 2017 11:25:44 +0100 Subject: [PATCH 10/40] Remove JSON RPC from addsvc example, pending full JSON RPC example. --- examples/addsvc/transport_http.go | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/examples/addsvc/transport_http.go b/examples/addsvc/transport_http.go index 3f92839ad..75a9b839b 100644 --- a/examples/addsvc/transport_http.go +++ b/examples/addsvc/transport_http.go @@ -128,23 +128,3 @@ func EncodeHTTPGenericRequest(_ context.Context, r *http.Request, request interf func EncodeHTTPGenericResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { return json.NewEncoder(w).Encode(response) } - -// DecodeRPCHTTPSumRequest ... -func DecodeRPCHTTPSumRequest(_ context.Context, params json.RawMessage) (interface{}, error) { - var req sumRequest - err := json.Unmarshal(params, &req) - return req, err -} - -// DecodeRPCHTTPConcatRequest ... -func DecodeRPCHTTPConcatRequest(_ context.Context, params json.RawMessage) (interface{}, error) { - var req concatRequest - err := json.Unmarshal(params, req) - return req, err -} - -// EncodeRPCHTTPGenericResponse ... -func EncodeRPCHTTPGenericResponse(_ context.Context, params interface{}) (json.RawMessage, error) { - res, err := json.Marshal(params) - return res, err -} From 0588817135c030cce7f1b6cb596969ce157b1842 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 17 Jul 2017 14:07:17 +0100 Subject: [PATCH 11/40] Remove context field from jsonrpc.Server. --- transport/http/jsonrpc/server.go | 4 +--- transport/http/jsonrpc/server_test.go | 14 +++++--------- 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 655078d20..c67371222 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -25,12 +25,10 @@ type Server struct { // NewServer constructs a new server, which implements http.Server. func NewServer( - ctx context.Context, ecm EndpointCodecMap, options ...ServerOption, ) *Server { s := &Server{ - ctx: ctx, ecm: ecm, errorEncoder: DefaultErrorEncoder, logger: log.NewNopLogger(), @@ -97,7 +95,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { io.WriteString(w, "405 must POST\n") return } - ctx := s.ctx + ctx := r.Context() if s.finalizer != nil { iw := &interceptingWriter{w, http.StatusOK} diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index e163effc5..d2ea793be 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -57,7 +57,7 @@ func TestServerBadDecode(t *testing.T) { }, } logger := mockLogger{} - handler := jsonrpc.NewServer(context.TODO(), ecm, jsonrpc.ServerErrorLogger(&logger)) + handler := jsonrpc.NewServer(ecm, jsonrpc.ServerErrorLogger(&logger)) server := httptest.NewServer(handler) defer server.Close() resp, _ := http.Post(server.URL, "application/json", addBody()) @@ -79,7 +79,7 @@ func TestServerBadEndpoint(t *testing.T) { Encode: nopEncoder, }, } - handler := jsonrpc.NewServer(context.TODO(), ecm) + handler := jsonrpc.NewServer(ecm) server := httptest.NewServer(handler) defer server.Close() resp, _ := http.Post(server.URL, "application/json", addBody()) @@ -98,7 +98,7 @@ func TestServerBadEncode(t *testing.T) { Encode: func(context.Context, interface{}) (json.RawMessage, error) { return []byte{}, errors.New("oof") }, }, } - handler := jsonrpc.NewServer(context.TODO(), ecm) + handler := jsonrpc.NewServer(ecm) server := httptest.NewServer(handler) defer server.Close() resp, _ := http.Post(server.URL, "application/json", addBody()) @@ -125,7 +125,6 @@ func TestServerErrorEncoder(t *testing.T) { }, } handler := jsonrpc.NewServer( - context.TODO(), ecm, jsonrpc.ServerErrorEncoder(func(_ context.Context, err error, w http.ResponseWriter) { w.WriteHeader(code(err)) }), ) @@ -139,7 +138,7 @@ func TestServerErrorEncoder(t *testing.T) { func TestServerUnregisteredMethod(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{} - handler := jsonrpc.NewServer(context.TODO(), ecm) + handler := jsonrpc.NewServer(ecm) server := httptest.NewServer(handler) defer server.Close() resp, _ := http.Post(server.URL, "application/json", addBody()) @@ -179,7 +178,6 @@ func TestMultipleServerBefore(t *testing.T) { }, } handler := jsonrpc.NewServer( - context.TODO(), ecm, jsonrpc.ServerBefore(func(ctx context.Context, r *http.Request) context.Context { ctx = context.WithValue(ctx, "one", 1) @@ -216,7 +214,6 @@ func TestMultipleServerAfter(t *testing.T) { }, } handler := jsonrpc.NewServer( - context.TODO(), ecm, jsonrpc.ServerAfter(func(ctx context.Context, w http.ResponseWriter) context.Context { ctx = context.WithValue(ctx, "one", 1) @@ -251,7 +248,6 @@ func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { return struct{}{}, nil } response = make(chan *http.Response) - ctx = context.TODO() ecm = jsonrpc.EndpointCodecMap{ "add": jsonrpc.EndpointCodec{ Endpoint: endpoint, @@ -259,7 +255,7 @@ func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { Encode: nopEncoder, }, } - handler = jsonrpc.NewServer(ctx, ecm) + handler = jsonrpc.NewServer(ecm) ) go func() { server := httptest.NewServer(handler) From 5be17f5e99c9b46d4f117e37f3d7334f136e4b35 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 20 Jul 2017 15:27:53 +0100 Subject: [PATCH 12/40] Add JSON content type to all JSON RPC responses. --- transport/http/jsonrpc/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index c67371222..7a4952f04 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -89,8 +89,8 @@ func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption { // ServeHTTP implements http.Handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") if r.Method != http.MethodPost { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) io.WriteString(w, "405 must POST\n") return From bcdbd542f1e0ac0766aaad1aba357e85f29a4fe6 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 20 Jul 2017 15:39:49 +0100 Subject: [PATCH 13/40] Add JSON content type to all JSON RPC responses. --- transport/http/jsonrpc/server.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 7a4952f04..01a84c0bf 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -89,8 +89,8 @@ func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption { // ServeHTTP implements http.Handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "text/plain; charset=utf-8") if r.Method != http.MethodPost { + w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) io.WriteString(w, "405 must POST\n") return @@ -158,6 +158,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { res.Result = resParams + w.Header().Set("Content-Type", ContentType) json.NewEncoder(w).Encode(res) } From 526b570e300634e0047fff20c00bdd06d544caf4 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 3 Aug 2017 19:40:02 +0100 Subject: [PATCH 14/40] Remove client-side JSON RPC funcs for now. --- transport/http/jsonrpc/encode_decode.go | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/transport/http/jsonrpc/encode_decode.go b/transport/http/jsonrpc/encode_decode.go index 7c64e6ea2..49e80b26b 100644 --- a/transport/http/jsonrpc/encode_decode.go +++ b/transport/http/jsonrpc/encode_decode.go @@ -12,20 +12,8 @@ import ( // JSON decodes from the request body to the concrete response type. type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error) -// EncodeRequestFunc encodes the passed request object into the HTTP request -// object. It's designed to be used in HTTP clients, for client-side -// endpoints. One straightforward EncodeRequestFunc could something that JSON -// encodes the object directly to the request body. -// type EncodeRequestFunc func(context.Context, *http.Request, interface{}) error - // EncodeResponseFunc encodes the passed response object to the HTTP response // writer. It's designed to be used in HTTP servers, for server-side // endpoints. One straightforward EncodeResponseFunc could be something that // JSON encodes the object directly to the response body. type EncodeResponseFunc func(context.Context, interface{}) (response json.RawMessage, err error) - -// DecodeResponseFunc extracts a user-domain response object from an HTTP -// response object. It's designed to be used in HTTP clients, for client-side -// endpoints. One straightforward DecodeResponseFunc could be something that -// JSON decodes from the response body to the concrete response type. -// type DecodeResponseFunc func(context.Context, *http.Response) (response interface{}, err error) From 773fed533c2c6f4b5f940a784db7c1d4263663d0 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 3 Aug 2017 19:40:14 +0100 Subject: [PATCH 15/40] Document interceptingWriter --- transport/http/jsonrpc/server.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 01a84c0bf..d9fb0258b 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -198,6 +198,8 @@ type ErrorCoder interface { ErrorCode() int } +// interceptingWriter intercepts calls to WriteHeader, so that a finalizer +// can be given the correct status code. type interceptingWriter struct { http.ResponseWriter code int From 13719b2b4d0a7145c2c715ce5daa8013e3d8b724 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 3 Aug 2017 19:43:24 +0100 Subject: [PATCH 16/40] Add JSON RPC doc.go. --- transport/http/jsonrpc/doc.go | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 transport/http/jsonrpc/doc.go diff --git a/transport/http/jsonrpc/doc.go b/transport/http/jsonrpc/doc.go new file mode 100644 index 000000000..0e2bd52a1 --- /dev/null +++ b/transport/http/jsonrpc/doc.go @@ -0,0 +1,3 @@ +// Package jsonrpc provides a JSON RPC (v2.0) binding for endpoints. +// See http://www.jsonrpc.org/specification +package jsonrpc From dc16302f3619a6ee2c5a362d5ad1e05d7d91854d Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Thu, 3 Aug 2017 21:06:44 +0100 Subject: [PATCH 17/40] Add README for JSON RPC. --- transport/http/jsonrpc/README.md | 92 ++++++++++++++++++++++++++++++++ 1 file changed, 92 insertions(+) create mode 100644 transport/http/jsonrpc/README.md diff --git a/transport/http/jsonrpc/README.md b/transport/http/jsonrpc/README.md new file mode 100644 index 000000000..6bbd64d6d --- /dev/null +++ b/transport/http/jsonrpc/README.md @@ -0,0 +1,92 @@ +# JSON RPC + +[JSON RPC](http://www.jsonrpc.org) is "A light weight remote procedure call protocol". It allows for the creation of simple RPC-style APIs with human-readable messages that are front-end friendly. + +## Using JSON RPC with Go-Kit +Using JSON RPC and go-kit together is quite simple. + +A JSON RPC _server_ acts as an [HTTP Handler](https://godoc.org/net/http#Handler), receiving all requests to the JSON RPC's URL. The server looks at the `method` property of the [Request Object](http://www.jsonrpc.org/specification#request_object), and routes it to the corresponding code. + +Each JSON RPC _method_ is implemented as an `EndpointCodec`, a go-kit [Endpoint](https://godoc.org/github.com/go-kit/kit/endpoint#Endpoint), sandwiched between a decoder and encoder. The decoder picks apart the JSON RPC request params, which can be passed to your endpoint. The encoder receives the output from the endpoint and encodes a JSON-RPC result. + +## Example — Add Service +Let's say we want a service that adds two ints together. We'll serve this at `http://localhost/rpc`. So a request to our `sum` method will be a POST to `http://localhost/rpc` with a request body of: + + { + "id": 123, + "jsonrpc": "2.0", + "method": "sum", + "params": { + "A": 2, + "B": 2 + } + } + +### `EndpointCodecMap` +The routing table for incoming JSON RPC requests is the `EndpointCodecMap`. The key of the map is the JSON RPC method name. Here, we're routing the `sum` method to an `EndpointCodec` wrapped around `sumEndpoint`. + + jsonrpc.EndpointCodecMap{ + "sum": jsonrpc.EndpointCodec{ + Endpoint: sumEndpoint, + Decode: decodeSumRequest, + Encode: encodeSumResponse, + }, + } + +### Decoder + type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error) + +A `DecodeRequestFunc` is given the raw JSON from the `params` property of the Request object, _not_ the whole request object. It returns an object that will be the input to the Endpoint. For our purposes, the output should be a SumRequest, like this: + + type SumRequest struct { + A, B int + } + +So here's our decoder: + + func decodeSumRequest(ctx context.Context, msg json.RawMessage) (interface{}, error) { + var req SumRequest + err := json.Unmarshal(msg, &req) + if err != nil { + return nil, err + } + return req, nil + } + +So our `SumRequest` will now be passed to the endpoint. Once the endpoint has done its work, we hand over to the… + +### Encoder +The encoder takes the output of the endpoint, and builds the raw JSON message that will form the `result` field of a [Response Object](http://www.jsonrpc.org/specification#response_object). Our result is going to be a plain int. Here's our encoder: + + func encodeSumResponse(ctx context.Context, result interface{}) (json.RawMessage, error) { + sum, ok := result.(int) + if !ok { + return nil, errors.New("result is not an int") + } + b, err := json.Marshal(sum) + if err != nil { + return nil, err + } + return b, nil + } + +### Server +Now that we have an EndpointCodec with decoder, endpoint, and encoder, we can wire up the server: + + handler := jsonrpc.NewServer(jsonrpc.EndpointCodecMap{ + "sum": jsonrpc.EndpointCodec{ + Endpoint: sumEndpoint, + Decode: decodeSumRequest, + Encode: encodeSumResponse, + }, + }) + http.Handle("/v1/rpc", handler) + http.ListenAndServe(":80", nil) + +With all of this done, our example request above should result in a response like this: + + { + "jsonrpc": "2.0", + "result": 4, + "error": null + } \ No newline at end of file From f0cd734b0edab8f23678305b6c0b555c149ea293 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 13 Oct 2017 16:50:25 +0100 Subject: [PATCH 18/40] Wire in JSON RPC addsvc. --- examples/addsvc/cmd/addsvc/addsvc.go | 25 +++++++-- examples/addsvc/pkg/addtransport/jsonrpc.go | 58 +++++++++++++++++++++ transport/http/jsonrpc/error.go | 13 +++++ 3 files changed, 91 insertions(+), 5 deletions(-) create mode 100644 examples/addsvc/pkg/addtransport/jsonrpc.go diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go index b1886e2f7..71fe836b7 100644 --- a/examples/addsvc/cmd/addsvc/addsvc.go +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -42,6 +42,7 @@ func main() { httpAddr = fs.String("http-addr", ":8081", "HTTP listen address") grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address") thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address") + jsonRPCAddr = fs.String("jsonrpc-addr", ":8084", "JSON RPC listen address") thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") @@ -135,11 +136,12 @@ func main() { // the interfaces that the transports expect. Note that we're not binding // them to ports or anything yet; we'll do that next. var ( - service = addservice.New(logger, ints, chars) - endpoints = addendpoint.New(service, logger, duration, tracer) - httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) - grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) - thriftServer = addtransport.NewThriftServer(endpoints) + service = addservice.New(logger, ints, chars) + endpoints = addendpoint.New(service, logger, duration, tracer) + httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) + grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) + thriftServer = addtransport.NewThriftServer(endpoints) + jsonrpcHandler = addtransport.NewJSONRPCHandler(endpoints, logger) ) // Now we're to the part of the func main where we want to start actually @@ -244,6 +246,19 @@ func main() { thriftSocket.Close() }) } + { + httpListener, err := net.Listen("tcp", *jsonRPCAddr) + if err != nil { + logger.Log("transport", "JSONRPC over HTTP", "during", "Listen", "err", err) + os.Exit(1) + } + g.Add(func() error { + logger.Log("transport", "JSONRPC over HTTP", "addr", *jsonRPCAddr) + return http.Serve(httpListener, jsonrpcHandler) + }, func(error) { + httpListener.Close() + }) + } { // This function just sits and waits for ctrl-C. cancelInterrupt := make(chan struct{}) diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go new file mode 100644 index 000000000..c7a077fe7 --- /dev/null +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -0,0 +1,58 @@ +package addtransport + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/transport/http/jsonrpc" +) + +// NewJSONRPCHandler returns a JSON RPC Server/Handler that can be passed to http.Handle() +func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Server { + handler := jsonrpc.NewServer( + makeEndpointCodecMap(endpoints), + jsonrpc.ServerErrorLogger(logger), + ) + return handler +} + +// makeEndpointCodecMap returns a codec map configured for the addsvc. +func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap { + return jsonrpc.EndpointCodecMap{ + "sum": jsonrpc.EndpointCodec{ + Endpoint: endpoints.SumEndpoint, + Decode: decodeSumRequest, + Encode: encodeSumResponse, + }, + } +} + +func decodeSumRequest(_ context.Context, msg json.RawMessage) (interface{}, error) { + var req addendpoint.SumRequest + err := json.Unmarshal(msg, &req) + if err != nil { + return nil, &jsonrpc.Error{ + Code: -32000, + Message: fmt.Sprintf("couldn't unmarshal body to sum request: %s", err), + } + } + return req, nil +} + +func encodeSumResponse(_ context.Context, obj interface{}) (json.RawMessage, error) { + res, ok := obj.(addendpoint.SumResponse) + if !ok { + return nil, &jsonrpc.Error{ + Code: -32000, + Message: fmt.Sprintf("Asserting result to *SumResponse failed. Got %T, %+v", obj, obj), + } + } + b, err := json.Marshal(res) + if err != nil { + return nil, fmt.Errorf("couldn't marshal response: %s", err) + } + return b, nil +} diff --git a/transport/http/jsonrpc/error.go b/transport/http/jsonrpc/error.go index 783c35917..a276271ba 100644 --- a/transport/http/jsonrpc/error.go +++ b/transport/http/jsonrpc/error.go @@ -9,6 +9,19 @@ type Error struct { Data interface{} `json:"data,omitempty"` } +// Error implements error. +func (e Error) Error() string { + if e.Message != "" { + return e.Message + } + return errorMessage[e.Code] +} + +// ErrorCode returns the JSON RPC error code associated with the error. +func (e Error) ErrorCode() int { + return e.Code +} + const ( // ParseError defines invalid JSON was received by the server. // An error occurred on the server while parsing the JSON text. From 18442f2197f8269c62242286145d19237fd0d96e Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 16 Oct 2017 09:38:24 +0100 Subject: [PATCH 19/40] Add JSON RPC to Addsvc CLI. --- examples/addsvc/cmd/addcli/addcli.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index e2f0f94d0..938eb5324 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -34,9 +34,10 @@ func main() { // see profilesvc. fs := flag.NewFlagSet("addcli", flag.ExitOnError) var ( - httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") - grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") - thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") + httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") + grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") + thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") + //jsonRPCAddr = fs.String("jsonrpc-addr", "", "JSON RPC address of addsvc") thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") From 9e5d7eed40543ab05e6ffd03609ad29b4a045263 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Tue, 17 Oct 2017 15:26:22 +0100 Subject: [PATCH 20/40] Set JSONRPC version in responses. --- transport/http/jsonrpc/server.go | 4 +++- transport/http/jsonrpc/server_test.go | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index d9fb0258b..8832d6878 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -146,7 +146,9 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx = f(ctx, w) } - res := Response{} + res := Response{ + JSONRPC: Version, + } // Encode the response from the Endpoint resParams, err := ecm.Encode(ctx, response) diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index d2ea793be..b3e51826d 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -163,6 +163,9 @@ func TestServerHappyPath(t *testing.T) { if err != nil { t.Fatalf("Cant' decode response. err=%s, body=%s", err, buf) } + if r.JSONRPC != jsonrpc.Version { + t.Fatalf("JSONRPC Version: want=%s, got=%s", jsonrpc.Version, r.JSONRPC) + } if r.Error != nil { t.Fatalf("Unxpected error on response: %s", buf) } From e86e36edce2d49e7e5c3f024d543e86de7cc9f48 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 15 Dec 2017 16:31:23 +0000 Subject: [PATCH 21/40] Add JSON RPC client to addcli example. --- examples/addsvc/cmd/addcli/addcli.go | 11 +- examples/addsvc/pkg/addtransport/jsonrpc.go | 55 ++++++ transport/http/jsonrpc/client.go | 176 ++++++++++++++++++ transport/http/jsonrpc/encode_decode.go | 45 ++++- .../http/jsonrpc/request_response_types.go | 6 +- transport/http/jsonrpc/server.go | 14 +- 6 files changed, 280 insertions(+), 27 deletions(-) create mode 100644 transport/http/jsonrpc/client.go diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index 938eb5324..21314efc8 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -34,10 +34,10 @@ func main() { // see profilesvc. fs := flag.NewFlagSet("addcli", flag.ExitOnError) var ( - httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") - grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") - thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") - //jsonRPCAddr = fs.String("jsonrpc-addr", "", "JSON RPC address of addsvc") + httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") + grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") + thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") + jsonRPCAddr = fs.String("jsonrpc-addr", "", "JSON RPC address of addsvc") thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") @@ -103,6 +103,9 @@ func main() { } defer conn.Close() svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) + } else if *jsonRPCAddr != "" { + // TODO: Add tracer + svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, log.NewNopLogger()) } else if *thriftAddr != "" { // It's necessary to do all of this construction in the func main, // because (among other reasons) we need to control the lifecycle of the diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go index c7a077fe7..910e3d1be 100644 --- a/examples/addsvc/pkg/addtransport/jsonrpc.go +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -4,8 +4,11 @@ import ( "context" "encoding/json" "fmt" + "net/url" + "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" "github.com/go-kit/kit/log" "github.com/go-kit/kit/transport/http/jsonrpc" ) @@ -19,6 +22,37 @@ func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Se return handler } +// NewJSONRPCClient returns an addservice backed by a JSON RPC over HTTP server +// living at the remote instance. We expect instance to come from a service +// discovery system, so likely of the form "host:port". We bake-in certain +// middlewares, implementing the client library pattern. +func NewJSONRPCClient(instance string, logger log.Logger) (addservice.Service, error) { + tgt, err := url.Parse(instance) + if err != nil { + return nil, err + } + + var sumEndpoint endpoint.Endpoint + { + c := jsonrpc.NewClient(tgt, "sum", encodeSumRequest, decodeSumResponse) + sumEndpoint = c.Endpoint() + // TODO: Add middlewares. + } + + var concatEndpoint endpoint.Endpoint + { + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + }, nil + +} + // makeEndpointCodecMap returns a codec map configured for the addsvc. func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap { return jsonrpc.EndpointCodecMap{ @@ -56,3 +90,24 @@ func encodeSumResponse(_ context.Context, obj interface{}) (json.RawMessage, err } return b, nil } + +func decodeSumResponse(_ context.Context, msg json.RawMessage) (interface{}, error) { + var res addendpoint.SumResponse + err := json.Unmarshal(msg, &res) + if err != nil { + return nil, fmt.Errorf("couldn't unmarshal body to SumResponse: %s", err) + } + return res, nil +} + +func encodeSumRequest(_ context.Context, obj interface{}) (json.RawMessage, error) { + req, ok := obj.(addendpoint.SumRequest) + if !ok { + return nil, fmt.Errorf("couldn't assert request as SumRequest, got %T", obj) + } + b, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("couldn't marshal request: %s", err) + } + return b, nil +} diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go new file mode 100644 index 000000000..f2f999704 --- /dev/null +++ b/transport/http/jsonrpc/client.go @@ -0,0 +1,176 @@ +package jsonrpc + +import ( + "bytes" + "context" + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + + "github.com/go-kit/kit/endpoint" + httptransport "github.com/go-kit/kit/transport/http" +) + +// Client wraps a JSON RPC method and provides a method that implements endpoint.Endpoint. +type Client struct { + client *http.Client + + // JSON RPC endpoint URL + tgt *url.URL + + // JSON RPC method name. + method string + + enc EncodeRequestFunc + dec DecodeResponseFunc + before []httptransport.RequestFunc + after []httptransport.ClientResponseFunc + finalizer httptransport.ClientFinalizerFunc + bufferedStream bool +} + +// NewClient constructs a usable Client for a single remote method. +func NewClient( + tgt *url.URL, + method string, + enc EncodeRequestFunc, + dec DecodeResponseFunc, + options ...ClientOption, +) *Client { + c := &Client{ + client: http.DefaultClient, + method: method, + tgt: tgt, + enc: enc, + dec: dec, + before: []httptransport.RequestFunc{}, + after: []httptransport.ClientResponseFunc{}, + bufferedStream: false, + } + for _, option := range options { + option(c) + } + return c +} + +// ClientOption sets an optional parameter for clients. +type ClientOption func(*Client) + +// SetClient sets the underlying HTTP client used for requests. +// By default, http.DefaultClient is used. +func SetClient(client *http.Client) ClientOption { + return func(c *Client) { c.client = client } +} + +// ClientBefore sets the RequestFuncs that are applied to the outgoing HTTP +// request before it's invoked. +func ClientBefore(before ...httptransport.RequestFunc) ClientOption { + return func(c *Client) { c.before = append(c.before, before...) } +} + +// ClientAfter sets the ClientResponseFuncs applied to the incoming HTTP +// request prior to it being decoded. This is useful for obtaining anything off +// of the response and adding onto the context prior to decoding. +func ClientAfter(after ...httptransport.ClientResponseFunc) ClientOption { + return func(c *Client) { c.after = append(c.after, after...) } +} + +// ClientFinalizer is executed at the end of every HTTP request. +// By default, no finalizer is registered. +func ClientFinalizer(f httptransport.ClientFinalizerFunc) ClientOption { + return func(s *Client) { s.finalizer = f } +} + +// BufferedStream sets whether the Response.Body is left open, allowing it +// to be read from later. Useful for transporting a file as a buffered stream. +func BufferedStream(buffered bool) ClientOption { + return func(c *Client) { c.bufferedStream = buffered } +} + +func (c Client) requestID() *RequestID { + return &RequestID{ + // TODO: Auto-increment. + intValue: 1, + } +} + +// Endpoint returns a usable endpoint that invokes the remote endpoint. +func (c Client) Endpoint() endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var ( + resp *http.Response + err error + ) + if c.finalizer != nil { + defer func() { + if resp != nil { + ctx = context.WithValue(ctx, httptransport.ContextKeyResponseHeaders, resp.Header) + ctx = context.WithValue(ctx, httptransport.ContextKeyResponseSize, resp.ContentLength) + } + c.finalizer(ctx, err) + }() + } + + var params json.RawMessage + if params, err = c.enc(ctx, request); err != nil { + return nil, err + } + rpcReq := Request{ + JSONRPC: "", + Method: c.method, + Params: params, + ID: c.requestID(), + } + + req, err := http.NewRequest("POST", c.tgt.String(), nil) + if err != nil { + return nil, err + } + + req.Header.Set("Content-Type", "application/json; charset=utf-8") + var b bytes.Buffer + req.Body = ioutil.NopCloser(&b) + err = json.NewEncoder(&b).Encode(rpcReq) + if err != nil { + return nil, err + } + + for _, f := range c.before { + ctx = f(ctx, req) + } + + resp, err = c.client.Do(req.WithContext(ctx)) + if err != nil { + return nil, err + } + + if !c.bufferedStream { + defer resp.Body.Close() + } + + // Decode the body into an object + var rpcRes Response + err = json.NewDecoder(resp.Body).Decode(&rpcRes) + if err != nil { + return nil, err + } + + for _, f := range c.after { + ctx = f(ctx, resp) + } + + return c.dec(ctx, rpcRes.Result) + } +} + +// ClientFinalizerFunc can be used to perform work at the end of a client HTTP +// request, after the response is returned. The principal +// intended use is for error logging. Additional response parameters are +// provided in the context under keys with the ContextKeyResponse prefix. +// Note: err may be nil. There maybe also no additional response parameters depending on +// when an error occurs. +type ClientFinalizerFunc func(ctx context.Context, err error) diff --git a/transport/http/jsonrpc/encode_decode.go b/transport/http/jsonrpc/encode_decode.go index 49e80b26b..ab7612e5b 100644 --- a/transport/http/jsonrpc/encode_decode.go +++ b/transport/http/jsonrpc/encode_decode.go @@ -3,17 +3,46 @@ package jsonrpc import ( "encoding/json" + "github.com/go-kit/kit/endpoint" + "context" ) -// DecodeRequestFunc extracts a user-domain request object from an HTTP -// request object. It's designed to be used in HTTP servers, for server-side -// endpoints. One straightforward DecodeRequestFunc could be something that -// JSON decodes from the request body to the concrete response type. +// Server-Side Codec + +// EndpointCodec defines a server Endpoint and its associated codecs +type EndpointCodec struct { + Endpoint endpoint.Endpoint + Decode DecodeRequestFunc + Encode EncodeResponseFunc +} + +// EndpointCodecMap maps the Request.Method to the proper EndpointCodec +type EndpointCodecMap map[string]EndpointCodec + +// DecodeRequestFunc extracts a user-domain request object from an raw JSON +// It's designed to be used in HTTP servers, for server-side endpoints. +// One straightforward DecodeRequestFunc could be something that unmarshals +// JSON from the request body to the concrete request type. type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error) -// EncodeResponseFunc encodes the passed response object to the HTTP response -// writer. It's designed to be used in HTTP servers, for server-side -// endpoints. One straightforward EncodeResponseFunc could be something that -// JSON encodes the object directly to the response body. +// EncodeResponseFunc encodes the passed response object to a JSON RPC response. +// It's designed to be used in HTTP servers, for server-side endpoints. +// One straightforward EncodeResponseFunc could be something that JSON encodes +// the object directly. type EncodeResponseFunc func(context.Context, interface{}) (response json.RawMessage, err error) + +// Client-Side Codec + +// EncodeRequestFunc encodes the passed request object to raw JSON. +// It's designed to be used in JSON RPC clients, for client-side +// endpoints. One straightforward EncodeResponseFunc could be something that +// JSON encodes the object directly. +type EncodeRequestFunc func(context.Context, interface{}) (request json.RawMessage, err error) + +// DecodeResponseFunc extracts a user-domain response object from an HTTP +// request object. It's designed to be used in JSON RPC clients, for +// client-side endpoints. One straightforward DecodeRequestFunc could be +// something that JSON decodes from the request body to the concrete +// response type. +type DecodeResponseFunc func(context.Context, json.RawMessage) (response interface{}, err error) diff --git a/transport/http/jsonrpc/request_response_types.go b/transport/http/jsonrpc/request_response_types.go index 474158e63..8ea7ddc38 100644 --- a/transport/http/jsonrpc/request_response_types.go +++ b/transport/http/jsonrpc/request_response_types.go @@ -56,9 +56,9 @@ func (id *RequestID) String() (string, error) { // Response defines a JSON RPC response from the spec // http://www.jsonrpc.org/specification#response_object type Response struct { - JSONRPC string `json:"jsonrpc"` - Result interface{} `json:"result,omitempty"` - Error *Error `json:"error,omitemty"` + JSONRPC string `json:"jsonrpc"` + Result json.RawMessage `json:"result,omitempty"` + Error *Error `json:"error,omitemty"` } const ( diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 8832d6878..3dcf34459 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -7,7 +7,6 @@ import ( "io" "net/http" - "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/log" httptransport "github.com/go-kit/kit/transport/http" ) @@ -39,22 +38,12 @@ func NewServer( return s } -// EndpointCodec defines and Endpoint and its associated codecs -type EndpointCodec struct { - Endpoint endpoint.Endpoint - Decode DecodeRequestFunc - Encode EncodeResponseFunc -} - -// EndpointCodecMap maps the Request.Method to the proper EndpointCodec -type EndpointCodecMap map[string]EndpointCodec - // ServerOption sets an optional parameter for servers. type ServerOption func(*Server) // ServerBefore functions are executed on the HTTP request object before the -// request is decoded. func ServerBefore(before ...httptransport.RequestFunc) ServerOption { + // request is decoded. return func(s *Server) { s.before = append(s.before, before...) } } @@ -89,6 +78,7 @@ func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption { // ServeHTTP implements http.Handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + fmt.Println("HERE!") if r.Method != http.MethodPost { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) From 1bcf828008e4563dbe661df36a81e9e4c084d0c0 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 15 Dec 2017 16:41:34 +0000 Subject: [PATCH 22/40] Wire in client middlewares for JSON RPC addsvc example. --- examples/addsvc/cmd/addcli/addcli.go | 3 +- examples/addsvc/pkg/addtransport/jsonrpc.go | 38 ++++++++++++++++++--- transport/http/jsonrpc/server.go | 1 - 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/examples/addsvc/cmd/addcli/addcli.go b/examples/addsvc/cmd/addcli/addcli.go index 21314efc8..f2110a0a3 100644 --- a/examples/addsvc/cmd/addcli/addcli.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -104,8 +104,7 @@ func main() { defer conn.Close() svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) } else if *jsonRPCAddr != "" { - // TODO: Add tracer - svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, log.NewNopLogger()) + svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, tracer, log.NewNopLogger()) } else if *thriftAddr != "" { // It's necessary to do all of this construction in the func main, // because (among other reasons) we need to control the lifecycle of the diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go index 910e3d1be..3b3112eaf 100644 --- a/examples/addsvc/pkg/addtransport/jsonrpc.go +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -5,12 +5,20 @@ import ( "encoding/json" "fmt" "net/url" + "strings" + "time" + "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addservice" "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" "github.com/go-kit/kit/transport/http/jsonrpc" + jujuratelimit "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" ) // NewJSONRPCHandler returns a JSON RPC Server/Handler that can be passed to http.Handle() @@ -26,17 +34,37 @@ func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Se // living at the remote instance. We expect instance to come from a service // discovery system, so likely of the form "host:port". We bake-in certain // middlewares, implementing the client library pattern. -func NewJSONRPCClient(instance string, logger log.Logger) (addservice.Service, error) { - tgt, err := url.Parse(instance) +func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { + // Quickly sanitize the instance string. + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + u, err := url.Parse(instance) if err != nil { return nil, err } + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + var sumEndpoint endpoint.Endpoint { - c := jsonrpc.NewClient(tgt, "sum", encodeSumRequest, decodeSumResponse) - sumEndpoint = c.Endpoint() - // TODO: Add middlewares. + sumEndpoint = jsonrpc.NewClient( + u, + "sum", + encodeSumRequest, + decodeSumResponse, + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) } var concatEndpoint endpoint.Endpoint diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 3dcf34459..8f5460e2e 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -78,7 +78,6 @@ func ServerFinalizer(f httptransport.ServerFinalizerFunc) ServerOption { // ServeHTTP implements http.Handler. func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { - fmt.Println("HERE!") if r.Method != http.MethodPost { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) From 9b31488f24b5a2ba9ba54275bbb860964455b008 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 15 Dec 2017 17:10:12 +0000 Subject: [PATCH 23/40] Fix rate limiter dependency. --- examples/addsvc/pkg/addtransport/jsonrpc.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go index 3b3112eaf..5db50a44d 100644 --- a/examples/addsvc/pkg/addtransport/jsonrpc.go +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -8,6 +8,8 @@ import ( "strings" "time" + "golang.org/x/time/rate" + "github.com/go-kit/kit/circuitbreaker" "github.com/go-kit/kit/endpoint" "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" @@ -16,7 +18,6 @@ import ( "github.com/go-kit/kit/ratelimit" "github.com/go-kit/kit/tracing/opentracing" "github.com/go-kit/kit/transport/http/jsonrpc" - jujuratelimit "github.com/juju/ratelimit" stdopentracing "github.com/opentracing/opentracing-go" "github.com/sony/gobreaker" ) @@ -49,7 +50,7 @@ func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log. // construct per-endpoint circuitbreaker middlewares to demonstrate how // that's done, although they could easily be combined into a single breaker // for the entire remote instance, too. - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100)) var sumEndpoint endpoint.Endpoint { From 7d4a753b00a83b96c924475fcc28bbd8fa329cd2 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 5 Jan 2018 16:03:47 +0000 Subject: [PATCH 24/40] Add concat JSON RPC method. --- examples/addsvc/pkg/addtransport/jsonrpc.go | 65 +++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go index 5db50a44d..2fc1b8b55 100644 --- a/examples/addsvc/pkg/addtransport/jsonrpc.go +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -70,6 +70,18 @@ func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log. var concatEndpoint endpoint.Endpoint { + concatEndpoint = jsonrpc.NewClient( + u, + "concat", + encodeConcatRequest, + decodeConcatResponse, + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 30 * time.Second, + }))(concatEndpoint) } // Returning the endpoint.Set as a service.Service relies on the @@ -90,6 +102,11 @@ func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap { Decode: decodeSumRequest, Encode: encodeSumResponse, }, + "concat": jsonrpc.EndpointCodec{ + Endpoint: endpoints.ConcatEndpoint, + Decode: decodeConcatRequest, + Encode: encodeConcatResponse, + }, } } @@ -140,3 +157,51 @@ func encodeSumRequest(_ context.Context, obj interface{}) (json.RawMessage, erro } return b, nil } + +func decodeConcatRequest(_ context.Context, msg json.RawMessage) (interface{}, error) { + var req addendpoint.ConcatRequest + err := json.Unmarshal(msg, &req) + if err != nil { + return nil, &jsonrpc.Error{ + Code: -32000, + Message: fmt.Sprintf("couldn't unmarshal body to concat request: %s", err), + } + } + return req, nil +} + +func encodeConcatResponse(_ context.Context, obj interface{}) (json.RawMessage, error) { + res, ok := obj.(addendpoint.ConcatResponse) + if !ok { + return nil, &jsonrpc.Error{ + Code: -32000, + Message: fmt.Sprintf("Asserting result to *ConcatResponse failed. Got %T, %+v", obj, obj), + } + } + b, err := json.Marshal(res) + if err != nil { + return nil, fmt.Errorf("couldn't marshal response: %s", err) + } + return b, nil +} + +func decodeConcatResponse(_ context.Context, msg json.RawMessage) (interface{}, error) { + var res addendpoint.ConcatResponse + err := json.Unmarshal(msg, &res) + if err != nil { + return nil, fmt.Errorf("couldn't unmarshal body to ConcatResponse: %s", err) + } + return res, nil +} + +func encodeConcatRequest(_ context.Context, obj interface{}) (json.RawMessage, error) { + req, ok := obj.(addendpoint.ConcatRequest) + if !ok { + return nil, fmt.Errorf("couldn't assert request as ConcatRequest, got %T", obj) + } + b, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("couldn't marshal request: %s", err) + } + return b, nil +} From d8a8b11122e07f69f51c332243a433be485cc31b Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 5 Jan 2018 16:45:49 +0000 Subject: [PATCH 25/40] Improve JSON RPC server test coverage. --- transport/http/jsonrpc/error.go | 27 ++++++++---- transport/http/jsonrpc/server.go | 5 ++- transport/http/jsonrpc/server_test.go | 62 ++++++++++++++++++++++++++- 3 files changed, 83 insertions(+), 11 deletions(-) diff --git a/transport/http/jsonrpc/error.go b/transport/http/jsonrpc/error.go index a276271ba..f3b9e3a3e 100644 --- a/transport/http/jsonrpc/error.go +++ b/transport/http/jsonrpc/error.go @@ -54,15 +54,21 @@ func ErrorMessage(code int) string { return errorMessage[code] } -type parseError struct{} +type parseError string -func (e *parseError) ErrorCode() int { +func (e parseError) Error() string { + return string(e) +} +func (e parseError) ErrorCode() int { return ParseError } -type invalidRequestError struct{} +type invalidRequestError string -func (e *invalidRequestError) ErrorCode() int { +func (e invalidRequestError) Error() string { + return string(e) +} +func (e invalidRequestError) ErrorCode() int { return InvalidRequestError } @@ -71,19 +77,24 @@ type methodNotFoundError string func (e methodNotFoundError) Error() string { return string(e) } - func (e methodNotFoundError) ErrorCode() int { return MethodNotFoundError } -type invalidParamsError struct{} +type invalidParamsError string +func (e invalidParamsError) Error() string { + return string(e) +} func (e invalidParamsError) ErrorCode() int { return InvalidParamsError } -type internalError struct{} +type internalError string -func (e *internalError) ErrorCode() int { +func (e internalError) Error() string { + return string(e) +} +func (e internalError) ErrorCode() int { return InternalError } diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 8f5460e2e..88df7c2e3 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -100,8 +100,9 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { var req Request err := json.NewDecoder(r.Body).Decode(&req) if err != nil { - s.logger.Log("err", err) - s.errorEncoder(ctx, err, w) + rpcerr := parseError("JSON could not be decoded: " + err.Error()) + s.logger.Log("err", rpcerr) + s.errorEncoder(ctx, rpcerr, w) return } diff --git a/transport/http/jsonrpc/server_test.go b/transport/http/jsonrpc/server_test.go index b3e51826d..d7960fe05 100644 --- a/transport/http/jsonrpc/server_test.go +++ b/transport/http/jsonrpc/server_test.go @@ -17,7 +17,11 @@ import ( ) func addBody() io.Reader { - return strings.NewReader(`{"jsonrpc": "2.0", "method": "add", "params": [3, 2], "id": 1}`) + return body(`{"jsonrpc": "2.0", "method": "add", "params": [3, 2], "id": 1}`) +} + +func body(in string) io.Reader { + return strings.NewReader(in) } func expectErrorCode(t *testing.T, want int, body []byte) { @@ -136,6 +140,30 @@ func TestServerErrorEncoder(t *testing.T) { } } +func TestCanRejectNonPostRequest(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{} + handler := jsonrpc.NewServer(ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Get(server.URL) + if want, have := http.StatusMethodNotAllowed, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } +} + +func TestCanRejectInvalidJSON(t *testing.T) { + ecm := jsonrpc.EndpointCodecMap{} + handler := jsonrpc.NewServer(ecm) + server := httptest.NewServer(handler) + defer server.Close() + resp, _ := http.Post(server.URL, "application/json", body("clearlynotjson")) + if want, have := http.StatusOK, resp.StatusCode; want != have { + t.Errorf("want %d, have %d", want, have) + } + buf, _ := ioutil.ReadAll(resp.Body) + expectErrorCode(t, jsonrpc.ParseError, buf) +} + func TestServerUnregisteredMethod(t *testing.T) { ecm := jsonrpc.EndpointCodecMap{} handler := jsonrpc.NewServer(ecm) @@ -243,6 +271,38 @@ func TestMultipleServerAfter(t *testing.T) { } } +func TestCanFinalize(t *testing.T) { + var done = make(chan struct{}) + var finalizerCalled bool + ecm := jsonrpc.EndpointCodecMap{ + "add": jsonrpc.EndpointCodec{ + Endpoint: endpoint.Nop, + Decode: nopDecoder, + Encode: nopEncoder, + }, + } + handler := jsonrpc.NewServer( + ecm, + jsonrpc.ServerFinalizer(func(ctx context.Context, code int, req *http.Request) { + finalizerCalled = true + close(done) + }), + ) + server := httptest.NewServer(handler) + defer server.Close() + http.Post(server.URL, "application/json", addBody()) // nolint + + select { + case <-done: + case <-time.After(time.Second): + t.Fatal("timeout waiting for finalizer") + } + + if !finalizerCalled { + t.Fatal("Finalizer was not called.") + } +} + func testServer(t *testing.T) (step func(), resp <-chan *http.Response) { var ( stepch = make(chan bool) From 55c8e3fc19375febbd8a44aa2d0098162b5c8f83 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 5 Jan 2018 17:23:49 +0000 Subject: [PATCH 26/40] Add error tests. --- transport/http/jsonrpc/error_test.go | 31 ++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 transport/http/jsonrpc/error_test.go diff --git a/transport/http/jsonrpc/error_test.go b/transport/http/jsonrpc/error_test.go new file mode 100644 index 000000000..efbec2c98 --- /dev/null +++ b/transport/http/jsonrpc/error_test.go @@ -0,0 +1,31 @@ +package jsonrpc + +import "testing" + +func TestErrorsSatisfyError(t *testing.T) { + errs := []interface{}{ + parseError("parseError"), + invalidRequestError("invalidRequestError"), + methodNotFoundError("methodNotFoundError"), + invalidParamsError("invalidParamsError"), + internalError("internalError"), + } + for _, e := range errs { + err, ok := e.(error) + if !ok { + t.Fatalf("Couldn't assert %s as error.", e) + } + errString := err.Error() + if errString == "" { + t.Fatal("Empty error string") + } + + ec, ok := e.(ErrorCoder) + if !ok { + t.Fatalf("Couldn't assert %s as ErrorCoder.", e) + } + if ErrorMessage(ec.ErrorCode()) == "" { + t.Fatalf("Error type %s returned code of %d, which does not map to error string", e, ec.ErrorCode()) + } + } +} From f90631bd5bb26f6b2e9f31c40da0f7a6639f755e Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 8 Jan 2018 09:13:52 +0000 Subject: [PATCH 27/40] Clarify ErrorCoder in comment. --- transport/http/jsonrpc/server.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index 88df7c2e3..cda85892e 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -184,8 +184,10 @@ func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { } // ErrorCoder is checked by DefaultErrorEncoder. If an error value implements -// ErrorCoder, the Error will be used when encoding the error. By default, -// InternalError (-32603) is used. +// ErrorCoder, the integer result of ErrorCode() will be used as the JSONRPC +// error code when encoding the error. +// +// By default, InternalError (-32603) is used. type ErrorCoder interface { ErrorCode() int } From bd64ffe9688e85b04389d5efa35a7a290864bb34 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 8 Jan 2018 09:20:53 +0000 Subject: [PATCH 28/40] Make endpoint consistent in README. --- transport/http/jsonrpc/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/transport/http/jsonrpc/README.md b/transport/http/jsonrpc/README.md index 6bbd64d6d..29b0ac939 100644 --- a/transport/http/jsonrpc/README.md +++ b/transport/http/jsonrpc/README.md @@ -80,7 +80,7 @@ Now that we have an EndpointCodec with decoder, endpoint, and encoder, we can wi Encode: encodeSumResponse, }, }) - http.Handle("/v1/rpc", handler) + http.Handle("/rpc", handler) http.ListenAndServe(":80", nil) With all of this done, our example request above should result in a response like this: @@ -89,4 +89,4 @@ With all of this done, our example request above should result in a response lik "jsonrpc": "2.0", "result": 4, "error": null - } \ No newline at end of file + } From f391b57ed1da853f7d5b6604d7b5d81cc69c2e36 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 8 Jan 2018 09:23:12 +0000 Subject: [PATCH 29/40] Gofmt handler example in README. --- transport/http/jsonrpc/README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/transport/http/jsonrpc/README.md b/transport/http/jsonrpc/README.md index 29b0ac939..d4140faff 100644 --- a/transport/http/jsonrpc/README.md +++ b/transport/http/jsonrpc/README.md @@ -74,11 +74,11 @@ The encoder takes the output of the endpoint, and builds the raw JSON message th Now that we have an EndpointCodec with decoder, endpoint, and encoder, we can wire up the server: handler := jsonrpc.NewServer(jsonrpc.EndpointCodecMap{ - "sum": jsonrpc.EndpointCodec{ - Endpoint: sumEndpoint, - Decode: decodeSumRequest, - Encode: encodeSumResponse, - }, + "sum": jsonrpc.EndpointCodec{ + Endpoint: sumEndpoint, + Decode: decodeSumRequest, + Encode: encodeSumResponse, + }, }) http.Handle("/rpc", handler) http.ListenAndServe(":80", nil) From 01f4ca85fe124e92948fff47099381c369ed9cf5 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 8 Jan 2018 09:56:35 +0000 Subject: [PATCH 30/40] Auto-increment client IDs. Allow for customisation. --- transport/http/jsonrpc/client.go | 42 +++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index f2f999704..e58031507 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -27,6 +27,7 @@ type Client struct { before []httptransport.RequestFunc after []httptransport.ClientResponseFunc finalizer httptransport.ClientFinalizerFunc + requestID requestIDGenerator bufferedStream bool } @@ -46,6 +47,7 @@ func NewClient( dec: dec, before: []httptransport.RequestFunc{}, after: []httptransport.ClientResponseFunc{}, + requestID: new(AutoIncrementRequestID), bufferedStream: false, } for _, option := range options { @@ -79,7 +81,19 @@ func ClientAfter(after ...httptransport.ClientResponseFunc) ClientOption { // ClientFinalizer is executed at the end of every HTTP request. // By default, no finalizer is registered. func ClientFinalizer(f httptransport.ClientFinalizerFunc) ClientOption { - return func(s *Client) { s.finalizer = f } + return func(c *Client) { c.finalizer = f } +} + +// requestIDGenerator returns an ID for the request. +type requestIDGenerator interface { + Generate() *RequestID +} + +// ClientRequestIDGenerator is executed before each request to generate an ID +// for the request. +// By default, AutoIncrementRequestID is used. +func ClientRequestIDGenerator(g requestIDGenerator) ClientOption { + return func(c *Client) { c.requestID = g } } // BufferedStream sets whether the Response.Body is left open, allowing it @@ -88,13 +102,6 @@ func BufferedStream(buffered bool) ClientOption { return func(c *Client) { c.bufferedStream = buffered } } -func (c Client) requestID() *RequestID { - return &RequestID{ - // TODO: Auto-increment. - intValue: 1, - } -} - // Endpoint returns a usable endpoint that invokes the remote endpoint. func (c Client) Endpoint() endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { @@ -123,7 +130,7 @@ func (c Client) Endpoint() endpoint.Endpoint { JSONRPC: "", Method: c.method, Params: params, - ID: c.requestID(), + ID: c.requestID.Generate(), } req, err := http.NewRequest("POST", c.tgt.String(), nil) @@ -171,6 +178,19 @@ func (c Client) Endpoint() endpoint.Endpoint { // request, after the response is returned. The principal // intended use is for error logging. Additional response parameters are // provided in the context under keys with the ContextKeyResponse prefix. -// Note: err may be nil. There maybe also no additional response parameters depending on -// when an error occurs. +// Note: err may be nil. There maybe also no additional response parameters +// depending on when an error occurs. type ClientFinalizerFunc func(ctx context.Context, err error) + +// AutoIncrementRequestID is a RequestIDGenerator that generates +// auto-incrementing integer IDs. +type AutoIncrementRequestID int + +// Generate satisfies RequestIDGenerator +func (i *AutoIncrementRequestID) Generate() *RequestID { + id := *i + *i++ + return &RequestID{ + intValue: int(id), + } +} From c1abdf656263ae986129b5cd6af6f8ccc1799a6a Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 2 Feb 2018 15:54:52 +0000 Subject: [PATCH 31/40] Add happy-path test for JSON RPC client. --- transport/http/jsonrpc/client.go | 21 +- transport/http/jsonrpc/client_test.go | 231 ++++++++++++++++++ .../jsonrpc/request_response_types_test.go | 1 - 3 files changed, 249 insertions(+), 4 deletions(-) create mode 100644 transport/http/jsonrpc/client_test.go diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index e58031507..f2cea6a4f 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -56,6 +56,21 @@ func NewClient( return c } +// DefaultRequestEncoder marshals the given request to JSON. +func DefaultRequestEncoder(_ context.Context, req interface{}) (json.RawMessage, error) { + return json.Marshal(req) +} + +// DefaultResponseDecoder unmarshals the given JSON to interface{}. +func DefaultResponseDecoder(_ context.Context, res json.RawMessage) (interface{}, error) { + var result interface{} + err := json.Unmarshal(res, &result) + if err != nil { + return nil, err + } + return result, nil +} + // ClientOption sets an optional parameter for clients. type ClientOption func(*Client) @@ -71,9 +86,9 @@ func ClientBefore(before ...httptransport.RequestFunc) ClientOption { return func(c *Client) { c.before = append(c.before, before...) } } -// ClientAfter sets the ClientResponseFuncs applied to the incoming HTTP -// request prior to it being decoded. This is useful for obtaining anything off -// of the response and adding onto the context prior to decoding. +// ClientAfter sets the ClientResponseFuncs applied to the server's HTTP +// response prior to it being decoded. This is useful for obtaining anything +// from the response and adding onto the context prior to decoding. func ClientAfter(after ...httptransport.ClientResponseFunc) ClientOption { return func(c *Client) { c.after = append(c.after, after...) } } diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go new file mode 100644 index 000000000..94bb17f05 --- /dev/null +++ b/transport/http/jsonrpc/client_test.go @@ -0,0 +1,231 @@ +package jsonrpc_test + +import ( + "context" + "encoding/json" + "io" + "net/http" + "net/http/httptest" + "net/url" + "testing" + + "github.com/go-kit/kit/transport/http/jsonrpc" +) + +type TestResponse struct { + Body io.ReadCloser + String string +} + +func TestCanCallBeforeFunc(t *testing.T) { + called := false + u, _ := url.Parse("http://senseye.io/jsonrpc") + sut := jsonrpc.NewClient( + u, + "add", + nopEncoder, + nopDecoder, + jsonrpc.ClientBefore(func(ctx context.Context, req *http.Request) context.Context { + called = true + return ctx + }), + ) + + sut.Endpoint()(context.TODO(), "foo") + + if !called { + t.Fatal("Expected client before func to be called. Wasn't.") + } +} + +//func TestCanCallAfterFunc(t *testing.T) { +//called := false +//u, _ := url.Parse("http://senseye.io/jsonrpc") +//sut := jsonrpc.NewClient( +//u, +//"add", +//nopEncoder, +//nopDecoder, +//jsonrpc.ClientAfter(func(ctx context.Context, req *http.Response) context.Context { +//called = true +//return ctx +//}), +//) + +//_, err := sut.Endpoint()(context.TODO(), "foo") +//if err != nil { +//t.Fatal(err) +//} + +//if !called { +//t.Fatal("Expected client after func to be called. Wasn't.") +//} +//} + +func TestClientHappyPath(t *testing.T) { + var ( + testbody = `{"jsonrpc":"2.0", "result":5}` + encode = func(_ context.Context, req interface{}) (json.RawMessage, error) { + return json.Marshal(req) + } + decode = func(ctx context.Context, res json.RawMessage) (interface{}, error) { + if ac := ctx.Value("afterCalled"); ac == nil { + t.Fatal("after not called") + } + var result int + err := json.Unmarshal(res, &result) + if err != nil { + return nil, err + } + return result, nil + } + afterFunc = func(ctx context.Context, r *http.Response) context.Context { + return context.WithValue(ctx, "afterCalled", true) + } + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte(testbody)) + })) + + sut := jsonrpc.NewClient( + mustParse(server.URL), + "add", + encode, + decode, + jsonrpc.ClientAfter(afterFunc), + ) + + result, err := sut.Endpoint()(context.Background(), struct{}{}) + if err != nil { + t.Fatal(err) + } + ri, ok := result.(int) + if !ok { + t.Fatalf("result is not int: (%T)%+v", result, result) + } + if ri != 5 { + t.Fatalf("want=5, got=%d", ri) + } +} + +//func TestClientFinalizer(t *testing.T) { +//var ( +//headerKey = "X-Henlo-Lizer" +//headerVal = "Helllo you stinky lizard" +//responseBody = "go eat a fly ugly\n" +//done = make(chan struct{}) +//encode = func(context.Context, *http.Request, interface{}) error { return nil } +//decode = func(_ context.Context, r *http.Response) (interface{}, error) { +//return TestResponse{r.Body, ""}, nil +//} +//) + +//server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +//w.Header().Set(headerKey, headerVal) +//w.Write([]byte(responseBody)) +//})) +//defer server.Close() + +//client := httptransport.NewClient( +//"GET", +//mustParse(server.URL), +//encode, +//decode, +//httptransport.ClientFinalizer(func(ctx context.Context, err error) { +//responseHeader := ctx.Value(httptransport.ContextKeyResponseHeaders).(http.Header) +//if want, have := headerVal, responseHeader.Get(headerKey); want != have { +//t.Errorf("%s: want %q, have %q", headerKey, want, have) +//} + +//responseSize := ctx.Value(httptransport.ContextKeyResponseSize).(int64) +//if want, have := int64(len(responseBody)), responseSize; want != have { +//t.Errorf("response size: want %d, have %d", want, have) +//} + +//close(done) +//}), +//) + +//_, err := client.Endpoint()(context.Background(), struct{}{}) +//if err != nil { +//t.Fatal(err) +//} + +//select { +//case <-done: +//case <-time.After(time.Second): +//t.Fatal("timeout waiting for finalizer") +//} +//} + +//func TestEncodeJSONRequest(t *testing.T) { +//var header http.Header +//var body string + +//server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +//b, err := ioutil.ReadAll(r.Body) +//if err != nil && err != io.EOF { +//t.Fatal(err) +//} +//header = r.Header +//body = string(b) +//})) + +//defer server.Close() + +//serverURL, err := url.Parse(server.URL) + +//if err != nil { +//t.Fatal(err) +//} + +//client := httptransport.NewClient( +//"POST", +//serverURL, +//httptransport.EncodeJSONRequest, +//func(context.Context, *http.Response) (interface{}, error) { return nil, nil }, +//).Endpoint() + +//for _, test := range []struct { +//value interface{} +//body string +//}{ +//{nil, "null\n"}, +//{12, "12\n"}, +//{1.2, "1.2\n"}, +//{true, "true\n"}, +//{"test", "\"test\"\n"}, +//{enhancedRequest{Foo: "foo"}, "{\"foo\":\"foo\"}\n"}, +//} { +//if _, err := client(context.Background(), test.value); err != nil { +//t.Error(err) +//continue +//} + +//if body != test.body { +//t.Errorf("%v: actual %#v, expected %#v", test.value, body, test.body) +//} +//} + +//if _, err := client(context.Background(), enhancedRequest{Foo: "foo"}); err != nil { +//t.Fatal(err) +//} + +//if _, ok := header["X-Edward"]; !ok { +//t.Fatalf("X-Edward value: actual %v, expected %v", nil, []string{"Snowden"}) +//} + +//if v := header.Get("X-Edward"); v != "Snowden" { +//t.Errorf("X-Edward string: actual %v, expected %v", v, "Snowden") +//} +//} + +func mustParse(s string) *url.URL { + u, err := url.Parse(s) + if err != nil { + panic(err) + } + return u +} diff --git a/transport/http/jsonrpc/request_response_types_test.go b/transport/http/jsonrpc/request_response_types_test.go index d39f5e957..7fc4b1671 100644 --- a/transport/http/jsonrpc/request_response_types_test.go +++ b/transport/http/jsonrpc/request_response_types_test.go @@ -25,7 +25,6 @@ func TestCanUnMarshalID(t *testing.T) { var foo interface{} _ = json.Unmarshal([]byte(JSON), &foo) - fmt.Printf("foo = %t %+v\n", foo, foo) err := json.Unmarshal([]byte(JSON), &r) if err != nil { From b6532754bc233bc723b20b252c49115d92ba7d9d Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 2 Feb 2018 17:12:09 +0000 Subject: [PATCH 32/40] Provide default encoder/decoder in JSON RPC client. --- examples/addsvc/pkg/addtransport/jsonrpc.go | 8 +- transport/http/client.go | 4 +- transport/http/jsonrpc/client.go | 18 +- transport/http/jsonrpc/client_test.go | 272 +++++++++----------- 4 files changed, 146 insertions(+), 156 deletions(-) diff --git a/examples/addsvc/pkg/addtransport/jsonrpc.go b/examples/addsvc/pkg/addtransport/jsonrpc.go index 2fc1b8b55..9508e81e1 100644 --- a/examples/addsvc/pkg/addtransport/jsonrpc.go +++ b/examples/addsvc/pkg/addtransport/jsonrpc.go @@ -57,8 +57,8 @@ func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log. sumEndpoint = jsonrpc.NewClient( u, "sum", - encodeSumRequest, - decodeSumResponse, + jsonrpc.ClientRequestEncoder(encodeSumRequest), + jsonrpc.ClientResponseDecoder(decodeSumResponse), ).Endpoint() sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) sumEndpoint = limiter(sumEndpoint) @@ -73,8 +73,8 @@ func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log. concatEndpoint = jsonrpc.NewClient( u, "concat", - encodeConcatRequest, - decodeConcatResponse, + jsonrpc.ClientRequestEncoder(encodeConcatRequest), + jsonrpc.ClientResponseDecoder(decodeConcatResponse), ).Endpoint() concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) concatEndpoint = limiter(concatEndpoint) diff --git a/transport/http/client.go b/transport/http/client.go index f1ca9c3a4..25c078a58 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -143,8 +143,8 @@ func (c Client) Endpoint() endpoint.Endpoint { // request, after the response is returned. The principal // intended use is for error logging. Additional response parameters are // provided in the context under keys with the ContextKeyResponse prefix. -// Note: err may be nil. There maybe also no additional response parameters depending on -// when an error occurs. +// Note: err may be nil. There maybe also no additional response parameters +// depending on when an error occurs. type ClientFinalizerFunc func(ctx context.Context, err error) // EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index f2cea6a4f..7c4873f9f 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -35,16 +35,14 @@ type Client struct { func NewClient( tgt *url.URL, method string, - enc EncodeRequestFunc, - dec DecodeResponseFunc, options ...ClientOption, ) *Client { c := &Client{ client: http.DefaultClient, method: method, tgt: tgt, - enc: enc, - dec: dec, + enc: DefaultRequestEncoder, + dec: DefaultResponseDecoder, before: []httptransport.RequestFunc{}, after: []httptransport.ClientResponseFunc{}, requestID: new(AutoIncrementRequestID), @@ -99,6 +97,18 @@ func ClientFinalizer(f httptransport.ClientFinalizerFunc) ClientOption { return func(c *Client) { c.finalizer = f } } +// ClientRequestEncoder sets the func used to encode the request params to JSON. +// If not set, DefaultRequestEncoder is used. +func ClientRequestEncoder(enc EncodeRequestFunc) ClientOption { + return func(c *Client) { c.enc = enc } +} + +// ClientResponseEncoder sets the func used to decode the response params to JSON. +// If not set, DefaultResponseDecoder is used. +func ClientResponseDecoder(dec DecodeResponseFunc) ClientOption { + return func(c *Client) { c.dec = dec } +} + // requestIDGenerator returns an ID for the request. type requestIDGenerator interface { Generate() *RequestID diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index 94bb17f05..d5b195d85 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "io" + "io/ioutil" "net/http" "net/http/httptest" "net/url" @@ -23,8 +24,6 @@ func TestCanCallBeforeFunc(t *testing.T) { sut := jsonrpc.NewClient( u, "add", - nopEncoder, - nopDecoder, jsonrpc.ClientBefore(func(ctx context.Context, req *http.Request) context.Context { called = true return ctx @@ -38,38 +37,25 @@ func TestCanCallBeforeFunc(t *testing.T) { } } -//func TestCanCallAfterFunc(t *testing.T) { -//called := false -//u, _ := url.Parse("http://senseye.io/jsonrpc") -//sut := jsonrpc.NewClient( -//u, -//"add", -//nopEncoder, -//nopDecoder, -//jsonrpc.ClientAfter(func(ctx context.Context, req *http.Response) context.Context { -//called = true -//return ctx -//}), -//) - -//_, err := sut.Endpoint()(context.TODO(), "foo") -//if err != nil { -//t.Fatal(err) -//} - -//if !called { -//t.Fatal("Expected client after func to be called. Wasn't.") -//} -//} - func TestClientHappyPath(t *testing.T) { var ( - testbody = `{"jsonrpc":"2.0", "result":5}` - encode = func(_ context.Context, req interface{}) (json.RawMessage, error) { + afterCalledKey = "AC" + beforeHeaderKey = "BF" + beforeHeaderValue = "beforeFuncWozEre" + testbody = `{"jsonrpc":"2.0", "result":5}` + requestBody []byte + beforeFunc = func(ctx context.Context, r *http.Request) context.Context { + r.Header.Add(beforeHeaderKey, beforeHeaderValue) + return ctx + } + encode = func(ctx context.Context, req interface{}) (json.RawMessage, error) { return json.Marshal(req) } + afterFunc = func(ctx context.Context, r *http.Response) context.Context { + return context.WithValue(ctx, afterCalledKey, true) + } decode = func(ctx context.Context, res json.RawMessage) (interface{}, error) { - if ac := ctx.Value("afterCalled"); ac == nil { + if ac := ctx.Value(afterCalledKey); ac == nil { t.Fatal("after not called") } var result int @@ -79,12 +65,19 @@ func TestClientHappyPath(t *testing.T) { } return result, nil } - afterFunc = func(ctx context.Context, r *http.Response) context.Context { - return context.WithValue(ctx, "afterCalled", true) - } ) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Header.Get(beforeHeaderKey) != beforeHeaderValue { + t.Fatal("Header not set by before func.") + } + + b, err := ioutil.ReadAll(r.Body) + if err != nil && err != io.EOF { + t.Fatal(err) + } + requestBody = b + w.WriteHeader(http.StatusOK) w.Write([]byte(testbody)) })) @@ -92,12 +85,20 @@ func TestClientHappyPath(t *testing.T) { sut := jsonrpc.NewClient( mustParse(server.URL), "add", - encode, - decode, + jsonrpc.ClientRequestEncoder(encode), + jsonrpc.ClientResponseDecoder(decode), + jsonrpc.ClientBefore(beforeFunc), jsonrpc.ClientAfter(afterFunc), ) - result, err := sut.Endpoint()(context.Background(), struct{}{}) + type addRequest struct { + A int + B int + } + + in := addRequest{2, 2} + + result, err := sut.Endpoint()(context.Background(), in) if err != nil { t.Fatal(err) } @@ -108,119 +109,98 @@ func TestClientHappyPath(t *testing.T) { if ri != 5 { t.Fatalf("want=5, got=%d", ri) } + + var requestAtServer jsonrpc.Request + err = json.Unmarshal(requestBody, &requestAtServer) + if err != nil { + t.Fatal(err) + } + if id, _ := requestAtServer.ID.Int(); id != 0 { + t.Fatalf("Request ID at server: want=0, got=%d", requestAtServer.ID) + } + + var paramsAtServer addRequest + err = json.Unmarshal(requestAtServer.Params, ¶msAtServer) + if err != nil { + t.Fatal(err) + } + + if paramsAtServer != in { + t.Fatalf("want=%+v, got=%+v", in, paramsAtServer) + } } -//func TestClientFinalizer(t *testing.T) { -//var ( -//headerKey = "X-Henlo-Lizer" -//headerVal = "Helllo you stinky lizard" -//responseBody = "go eat a fly ugly\n" -//done = make(chan struct{}) -//encode = func(context.Context, *http.Request, interface{}) error { return nil } -//decode = func(_ context.Context, r *http.Response) (interface{}, error) { -//return TestResponse{r.Body, ""}, nil -//} -//) - -//server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -//w.Header().Set(headerKey, headerVal) -//w.Write([]byte(responseBody)) -//})) -//defer server.Close() - -//client := httptransport.NewClient( -//"GET", -//mustParse(server.URL), -//encode, -//decode, -//httptransport.ClientFinalizer(func(ctx context.Context, err error) { -//responseHeader := ctx.Value(httptransport.ContextKeyResponseHeaders).(http.Header) -//if want, have := headerVal, responseHeader.Get(headerKey); want != have { -//t.Errorf("%s: want %q, have %q", headerKey, want, have) -//} - -//responseSize := ctx.Value(httptransport.ContextKeyResponseSize).(int64) -//if want, have := int64(len(responseBody)), responseSize; want != have { -//t.Errorf("response size: want %d, have %d", want, have) -//} - -//close(done) -//}), -//) - -//_, err := client.Endpoint()(context.Background(), struct{}{}) -//if err != nil { -//t.Fatal(err) -//} - -//select { -//case <-done: -//case <-time.After(time.Second): -//t.Fatal("timeout waiting for finalizer") -//} -//} - -//func TestEncodeJSONRequest(t *testing.T) { -//var header http.Header -//var body string - -//server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { -//b, err := ioutil.ReadAll(r.Body) -//if err != nil && err != io.EOF { -//t.Fatal(err) -//} -//header = r.Header -//body = string(b) -//})) - -//defer server.Close() - -//serverURL, err := url.Parse(server.URL) - -//if err != nil { -//t.Fatal(err) -//} - -//client := httptransport.NewClient( -//"POST", -//serverURL, -//httptransport.EncodeJSONRequest, -//func(context.Context, *http.Response) (interface{}, error) { return nil, nil }, -//).Endpoint() - -//for _, test := range []struct { -//value interface{} -//body string -//}{ -//{nil, "null\n"}, -//{12, "12\n"}, -//{1.2, "1.2\n"}, -//{true, "true\n"}, -//{"test", "\"test\"\n"}, -//{enhancedRequest{Foo: "foo"}, "{\"foo\":\"foo\"}\n"}, -//} { -//if _, err := client(context.Background(), test.value); err != nil { -//t.Error(err) -//continue -//} - -//if body != test.body { -//t.Errorf("%v: actual %#v, expected %#v", test.value, body, test.body) -//} -//} - -//if _, err := client(context.Background(), enhancedRequest{Foo: "foo"}); err != nil { -//t.Fatal(err) -//} - -//if _, ok := header["X-Edward"]; !ok { -//t.Fatalf("X-Edward value: actual %v, expected %v", nil, []string{"Snowden"}) -//} - -//if v := header.Get("X-Edward"); v != "Snowden" { -//t.Errorf("X-Edward string: actual %v, expected %v", v, "Snowden") -//} -//} +func TestCanUseDefaults(t *testing.T) { + var ( + testbody = `{"jsonrpc":"2.0", "result":"boogaloo"}` + requestBody []byte + ) + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + b, err := ioutil.ReadAll(r.Body) + if err != nil && err != io.EOF { + t.Fatal(err) + } + requestBody = b + + w.WriteHeader(http.StatusOK) + w.Write([]byte(testbody)) + })) + + sut := jsonrpc.NewClient( + mustParse(server.URL), + "add", + ) + + type addRequest struct { + A int + B int + } + + in := addRequest{2, 2} + + result, err := sut.Endpoint()(context.Background(), in) + if err != nil { + t.Fatal(err) + } + rs, ok := result.(string) + if !ok { + t.Fatalf("result is not string: (%T)%+v", result, result) + } + if rs != "boogaloo" { + t.Fatalf("want=boogaloo, got=%d", rs) + } + + var requestAtServer jsonrpc.Request + err = json.Unmarshal(requestBody, &requestAtServer) + if err != nil { + t.Fatal(err) + } + var paramsAtServer addRequest + err = json.Unmarshal(requestAtServer.Params, ¶msAtServer) + if err != nil { + t.Fatal(err) + } + + if paramsAtServer != in { + t.Fatalf("want=%+v, got=%+v", in, paramsAtServer) + } +} + +func TestDefaultAutoIncrementer(t *testing.T) { + sut := new(jsonrpc.AutoIncrementRequestID) + for want := 0; want < 100; want++ { + id := sut.Generate() + + got, err := id.Int() + if err != nil { + t.Fatal(err) + } + if got != want { + t.Fatalf("want=%d, got=%d", want, got) + } + } +} func mustParse(s string) *url.URL { u, err := url.Parse(s) From 75f0455685a271440f7e88547b636d51eee79c12 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 14:14:36 +0000 Subject: [PATCH 33/40] Fix comment line. --- transport/http/jsonrpc/server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index cda85892e..cccd4404f 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -42,8 +42,8 @@ func NewServer( type ServerOption func(*Server) // ServerBefore functions are executed on the HTTP request object before the +// request is decoded. func ServerBefore(before ...httptransport.RequestFunc) ServerOption { - // request is decoded. return func(s *Server) { s.before = append(s.before, before...) } } From 6de79e1f8d1a5b6b65ca409816a75d85397368be Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 14:46:10 +0000 Subject: [PATCH 34/40] RequestIDGenerator tidy-up. Make auto-incrementing IDs goroutine safe. Make RequestIDGenerator interface public. --- transport/http/jsonrpc/client.go | 34 +++++++++++++++++---------- transport/http/jsonrpc/client_test.go | 2 +- 2 files changed, 23 insertions(+), 13 deletions(-) diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index 7c4873f9f..10e2a2769 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "net/http" "net/url" + "sync/atomic" "github.com/go-kit/kit/endpoint" httptransport "github.com/go-kit/kit/transport/http" @@ -27,7 +28,7 @@ type Client struct { before []httptransport.RequestFunc after []httptransport.ClientResponseFunc finalizer httptransport.ClientFinalizerFunc - requestID requestIDGenerator + requestID RequestIDGenerator bufferedStream bool } @@ -45,7 +46,7 @@ func NewClient( dec: DefaultResponseDecoder, before: []httptransport.RequestFunc{}, after: []httptransport.ClientResponseFunc{}, - requestID: new(AutoIncrementRequestID), + requestID: NewAutoIncrementID(0), bufferedStream: false, } for _, option := range options { @@ -103,21 +104,21 @@ func ClientRequestEncoder(enc EncodeRequestFunc) ClientOption { return func(c *Client) { c.enc = enc } } -// ClientResponseEncoder sets the func used to decode the response params to JSON. -// If not set, DefaultResponseDecoder is used. +// ClientResponseDecoder sets the func used to decode the response params from +// JSON. If not set, DefaultResponseDecoder is used. func ClientResponseDecoder(dec DecodeResponseFunc) ClientOption { return func(c *Client) { c.dec = dec } } -// requestIDGenerator returns an ID for the request. -type requestIDGenerator interface { +// RequestIDGenerator returns an ID for the request. +type RequestIDGenerator interface { Generate() *RequestID } // ClientRequestIDGenerator is executed before each request to generate an ID // for the request. // By default, AutoIncrementRequestID is used. -func ClientRequestIDGenerator(g requestIDGenerator) ClientOption { +func ClientRequestIDGenerator(g RequestIDGenerator) ClientOption { return func(c *Client) { c.requestID = g } } @@ -207,14 +208,23 @@ func (c Client) Endpoint() endpoint.Endpoint { // depending on when an error occurs. type ClientFinalizerFunc func(ctx context.Context, err error) -// AutoIncrementRequestID is a RequestIDGenerator that generates +// autoIncrementID is a RequestIDGenerator that generates // auto-incrementing integer IDs. -type AutoIncrementRequestID int +type autoIncrementID struct { + v *int32 +} + +// NewAutoIncrementID returns an auto-incrementing request ID generator, +// initialised with the given value. +func NewAutoIncrementID(init int32) RequestIDGenerator { + // Offset by one so that the first generated value = init. + v := init - 1 + return &autoIncrementID{v: &v} +} // Generate satisfies RequestIDGenerator -func (i *AutoIncrementRequestID) Generate() *RequestID { - id := *i - *i++ +func (i *autoIncrementID) Generate() *RequestID { + id := atomic.AddInt32(i.v, 1) return &RequestID{ intValue: int(id), } diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index d5b195d85..314de2b54 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -188,7 +188,7 @@ func TestCanUseDefaults(t *testing.T) { } func TestDefaultAutoIncrementer(t *testing.T) { - sut := new(jsonrpc.AutoIncrementRequestID) + sut := jsonrpc.NewAutoIncrementID(0) for want := 0; want < 100; want++ { id := sut.Generate() From af7f7b5c075fed8fb6deb5319e6f169586b412ca Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 15:31:23 +0000 Subject: [PATCH 35/40] Fix client ID creation. The client had been using the RequestID type in requests. Making this serialize in a deterministic and predictable way was going to be fiddly, so I decided to allow interface{} for IDs, client-side. --- transport/http/jsonrpc/client.go | 23 ++++++++++++++--------- transport/http/jsonrpc/client_test.go | 12 ++++-------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/transport/http/jsonrpc/client.go b/transport/http/jsonrpc/client.go index 10e2a2769..ca57bbf1d 100644 --- a/transport/http/jsonrpc/client.go +++ b/transport/http/jsonrpc/client.go @@ -32,6 +32,13 @@ type Client struct { bufferedStream bool } +type clientRequest struct { + JSONRPC string `json:"jsonrpc"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` + ID interface{} `json:"id"` +} + // NewClient constructs a usable Client for a single remote method. func NewClient( tgt *url.URL, @@ -112,7 +119,7 @@ func ClientResponseDecoder(dec DecodeResponseFunc) ClientOption { // RequestIDGenerator returns an ID for the request. type RequestIDGenerator interface { - Generate() *RequestID + Generate() interface{} } // ClientRequestIDGenerator is executed before each request to generate an ID @@ -152,7 +159,7 @@ func (c Client) Endpoint() endpoint.Endpoint { if params, err = c.enc(ctx, request); err != nil { return nil, err } - rpcReq := Request{ + rpcReq := clientRequest{ JSONRPC: "", Method: c.method, Params: params, @@ -211,21 +218,19 @@ type ClientFinalizerFunc func(ctx context.Context, err error) // autoIncrementID is a RequestIDGenerator that generates // auto-incrementing integer IDs. type autoIncrementID struct { - v *int32 + v *uint64 } // NewAutoIncrementID returns an auto-incrementing request ID generator, // initialised with the given value. -func NewAutoIncrementID(init int32) RequestIDGenerator { +func NewAutoIncrementID(init uint64) RequestIDGenerator { // Offset by one so that the first generated value = init. v := init - 1 return &autoIncrementID{v: &v} } // Generate satisfies RequestIDGenerator -func (i *autoIncrementID) Generate() *RequestID { - id := atomic.AddInt32(i.v, 1) - return &RequestID{ - intValue: int(id), - } +func (i *autoIncrementID) Generate() interface{} { + id := atomic.AddUint64(i.v, 1) + return id } diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index 314de2b54..8c5eb7510 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -116,7 +116,7 @@ func TestClientHappyPath(t *testing.T) { t.Fatal(err) } if id, _ := requestAtServer.ID.Int(); id != 0 { - t.Fatalf("Request ID at server: want=0, got=%d", requestAtServer.ID) + t.Fatalf("Request ID at server: want=0, got=%d", id) } var paramsAtServer addRequest @@ -189,13 +189,9 @@ func TestCanUseDefaults(t *testing.T) { func TestDefaultAutoIncrementer(t *testing.T) { sut := jsonrpc.NewAutoIncrementID(0) - for want := 0; want < 100; want++ { - id := sut.Generate() - - got, err := id.Int() - if err != nil { - t.Fatal(err) - } + var want uint64 + for ; want < 100; want++ { + got := sut.Generate() if got != want { t.Fatalf("want=%d, got=%d", want, got) } From 033ad9cde293b93da97d157a6c70126f27e988fd Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 15:47:41 +0000 Subject: [PATCH 36/40] Test client request ID more effectively. --- transport/http/jsonrpc/client_test.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index 8c5eb7510..2df934b76 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -37,6 +37,10 @@ func TestCanCallBeforeFunc(t *testing.T) { } } +type staticIDGenerator int + +func (g staticIDGenerator) Generate() interface{} { return g } + func TestClientHappyPath(t *testing.T) { var ( afterCalledKey = "AC" @@ -65,6 +69,9 @@ func TestClientHappyPath(t *testing.T) { } return result, nil } + + wantID = 666 + gen = staticIDGenerator(wantID) ) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -89,6 +96,7 @@ func TestClientHappyPath(t *testing.T) { jsonrpc.ClientResponseDecoder(decode), jsonrpc.ClientBefore(beforeFunc), jsonrpc.ClientAfter(afterFunc), + jsonrpc.ClientRequestIDGenerator(gen), ) type addRequest struct { @@ -115,8 +123,8 @@ func TestClientHappyPath(t *testing.T) { if err != nil { t.Fatal(err) } - if id, _ := requestAtServer.ID.Int(); id != 0 { - t.Fatalf("Request ID at server: want=0, got=%d", id) + if id, _ := requestAtServer.ID.Int(); id != wantID { + t.Fatalf("Request ID at server: want=%d, got=%d", wantID, id) } var paramsAtServer addRequest From 422cffae7cbb0fd1d00aee185f99fd19546668c7 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 16:06:58 +0000 Subject: [PATCH 37/40] Cover client options in test. --- transport/http/jsonrpc/client_test.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index 2df934b76..98feee658 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -58,6 +58,10 @@ func TestClientHappyPath(t *testing.T) { afterFunc = func(ctx context.Context, r *http.Response) context.Context { return context.WithValue(ctx, afterCalledKey, true) } + finalizerCalled = false + fin = func(ctx context.Context, err error) { + finalizerCalled = true + } decode = func(ctx context.Context, res json.RawMessage) (interface{}, error) { if ac := ctx.Value(afterCalledKey); ac == nil { t.Fatal("after not called") @@ -97,6 +101,9 @@ func TestClientHappyPath(t *testing.T) { jsonrpc.ClientBefore(beforeFunc), jsonrpc.ClientAfter(afterFunc), jsonrpc.ClientRequestIDGenerator(gen), + jsonrpc.ClientFinalizer(fin), + jsonrpc.SetClient(http.DefaultClient), + jsonrpc.BufferedStream(false), ) type addRequest struct { @@ -136,6 +143,10 @@ func TestClientHappyPath(t *testing.T) { if paramsAtServer != in { t.Fatalf("want=%+v, got=%+v", in, paramsAtServer) } + + if !finalizerCalled { + t.Fatal("Expected finalizer to be called. Wasn't.") + } } func TestCanUseDefaults(t *testing.T) { From 7abfb238c1dedf7e4f8c9ac3e0d92a03f979536e Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Fri, 16 Feb 2018 16:29:37 +0000 Subject: [PATCH 38/40] Improve error test coverage. --- transport/http/jsonrpc/error_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/transport/http/jsonrpc/error_test.go b/transport/http/jsonrpc/error_test.go index efbec2c98..02f8bcf29 100644 --- a/transport/http/jsonrpc/error_test.go +++ b/transport/http/jsonrpc/error_test.go @@ -2,6 +2,29 @@ package jsonrpc import "testing" +func TestError(t *testing.T) { + wantCode := ParseError + sut := Error{ + Code: wantCode, + } + + gotCode := sut.ErrorCode() + if gotCode != wantCode { + t.Fatalf("want=%d, got=%d", gotCode, wantCode) + } + + if sut.Error() == "" { + t.Fatal("Empty error string.") + } + + want := "override" + sut.Message = want + got := sut.Error() + if sut.Error() != want { + t.Fatalf("overridden error message: want=%s, got=%s", want, got) + } + +} func TestErrorsSatisfyError(t *testing.T) { errs := []interface{}{ parseError("parseError"), From 1aecd5e1748837d37439148846ecdfa079c6a1f7 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 19 Feb 2018 21:25:44 +0000 Subject: [PATCH 39/40] Fix format spec in test output. --- transport/http/jsonrpc/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http/jsonrpc/client_test.go b/transport/http/jsonrpc/client_test.go index 98feee658..fe42f239d 100644 --- a/transport/http/jsonrpc/client_test.go +++ b/transport/http/jsonrpc/client_test.go @@ -187,7 +187,7 @@ func TestCanUseDefaults(t *testing.T) { t.Fatalf("result is not string: (%T)%+v", result, result) } if rs != "boogaloo" { - t.Fatalf("want=boogaloo, got=%d", rs) + t.Fatalf("want=boogaloo, got=%s", rs) } var requestAtServer jsonrpc.Request From 124b43050760e6fa9a705cf59e84b2b8753c72d9 Mon Sep 17 00:00:00 2001 From: Ross McFarlane Date: Mon, 19 Feb 2018 21:34:22 +0000 Subject: [PATCH 40/40] Tweaks to satisfy the linter. --- transport/http/jsonrpc/request_response_types_test.go | 3 +++ transport/http/jsonrpc/server.go | 7 +++---- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/transport/http/jsonrpc/request_response_types_test.go b/transport/http/jsonrpc/request_response_types_test.go index 7fc4b1671..4f4abf3d6 100644 --- a/transport/http/jsonrpc/request_response_types_test.go +++ b/transport/http/jsonrpc/request_response_types_test.go @@ -46,6 +46,9 @@ func TestCanUnMarshalID(t *testing.T) { // Allow an int ID to be interpreted as a float. wantf := float32(c.expValue.(int)) gotf, err := id.Float32() + if err != nil { + t.Fatal(err) + } if gotf != wantf { t.Fatalf("'%s' Int value as Float32(): want %f, got %f.", c.JSON, wantf, gotf) } diff --git a/transport/http/jsonrpc/server.go b/transport/http/jsonrpc/server.go index cccd4404f..1b49fe7b5 100644 --- a/transport/http/jsonrpc/server.go +++ b/transport/http/jsonrpc/server.go @@ -13,7 +13,6 @@ import ( // Server wraps an endpoint and implements http.Handler. type Server struct { - ctx context.Context ecm EndpointCodecMap before []httptransport.RequestFunc after []httptransport.ServerResponseFunc @@ -81,7 +80,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.Header().Set("Content-Type", "text/plain; charset=utf-8") w.WriteHeader(http.StatusMethodNotAllowed) - io.WriteString(w, "405 must POST\n") + _, _ = io.WriteString(w, "405 must POST\n") return } ctx := r.Context() @@ -151,7 +150,7 @@ func (s Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { res.Result = resParams w.Header().Set("Content-Type", ContentType) - json.NewEncoder(w).Encode(res) + _ = json.NewEncoder(w).Encode(res) } // DefaultErrorEncoder writes the error to the ResponseWriter, @@ -177,7 +176,7 @@ func DefaultErrorEncoder(_ context.Context, err error, w http.ResponseWriter) { } w.WriteHeader(http.StatusOK) - json.NewEncoder(w).Encode(Response{ + _ = json.NewEncoder(w).Encode(Response{ JSONRPC: Version, Error: &e, })