Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
673600c
first pass at JSON RPC HTTP transport
blainsmith Feb 7, 2017
53b2faa
example implementation of JSON RPC over HTTP
blainsmith Feb 7, 2017
7cfc4f1
Add ID type for JSON RPC Request, with tests.
rossmcf Jul 13, 2017
ed2fbcf
Add basic server testing for JSON RPC.
rossmcf Jul 14, 2017
4adaf39
Handle unregistered JSON RPC methods.
rossmcf Jul 14, 2017
ea68d6a
Package tidy-up.
rossmcf Jul 14, 2017
111107e
Test ServerBefore / ServerAfter for JSON RPC.
rossmcf Jul 14, 2017
0090caa
More JSON RPC tests.
rossmcf Jul 17, 2017
40485ea
Remove JSON RPC from addsvc example, pending full JSON RPC example.
rossmcf Jul 17, 2017
08a25d8
Remove JSON RPC from addsvc example, pending full JSON RPC example.
rossmcf Jul 17, 2017
0588817
Remove context field from jsonrpc.Server.
rossmcf Jul 17, 2017
5be17f5
Add JSON content type to all JSON RPC responses.
rossmcf Jul 20, 2017
bcdbd54
Add JSON content type to all JSON RPC responses.
rossmcf Jul 20, 2017
526b570
Remove client-side JSON RPC funcs for now.
rossmcf Aug 3, 2017
773fed5
Document interceptingWriter
rossmcf Aug 3, 2017
13719b2
Add JSON RPC doc.go.
rossmcf Aug 3, 2017
1f0ab0b
Merge branch 'master' into json-rpc-over-http
rossmcf Aug 3, 2017
dc16302
Add README for JSON RPC.
rossmcf Aug 3, 2017
1d5e97d
Merge branch 'master' into json-rpc-over-http
rossmcf Sep 8, 2017
f0cd734
Wire in JSON RPC addsvc.
rossmcf Oct 13, 2017
18442f2
Add JSON RPC to Addsvc CLI.
rossmcf Oct 16, 2017
9e5d7ee
Set JSONRPC version in responses.
rossmcf Oct 17, 2017
e86e36e
Add JSON RPC client to addcli example.
rossmcf Dec 15, 2017
1bcf828
Wire in client middlewares for JSON RPC addsvc example.
rossmcf Dec 15, 2017
b7d5012
Merge branch 'master' of https://github.com/go-kit/kit into json-rpc-…
rossmcf Dec 15, 2017
9b31488
Fix rate limiter dependency.
rossmcf Dec 15, 2017
7d4a753
Add concat JSON RPC method.
rossmcf Jan 5, 2018
d8a8b11
Improve JSON RPC server test coverage.
rossmcf Jan 5, 2018
138e952
Merge branch 'master' of https://github.com/go-kit/kit into json-rpc-…
rossmcf Jan 5, 2018
55c8e3f
Add error tests.
rossmcf Jan 5, 2018
f90631b
Clarify ErrorCoder in comment.
rossmcf Jan 8, 2018
bd64ffe
Make endpoint consistent in README.
rossmcf Jan 8, 2018
f391b57
Gofmt handler example in README.
rossmcf Jan 8, 2018
01f4ca8
Auto-increment client IDs. Allow for customisation.
rossmcf Jan 8, 2018
c1abdf6
Add happy-path test for JSON RPC client.
rossmcf Feb 2, 2018
b653275
Provide default encoder/decoder in JSON RPC client.
rossmcf Feb 2, 2018
75f0455
Fix comment line.
rossmcf Feb 16, 2018
6de79e1
RequestIDGenerator tidy-up.
rossmcf Feb 16, 2018
af7f7b5
Fix client ID creation.
rossmcf Feb 16, 2018
033ad9c
Test client request ID more effectively.
rossmcf Feb 16, 2018
422cffa
Cover client options in test.
rossmcf Feb 16, 2018
7abfb23
Improve error test coverage.
rossmcf Feb 16, 2018
131ff45
Merge branch 'master' of https://github.com/go-kit/kit into json-rpc-…
rossmcf Feb 16, 2018
1aecd5e
Fix format spec in test output.
rossmcf Feb 19, 2018
124b430
Tweaks to satisfy the linter.
rossmcf Feb 19, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/addsvc/cmd/addcli/addcli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func main() {
httpAddr = fs.String("http-addr", "", "HTTP address of addsvc")
grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc")
thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc")
jsonRPCAddr = fs.String("jsonrpc-addr", "", "JSON RPC address of addsvc")
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
Expand Down Expand Up @@ -102,6 +103,8 @@ func main() {
}
defer conn.Close()
svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger())
} else if *jsonRPCAddr != "" {
svc, err = addtransport.NewJSONRPCClient(*jsonRPCAddr, tracer, log.NewNopLogger())
} else if *thriftAddr != "" {
// It's necessary to do all of this construction in the func main,
// because (among other reasons) we need to control the lifecycle of the
Expand Down
25 changes: 20 additions & 5 deletions examples/addsvc/cmd/addsvc/addsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func main() {
httpAddr = fs.String("http-addr", ":8081", "HTTP listen address")
grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address")
thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address")
jsonRPCAddr = fs.String("jsonrpc-addr", ":8084", "JSON RPC listen address")
thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson")
thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered")
thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing")
Expand Down Expand Up @@ -135,11 +136,12 @@ func main() {
// the interfaces that the transports expect. Note that we're not binding
// them to ports or anything yet; we'll do that next.
var (
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
service = addservice.New(logger, ints, chars)
endpoints = addendpoint.New(service, logger, duration, tracer)
httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger)
grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger)
thriftServer = addtransport.NewThriftServer(endpoints)
jsonrpcHandler = addtransport.NewJSONRPCHandler(endpoints, logger)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would like to see tracing added to this transport too in the addsvc example just like HTTP and gRPC.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll need to ponder how best to do this. As it stands, ServerBefore functions are called prior to decoding the request body. This will mean that we can only trace a request to the JSON RPC endpoint; we won't know which method was called, which would hide some useful detail.

I could move the ServerBefore calls to after decoding the JSON, but I'm not sure if/how this would affect other ServerBefore uses, as I'm not really familiar with the use cases for those.

I suppose the other option would be to introduce a tracing-specific option, but that feels a bit icky. Suggestions welcome!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not needed, I know how to plug into existing framework.. I'll add tracing independent from this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great. Thank you.

)

// Now we're to the part of the func main where we want to start actually
Expand Down Expand Up @@ -244,6 +246,19 @@ func main() {
thriftSocket.Close()
})
}
{
httpListener, err := net.Listen("tcp", *jsonRPCAddr)
if err != nil {
logger.Log("transport", "JSONRPC over HTTP", "during", "Listen", "err", err)
os.Exit(1)
}
g.Add(func() error {
logger.Log("transport", "JSONRPC over HTTP", "addr", *jsonRPCAddr)
return http.Serve(httpListener, jsonrpcHandler)
}, func(error) {
httpListener.Close()
})
}
{
// This function just sits and waits for ctrl-C.
cancelInterrupt := make(chan struct{})
Expand Down
207 changes: 207 additions & 0 deletions examples/addsvc/pkg/addtransport/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package addtransport

import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"

"golang.org/x/time/rate"

"github.com/go-kit/kit/circuitbreaker"
"github.com/go-kit/kit/endpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addendpoint"
"github.com/go-kit/kit/examples/addsvc/pkg/addservice"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/ratelimit"
"github.com/go-kit/kit/tracing/opentracing"
"github.com/go-kit/kit/transport/http/jsonrpc"
stdopentracing "github.com/opentracing/opentracing-go"
"github.com/sony/gobreaker"
)

// NewJSONRPCHandler returns a JSON RPC Server/Handler that can be passed to http.Handle()
func NewJSONRPCHandler(endpoints addendpoint.Set, logger log.Logger) *jsonrpc.Server {
handler := jsonrpc.NewServer(
makeEndpointCodecMap(endpoints),
jsonrpc.ServerErrorLogger(logger),
)
return handler
}

// NewJSONRPCClient returns an addservice backed by a JSON RPC over HTTP server
// living at the remote instance. We expect instance to come from a service
// discovery system, so likely of the form "host:port". We bake-in certain
// middlewares, implementing the client library pattern.
func NewJSONRPCClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) {
// Quickly sanitize the instance string.
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
u, err := url.Parse(instance)
if err != nil {
return nil, err
}

// We construct a single ratelimiter middleware, to limit the total outgoing
// QPS from this client to all methods on the remote instance. We also
// construct per-endpoint circuitbreaker middlewares to demonstrate how
// that's done, although they could easily be combined into a single breaker
// for the entire remote instance, too.
limiter := ratelimit.NewErroringLimiter(rate.NewLimiter(rate.Every(time.Second), 100))

var sumEndpoint endpoint.Endpoint
{
sumEndpoint = jsonrpc.NewClient(
u,
"sum",
jsonrpc.ClientRequestEncoder(encodeSumRequest),
jsonrpc.ClientResponseDecoder(decodeSumResponse),
).Endpoint()
sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint)
sumEndpoint = limiter(sumEndpoint)
sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Sum",
Timeout: 30 * time.Second,
}))(sumEndpoint)
}

var concatEndpoint endpoint.Endpoint
{
concatEndpoint = jsonrpc.NewClient(
u,
"concat",
jsonrpc.ClientRequestEncoder(encodeConcatRequest),
jsonrpc.ClientResponseDecoder(decodeConcatResponse),
).Endpoint()
concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint)
concatEndpoint = limiter(concatEndpoint)
concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{
Name: "Concat",
Timeout: 30 * time.Second,
}))(concatEndpoint)
}

// Returning the endpoint.Set as a service.Service relies on the
// endpoint.Set implementing the Service methods. That's just a simple bit
// of glue code.
return addendpoint.Set{
SumEndpoint: sumEndpoint,
ConcatEndpoint: concatEndpoint,
}, nil

}

// makeEndpointCodecMap returns a codec map configured for the addsvc.
func makeEndpointCodecMap(endpoints addendpoint.Set) jsonrpc.EndpointCodecMap {
return jsonrpc.EndpointCodecMap{
"sum": jsonrpc.EndpointCodec{
Endpoint: endpoints.SumEndpoint,
Decode: decodeSumRequest,
Encode: encodeSumResponse,
},
"concat": jsonrpc.EndpointCodec{
Endpoint: endpoints.ConcatEndpoint,
Decode: decodeConcatRequest,
Encode: encodeConcatResponse,
},
}
}

func decodeSumRequest(_ context.Context, msg json.RawMessage) (interface{}, error) {
var req addendpoint.SumRequest
err := json.Unmarshal(msg, &req)
if err != nil {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("couldn't unmarshal body to sum request: %s", err),
}
}
return req, nil
}

func encodeSumResponse(_ context.Context, obj interface{}) (json.RawMessage, error) {
res, ok := obj.(addendpoint.SumResponse)
if !ok {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("Asserting result to *SumResponse failed. Got %T, %+v", obj, obj),
}
}
b, err := json.Marshal(res)
if err != nil {
return nil, fmt.Errorf("couldn't marshal response: %s", err)
}
return b, nil
}

func decodeSumResponse(_ context.Context, msg json.RawMessage) (interface{}, error) {
var res addendpoint.SumResponse
err := json.Unmarshal(msg, &res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal body to SumResponse: %s", err)
}
return res, nil
}

func encodeSumRequest(_ context.Context, obj interface{}) (json.RawMessage, error) {
req, ok := obj.(addendpoint.SumRequest)
if !ok {
return nil, fmt.Errorf("couldn't assert request as SumRequest, got %T", obj)
}
b, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("couldn't marshal request: %s", err)
}
return b, nil
}

func decodeConcatRequest(_ context.Context, msg json.RawMessage) (interface{}, error) {
var req addendpoint.ConcatRequest
err := json.Unmarshal(msg, &req)
if err != nil {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("couldn't unmarshal body to concat request: %s", err),
}
}
return req, nil
}

func encodeConcatResponse(_ context.Context, obj interface{}) (json.RawMessage, error) {
res, ok := obj.(addendpoint.ConcatResponse)
if !ok {
return nil, &jsonrpc.Error{
Code: -32000,
Message: fmt.Sprintf("Asserting result to *ConcatResponse failed. Got %T, %+v", obj, obj),
}
}
b, err := json.Marshal(res)
if err != nil {
return nil, fmt.Errorf("couldn't marshal response: %s", err)
}
return b, nil
}

func decodeConcatResponse(_ context.Context, msg json.RawMessage) (interface{}, error) {
var res addendpoint.ConcatResponse
err := json.Unmarshal(msg, &res)
if err != nil {
return nil, fmt.Errorf("couldn't unmarshal body to ConcatResponse: %s", err)
}
return res, nil
}

func encodeConcatRequest(_ context.Context, obj interface{}) (json.RawMessage, error) {
req, ok := obj.(addendpoint.ConcatRequest)
if !ok {
return nil, fmt.Errorf("couldn't assert request as ConcatRequest, got %T", obj)
}
b, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("couldn't marshal request: %s", err)
}
return b, nil
}
4 changes: 2 additions & 2 deletions transport/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (c Client) Endpoint() endpoint.Endpoint {
// request, after the response is returned. The principal
// intended use is for error logging. Additional response parameters are
// provided in the context under keys with the ContextKeyResponse prefix.
// Note: err may be nil. There maybe also no additional response parameters depending on
// when an error occurs.
// Note: err may be nil. There maybe also no additional response parameters
// depending on when an error occurs.
type ClientFinalizerFunc func(ctx context.Context, err error)

// EncodeJSONRequest is an EncodeRequestFunc that serializes the request as a
Expand Down
92 changes: 92 additions & 0 deletions transport/http/jsonrpc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# JSON RPC

[JSON RPC](http://www.jsonrpc.org) is "A light weight remote procedure call protocol". It allows for the creation of simple RPC-style APIs with human-readable messages that are front-end friendly.

## Using JSON RPC with Go-Kit
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Excellent, thanks for this documentation.

Using JSON RPC and go-kit together is quite simple.

A JSON RPC _server_ acts as an [HTTP Handler](https://godoc.org/net/http#Handler), receiving all requests to the JSON RPC's URL. The server looks at the `method` property of the [Request Object](http://www.jsonrpc.org/specification#request_object), and routes it to the corresponding code.

Each JSON RPC _method_ is implemented as an `EndpointCodec`, a go-kit [Endpoint](https://godoc.org/github.com/go-kit/kit/endpoint#Endpoint), sandwiched between a decoder and encoder. The decoder picks apart the JSON RPC request params, which can be passed to your endpoint. The encoder receives the output from the endpoint and encodes a JSON-RPC result.

## Example — Add Service
Let's say we want a service that adds two ints together. We'll serve this at `http://localhost/rpc`. So a request to our `sum` method will be a POST to `http://localhost/rpc` with a request body of:

{
"id": 123,
"jsonrpc": "2.0",
"method": "sum",
"params": {
"A": 2,
"B": 2
}
}

### `EndpointCodecMap`
The routing table for incoming JSON RPC requests is the `EndpointCodecMap`. The key of the map is the JSON RPC method name. Here, we're routing the `sum` method to an `EndpointCodec` wrapped around `sumEndpoint`.

jsonrpc.EndpointCodecMap{
"sum": jsonrpc.EndpointCodec{
Endpoint: sumEndpoint,
Decode: decodeSumRequest,
Encode: encodeSumResponse,
},
}

### Decoder
type DecodeRequestFunc func(context.Context, json.RawMessage) (request interface{}, err error)

A `DecodeRequestFunc` is given the raw JSON from the `params` property of the Request object, _not_ the whole request object. It returns an object that will be the input to the Endpoint. For our purposes, the output should be a SumRequest, like this:

type SumRequest struct {
A, B int
}

So here's our decoder:

func decodeSumRequest(ctx context.Context, msg json.RawMessage) (interface{}, error) {
var req SumRequest
err := json.Unmarshal(msg, &req)
if err != nil {
return nil, err
}
return req, nil
}

So our `SumRequest` will now be passed to the endpoint. Once the endpoint has done its work, we hand over to the…

### Encoder
The encoder takes the output of the endpoint, and builds the raw JSON message that will form the `result` field of a [Response Object](http://www.jsonrpc.org/specification#response_object). Our result is going to be a plain int. Here's our encoder:

func encodeSumResponse(ctx context.Context, result interface{}) (json.RawMessage, error) {
sum, ok := result.(int)
if !ok {
return nil, errors.New("result is not an int")
}
b, err := json.Marshal(sum)
if err != nil {
return nil, err
}
return b, nil
}

### Server
Now that we have an EndpointCodec with decoder, endpoint, and encoder, we can wire up the server:

handler := jsonrpc.NewServer(jsonrpc.EndpointCodecMap{
"sum": jsonrpc.EndpointCodec{
Endpoint: sumEndpoint,
Decode: decodeSumRequest,
Encode: encodeSumResponse,
},
})
http.Handle("/rpc", handler)
http.ListenAndServe(":80", nil)

With all of this done, our example request above should result in a response like this:

{
"jsonrpc": "2.0",
"result": 4,
"error": null
}
Loading