diff --git a/circle.yml b/circle.yml index ad9e45af3..29520e694 100644 --- a/circle.yml +++ b/circle.yml @@ -17,7 +17,7 @@ test: - mkdir -p /home/ubuntu/.go_workspace/src/github.com/go-kit - mv /home/ubuntu/kit /home/ubuntu/.go_workspace/src/github.com/go-kit - ln -s /home/ubuntu/.go_workspace/src/github.com/go-kit/kit /home/ubuntu/kit - - go get github.com/go-kit/kit/... + - go get -t github.com/go-kit/kit/... override: - go test -v -race -tags integration github.com/go-kit/kit/...: environment: diff --git a/examples/addsvc/README.md b/examples/addsvc/README.md new file mode 100644 index 000000000..080800609 --- /dev/null +++ b/examples/addsvc/README.md @@ -0,0 +1,17 @@ +# addsvc + +addsvc is an example microservice which takes full advantage of most of Go +kit's features, including both service- and transport-level middlewares, +speaking multiple transports simultaneously, distributed tracing, and rich +error definitions. The server binary is available in cmd/addsvc. The client +binary is available in cmd/addcli. + +Finally, the addtransport package provides both server and clients for each +supported transport. The client structs bake-in certain middlewares, in order to +demonstrate the _client library pattern_. But beware: client libraries are +generally a bad idea, because they easily lead to the + [distributed monolith antipattern](https://www.microservices.com/talks/dont-build-a-distributed-monolith/). +If you don't _know_ you need to use one in your organization, it's probably best +avoided: prefer moving that logic to consumers, and relying on + [contract testing](https://docs.pact.io/best_practices/contract_tests_not_functional_tests.html) +to detect incompatibilities. diff --git a/examples/addsvc/client/grpc/client.go b/examples/addsvc/client/grpc/client.go deleted file mode 100644 index 2abcdae27..000000000 --- a/examples/addsvc/client/grpc/client.go +++ /dev/null @@ -1,75 +0,0 @@ -// Package grpc provides a gRPC client for the add service. -package grpc - -import ( - "time" - - jujuratelimit "github.com/juju/ratelimit" - stdopentracing "github.com/opentracing/opentracing-go" - "github.com/sony/gobreaker" - "google.golang.org/grpc" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc" - "github.com/go-kit/kit/examples/addsvc/pb" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/ratelimit" - "github.com/go-kit/kit/tracing/opentracing" - grpctransport "github.com/go-kit/kit/transport/grpc" -) - -// New returns an AddService backed by a gRPC client connection. It is the -// responsibility of the caller to dial, and later close, the connection. -func New(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addsvc.Service { - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = grpctransport.NewClient( - conn, - "pb.Add", - "Sum", - addsvc.EncodeGRPCSumRequest, - addsvc.DecodeGRPCSumResponse, - pb.SumReply{}, - grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), - ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = grpctransport.NewClient( - conn, - "pb.Add", - "Concat", - addsvc.EncodeGRPCConcatRequest, - addsvc.DecodeGRPCConcatResponse, - pb.ConcatReply{}, - grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), - ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 30 * time.Second, - }))(concatEndpoint) - } - - return addsvc.Endpoints{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } -} diff --git a/examples/addsvc/client/http/client.go b/examples/addsvc/client/http/client.go deleted file mode 100644 index be8d8e24d..000000000 --- a/examples/addsvc/client/http/client.go +++ /dev/null @@ -1,86 +0,0 @@ -// Package http provides an HTTP client for the add service. -package http - -import ( - "net/url" - "strings" - "time" - - jujuratelimit "github.com/juju/ratelimit" - stdopentracing "github.com/opentracing/opentracing-go" - "github.com/sony/gobreaker" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/ratelimit" - "github.com/go-kit/kit/tracing/opentracing" - httptransport "github.com/go-kit/kit/transport/http" -) - -// New returns an AddService backed by an HTTP server living at the remote -// instance. We expect instance to come from a service discovery system, so -// likely of the form "host:port". -func New(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addsvc.Service, error) { - if !strings.HasPrefix(instance, "http") { - instance = "http://" + instance - } - u, err := url.Parse(instance) - if err != nil { - return nil, err - } - - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = httptransport.NewClient( - "POST", - copyURL(u, "/sum"), - addsvc.EncodeHTTPGenericRequest, - addsvc.DecodeHTTPSumResponse, - httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), - ).Endpoint() - sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = httptransport.NewClient( - "POST", - copyURL(u, "/concat"), - addsvc.EncodeHTTPGenericRequest, - addsvc.DecodeHTTPConcatResponse, - httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), - ).Endpoint() - concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 30 * time.Second, - }))(concatEndpoint) - } - - return addsvc.Endpoints{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - }, nil -} - -func copyURL(base *url.URL, path string) *url.URL { - next := *base - next.Path = path - return &next -} diff --git a/examples/addsvc/client/thrift/client.go b/examples/addsvc/client/thrift/client.go deleted file mode 100644 index 4463ae843..000000000 --- a/examples/addsvc/client/thrift/client.go +++ /dev/null @@ -1,55 +0,0 @@ -// Package thrift provides a Thrift client for the add service. -package thrift - -import ( - "time" - - jujuratelimit "github.com/juju/ratelimit" - "github.com/sony/gobreaker" - - "github.com/go-kit/kit/circuitbreaker" - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc" - thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" - "github.com/go-kit/kit/ratelimit" -) - -// New returns an AddService backed by a Thrift server described by the provided -// client. The caller is responsible for constructing the client, and eventually -// closing the underlying transport. -func New(client *thriftadd.AddServiceClient) addsvc.Service { - // We construct a single ratelimiter middleware, to limit the total outgoing - // QPS from this client to all methods on the remote instance. We also - // construct per-endpoint circuitbreaker middlewares to demonstrate how - // that's done, although they could easily be combined into a single breaker - // for the entire remote instance, too. - - limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) - - // Thrift does not currently have tracer bindings, so we skip tracing. - - var sumEndpoint endpoint.Endpoint - { - sumEndpoint = addsvc.MakeThriftSumEndpoint(client) - sumEndpoint = limiter(sumEndpoint) - sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Sum", - Timeout: 30 * time.Second, - }))(sumEndpoint) - } - - var concatEndpoint endpoint.Endpoint - { - concatEndpoint = addsvc.MakeThriftConcatEndpoint(client) - concatEndpoint = limiter(concatEndpoint) - concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ - Name: "Concat", - Timeout: 30 * time.Second, - }))(concatEndpoint) - } - - return addsvc.Endpoints{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } -} diff --git a/examples/addsvc/cmd/addcli/main.go b/examples/addsvc/cmd/addcli/addcli.go similarity index 57% rename from examples/addsvc/cmd/addcli/main.go rename to examples/addsvc/cmd/addcli/addcli.go index c3a861d96..e2f0f94d0 100644 --- a/examples/addsvc/cmd/addcli/main.go +++ b/examples/addsvc/cmd/addcli/addcli.go @@ -6,23 +6,23 @@ import ( "fmt" "os" "strconv" - "strings" + "text/tabwriter" "time" + "google.golang.org/grpc" + "github.com/apache/thrift/lib/go/thrift" - "github.com/lightstep/lightstep-tracer-go" + lightstep "github.com/lightstep/lightstep-tracer-go" stdopentracing "github.com/opentracing/opentracing-go" zipkin "github.com/openzipkin/zipkin-go-opentracing" - "google.golang.org/grpc" "sourcegraph.com/sourcegraph/appdash" appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" - "github.com/go-kit/kit/examples/addsvc" - grpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" - httpclient "github.com/go-kit/kit/examples/addsvc/client/http" - thriftclient "github.com/go-kit/kit/examples/addsvc/client/thrift" - thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" "github.com/go-kit/kit/log" + + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc/pkg/addtransport" + addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" ) func main() { @@ -32,24 +32,23 @@ func main() { // and various client constructors both expect host:port strings. For an // example service with a client built on top of a service discovery system, // see profilesvc. - + fs := flag.NewFlagSet("addcli", flag.ExitOnError) var ( - httpAddr = flag.String("http.addr", "", "HTTP address of addsvc") - grpcAddr = flag.String("grpc.addr", "", "gRPC (HTTP) address of addsvc") - thriftAddr = flag.String("thrift.addr", "", "Thrift address of addsvc") - thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") - thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") - thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") - zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Zipkin HTTP Collector endpoint") - zipkinKafkaAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port") - appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") - lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") - method = flag.String("method", "sum", "sum, concat") + httpAddr = fs.String("http-addr", "", "HTTP address of addsvc") + grpcAddr = fs.String("grpc-addr", "", "gRPC address of addsvc") + thriftAddr = fs.String("thrift-addr", "", "Thrift address of addsvc") + thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") + thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") + thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") + zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + method = fs.String("method", "sum", "sum, concat") ) - flag.Parse() - - if len(flag.Args()) != 2 { - fmt.Fprintf(os.Stderr, "usage: addcli [flags] \n") + fs.Usage = usageFor(fs, os.Args[0]+" [flags] ") + fs.Parse(os.Args[1:]) + if len(fs.Args()) != 2 { + fs.Usage() os.Exit(1) } @@ -57,47 +56,31 @@ func main() { // Your clients will probably just use one tracer. var tracer stdopentracing.Tracer { - if *zipkinAddr != "" { - // endpoint typically looks like: http://zipkinhost:9411/api/v1/spans - collector, err := zipkin.NewHTTPCollector(*zipkinAddr) + if *zipkinURL != "" { + collector, err := zipkin.NewHTTPCollector(*zipkinURL) if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) } defer collector.Close() - - tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, false, "0.0.0.0:0", "addcli"), + var ( + debug = false + hostPort = "localhost:80" + serviceName = "addsvc" ) + recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) + tracer, err = zipkin.NewTracer(recorder) if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) + fmt.Fprintln(os.Stderr, err.Error()) os.Exit(1) } - } else if *zipkinKafkaAddr != "" { - collector, err := zipkin.NewKafkaCollector( - strings.Split(*zipkinKafkaAddr, ","), - zipkin.KafkaLogger(log.NewNopLogger()), - ) - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - os.Exit(1) - } - defer collector.Close() - - tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, false, "0.0.0.0:0", "addcli"), - ) - if err != nil { - fmt.Fprintf(os.Stderr, "%v\n", err) - os.Exit(1) - } - } else if *appdashAddr != "" { - tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else if *lightstepToken != "" { tracer = lightstep.NewTracer(lightstep.Options{ AccessToken: *lightstepToken, }) defer lightstep.FlushLightStepTracer(tracer) + } else if *appdashAddr != "" { + tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) } else { tracer = stdopentracing.GlobalTracer() // no-op } @@ -105,13 +88,12 @@ func main() { // This is a demonstration client, which supports multiple transports. // Your clients will probably just define and stick with 1 transport. - var ( - service addsvc.Service - err error + svc addservice.Service + err error ) if *httpAddr != "" { - service, err = httpclient.New(*httpAddr, tracer, log.NewNopLogger()) + svc, err = addtransport.NewHTTPClient(*httpAddr, tracer, log.NewNopLogger()) } else if *grpcAddr != "" { conn, err := grpc.Dial(*grpcAddr, grpc.WithInsecure(), grpc.WithTimeout(time.Second)) if err != nil { @@ -119,7 +101,7 @@ func main() { os.Exit(1) } defer conn.Close() - service = grpcclient.New(conn, tracer, log.NewNopLogger()) + svc = addtransport.NewGRPCClient(conn, tracer, log.NewNopLogger()) } else if *thriftAddr != "" { // It's necessary to do all of this construction in the func main, // because (among other reasons) we need to control the lifecycle of the @@ -139,8 +121,8 @@ func main() { os.Exit(1) } var transportFactory thrift.TTransportFactory - if *thriftBufferSize > 0 { - transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize) + if *thriftBuffer > 0 { + transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer) } else { transportFactory = thrift.NewTTransportFactory() } @@ -162,8 +144,8 @@ func main() { os.Exit(1) } defer transport.Close() - client := thriftadd.NewAddServiceClientFactory(transport, protocolFactory) - service = thriftclient.New(client) + client := addthrift.NewAddServiceClientFactory(transport, protocolFactory) + svc = addtransport.NewThriftClient(client) } else { fmt.Fprintf(os.Stderr, "error: no remote address specified\n") os.Exit(1) @@ -175,9 +157,9 @@ func main() { switch *method { case "sum": - a, _ := strconv.ParseInt(flag.Args()[0], 10, 64) - b, _ := strconv.ParseInt(flag.Args()[1], 10, 64) - v, err := service.Sum(context.Background(), int(a), int(b)) + a, _ := strconv.ParseInt(fs.Args()[0], 10, 64) + b, _ := strconv.ParseInt(fs.Args()[1], 10, 64) + v, err := svc.Sum(context.Background(), int(a), int(b)) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) @@ -185,9 +167,9 @@ func main() { fmt.Fprintf(os.Stdout, "%d + %d = %d\n", a, b, v) case "concat": - a := flag.Args()[0] - b := flag.Args()[1] - v, err := service.Concat(context.Background(), a, b) + a := fs.Args()[0] + b := fs.Args()[1] + v, err := svc.Concat(context.Background(), a, b) if err != nil { fmt.Fprintf(os.Stderr, "error: %v\n", err) os.Exit(1) @@ -199,3 +181,18 @@ func main() { os.Exit(1) } } + +func usageFor(fs *flag.FlagSet, short string) func() { + return func() { + fmt.Fprintf(os.Stderr, "USAGE\n") + fmt.Fprintf(os.Stderr, " %s\n", short) + fmt.Fprintf(os.Stderr, "\n") + fmt.Fprintf(os.Stderr, "FLAGS\n") + w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0) + fs.VisitAll(func(f *flag.Flag) { + fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage) + }) + w.Flush() + fmt.Fprintf(os.Stderr, "\n") + } +} diff --git a/examples/addsvc/cmd/addsvc/addsvc.go b/examples/addsvc/cmd/addsvc/addsvc.go new file mode 100644 index 000000000..eedaf0722 --- /dev/null +++ b/examples/addsvc/cmd/addsvc/addsvc.go @@ -0,0 +1,280 @@ +package main + +import ( + "context" + "flag" + "fmt" + "net" + "net/http" + "os" + "os/signal" + "syscall" + "text/tabwriter" + + "github.com/apache/thrift/lib/go/thrift" + lightstep "github.com/lightstep/lightstep-tracer-go" + "github.com/oklog/oklog/pkg/group" + stdopentracing "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go-opentracing" + stdprometheus "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" + "google.golang.org/grpc" + "sourcegraph.com/sourcegraph/appdash" + appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/metrics/prometheus" + + addpb "github.com/go-kit/kit/examples/addsvc/pb" + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc/pkg/addtransport" + addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" +) + +func main() { + // Define our flags. Your service probably won't need to bind listeners for + // *all* supported transports, or support both Zipkin and LightStep, and so + // on, but we do it here for demonstration purposes. + fs := flag.NewFlagSet("addsvc", flag.ExitOnError) + var ( + debugAddr = fs.String("debug.addr", ":8080", "Debug and metrics listen address") + httpAddr = fs.String("http-addr", ":8081", "HTTP listen address") + grpcAddr = fs.String("grpc-addr", ":8082", "gRPC listen address") + thriftAddr = fs.String("thrift-addr", ":8083", "Thrift listen address") + thriftProtocol = fs.String("thrift-protocol", "binary", "binary, compact, json, simplejson") + thriftBuffer = fs.Int("thrift-buffer", 0, "0 for unbuffered") + thriftFramed = fs.Bool("thrift-framed", false, "true to enable framing") + zipkinURL = fs.String("zipkin-url", "", "Enable Zipkin tracing via a collector URL e.g. http://localhost:9411/api/v1/spans") + lightstepToken = flag.String("lightstep-token", "", "Enable LightStep tracing via a LightStep access token") + appdashAddr = flag.String("appdash-addr", "", "Enable Appdash tracing via an Appdash server host:port") + ) + fs.Usage = usageFor(fs, os.Args[0]+" [flags]") + fs.Parse(os.Args[1:]) + + // Create a single logger, which we'll use and give to other components. + var logger log.Logger + { + logger = log.NewLogfmtLogger(os.Stderr) + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + logger = log.With(logger, "caller", log.DefaultCaller) + } + + // Determine which tracer to use. We'll pass the tracer to all the + // components that use it, as a dependency. + var tracer stdopentracing.Tracer + { + if *zipkinURL != "" { + logger.Log("tracer", "Zipkin", "URL", *zipkinURL) + collector, err := zipkin.NewHTTPCollector(*zipkinURL) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + defer collector.Close() + var ( + debug = false + hostPort = "localhost:80" + serviceName = "addsvc" + ) + recorder := zipkin.NewRecorder(collector, debug, hostPort, serviceName) + tracer, err = zipkin.NewTracer(recorder) + if err != nil { + logger.Log("err", err) + os.Exit(1) + } + } else if *lightstepToken != "" { + logger.Log("tracer", "LightStep") // probably don't want to print out the token :) + tracer = lightstep.NewTracer(lightstep.Options{ + AccessToken: *lightstepToken, + }) + defer lightstep.FlushLightStepTracer(tracer) + } else if *appdashAddr != "" { + logger.Log("tracer", "Appdash", "addr", *appdashAddr) + tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) + } else { + logger.Log("tracer", "none") + tracer = stdopentracing.GlobalTracer() // no-op + } + } + + // Create the (sparse) metrics we'll use in the service. They, too, are + // dependencies that we pass to components that use them. + var ints, chars metrics.Counter + { + // Business-level metrics. + ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "example", + Subsystem: "addsvc", + Name: "integers_summed", + Help: "Total count of integers summed via the Sum method.", + }, []string{}) + chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ + Namespace: "example", + Subsystem: "addsvc", + Name: "characters_concatenated", + Help: "Total count of characters concatenated via the Concat method.", + }, []string{}) + } + var duration metrics.Histogram + { + // Endpoint-level metrics. + duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ + Namespace: "example", + Subsystem: "addsvc", + Name: "request_duration_seconds", + Help: "Request duration in seconds.", + }, []string{"method", "success"}) + } + http.DefaultServeMux.Handle("/metrics", promhttp.Handler()) + + // Build the layers of the service "onion" from the inside out. First, the + // business logic service; then, the set of endpoints that wrap the service; + // and finally, a series of concrete transport adapters. The adapters, like + // the HTTP handler or the gRPC server, are the bridge between Go kit and + // the interfaces that the transports expect. Note that we're not binding + // them to ports or anything yet; we'll do that next. + var ( + service = addservice.New(logger, ints, chars) + endpoints = addendpoint.New(service, logger, duration, tracer) + httpHandler = addtransport.NewHTTPHandler(endpoints, tracer, logger) + grpcServer = addtransport.NewGRPCServer(endpoints, tracer, logger) + thriftServer = addtransport.NewThriftServer(context.Background(), endpoints) + ) + + // Now we're to the part of the func main where we want to start actually + // running things, like servers bound to listeners to receive connections. + // + // The method is the same for each component: add a new actor to the group + // struct, which is a combination of 2 anonymous functions: the first + // function actually runs the component, and the second function should + // interrupt the first function and cause it to return. It's in these + // functions that we actually bin the Go kit server/handler structs to the + // concrete transports and start them running. + // + // Putting each component into its own block is mostly for aesthetics: it + // clearly demarcates the scope in which each listener/socket may be used. + var g group.Group + { + // The debug listener mounts the http.DefaultServeMux, and serves up + // stuff like the Prometheus metrics route, the Go debug and profiling + // routes, and so on. + debugListener, err := net.Listen("tcp", *debugAddr) + if err != nil { + logger.Log("transport", "debug/HTTP", "during", "Listen", "err", err) + os.Exit(1) + } + g.Add(func() error { + logger.Log("transport", "debug/HTTP", "addr", *debugAddr) + return http.Serve(debugListener, http.DefaultServeMux) + }, func(error) { + debugListener.Close() + }) + } + { + // The HTTP listener mounts the Go kit HTTP handler we created. + httpListener, err := net.Listen("tcp", *httpAddr) + if err != nil { + logger.Log("transport", "HTTP", "during", "Listen", "err", err) + os.Exit(1) + } + g.Add(func() error { + logger.Log("transport", "HTTP", "addr", *httpAddr) + return http.Serve(httpListener, httpHandler) + }, func(error) { + httpListener.Close() + }) + } + { + // The gRPC listener mounts the Go kit gRPC server we created. + grpcListener, err := net.Listen("tcp", *grpcAddr) + if err != nil { + logger.Log("transport", "gRPC", "during", "Listen", "err", err) + os.Exit(1) + } + g.Add(func() error { + logger.Log("transport", "gRPC", "addr", *grpcAddr) + baseServer := grpc.NewServer() + addpb.RegisterAddServer(baseServer, grpcServer) + return baseServer.Serve(grpcListener) + }, func(error) { + grpcListener.Close() + }) + } + { + // The Thrift socket mounts the Go kit Thrift server we created earlier. + // There's a lot of boilerplate involved here, related to configuring + // the protocol and transport; blame Thrift. + thriftSocket, err := thrift.NewTServerSocket(*thriftAddr) + if err != nil { + logger.Log("transport", "Thrift", "during", "Listen", "err", err) + os.Exit(1) + } + g.Add(func() error { + logger.Log("transport", "Thrift", "addr", *thriftAddr) + var protocolFactory thrift.TProtocolFactory + switch *thriftProtocol { + case "binary": + protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() + case "compact": + protocolFactory = thrift.NewTCompactProtocolFactory() + case "json": + protocolFactory = thrift.NewTJSONProtocolFactory() + case "simplejson": + protocolFactory = thrift.NewTSimpleJSONProtocolFactory() + default: + return fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol) + } + var transportFactory thrift.TTransportFactory + if *thriftBuffer > 0 { + transportFactory = thrift.NewTBufferedTransportFactory(*thriftBuffer) + } else { + transportFactory = thrift.NewTTransportFactory() + } + if *thriftFramed { + transportFactory = thrift.NewTFramedTransportFactory(transportFactory) + } + return thrift.NewTSimpleServer4( + addthrift.NewAddServiceProcessor(thriftServer), + thriftSocket, + transportFactory, + protocolFactory, + ).Serve() + }, func(error) { + thriftSocket.Close() + }) + } + { + // This function just sits and waits for ctrl-C. + cancelInterrupt := make(chan struct{}) + g.Add(func() error { + c := make(chan os.Signal, 1) + signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) + select { + case sig := <-c: + return fmt.Errorf("received signal %s", sig) + case <-cancelInterrupt: + return nil + } + }, func(error) { + close(cancelInterrupt) + }) + } + logger.Log("exit", g.Run()) +} + +func usageFor(fs *flag.FlagSet, short string) func() { + return func() { + fmt.Fprintf(os.Stderr, "USAGE\n") + fmt.Fprintf(os.Stderr, " %s\n", short) + fmt.Fprintf(os.Stderr, "\n") + fmt.Fprintf(os.Stderr, "FLAGS\n") + w := tabwriter.NewWriter(os.Stderr, 0, 2, 2, ' ', 0) + fs.VisitAll(func(f *flag.Flag) { + fmt.Fprintf(w, "\t-%s %s\t%s\n", f.Name, f.DefValue, f.Usage) + }) + w.Flush() + fmt.Fprintf(os.Stderr, "\n") + } +} diff --git a/examples/addsvc/cmd/addsvc/main.go b/examples/addsvc/cmd/addsvc/main.go deleted file mode 100644 index 722f9efa5..000000000 --- a/examples/addsvc/cmd/addsvc/main.go +++ /dev/null @@ -1,280 +0,0 @@ -package main - -import ( - "context" - "flag" - "fmt" - "net" - "net/http" - "net/http/pprof" - "os" - "os/signal" - "strings" - "syscall" - - "github.com/apache/thrift/lib/go/thrift" - lightstep "github.com/lightstep/lightstep-tracer-go" - stdopentracing "github.com/opentracing/opentracing-go" - zipkin "github.com/openzipkin/zipkin-go-opentracing" - stdprometheus "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promhttp" - "google.golang.org/grpc" - "sourcegraph.com/sourcegraph/appdash" - appdashot "sourcegraph.com/sourcegraph/appdash/opentracing" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc" - "github.com/go-kit/kit/examples/addsvc/pb" - thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" - "github.com/go-kit/kit/metrics/prometheus" - "github.com/go-kit/kit/tracing/opentracing" -) - -func main() { - var ( - debugAddr = flag.String("debug.addr", ":8080", "Debug and metrics listen address") - httpAddr = flag.String("http.addr", ":8081", "HTTP listen address") - grpcAddr = flag.String("grpc.addr", ":8082", "gRPC (HTTP) listen address") - thriftAddr = flag.String("thrift.addr", ":8083", "Thrift listen address") - thriftProtocol = flag.String("thrift.protocol", "binary", "binary, compact, json, simplejson") - thriftBufferSize = flag.Int("thrift.buffer.size", 0, "0 for unbuffered") - thriftFramed = flag.Bool("thrift.framed", false, "true to enable framing") - zipkinAddr = flag.String("zipkin.addr", "", "Enable Zipkin tracing via a Zipkin HTTP Collector endpoint") - zipkinKafkaAddr = flag.String("zipkin.kafka.addr", "", "Enable Zipkin tracing via a Kafka server host:port") - appdashAddr = flag.String("appdash.addr", "", "Enable Appdash tracing via an Appdash server host:port") - lightstepToken = flag.String("lightstep.token", "", "Enable LightStep tracing via a LightStep access token") - ) - flag.Parse() - - // Logging domain. - var logger log.Logger - { - logger = log.NewLogfmtLogger(os.Stdout) - logger = log.With(logger, "ts", log.DefaultTimestampUTC) - logger = log.With(logger, "caller", log.DefaultCaller) - } - logger.Log("msg", "hello") - defer logger.Log("msg", "goodbye") - - // Metrics domain. - var ints, chars metrics.Counter - { - // Business level metrics. - ints = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: "addsvc", - Name: "integers_summed", - Help: "Total count of integers summed via the Sum method.", - }, []string{}) - chars = prometheus.NewCounterFrom(stdprometheus.CounterOpts{ - Namespace: "addsvc", - Name: "characters_concatenated", - Help: "Total count of characters concatenated via the Concat method.", - }, []string{}) - } - var duration metrics.Histogram - { - // Transport level metrics. - duration = prometheus.NewSummaryFrom(stdprometheus.SummaryOpts{ - Namespace: "addsvc", - Name: "request_duration_ns", - Help: "Request duration in nanoseconds.", - }, []string{"method", "success"}) - } - - // Tracing domain. - var tracer stdopentracing.Tracer - { - if *zipkinAddr != "" { - logger := log.With(logger, "tracer", "ZipkinHTTP") - logger.Log("addr", *zipkinAddr) - - // endpoint typically looks like: http://zipkinhost:9411/api/v1/spans - collector, err := zipkin.NewHTTPCollector(*zipkinAddr) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - defer collector.Close() - - tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"), - ) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - } else if *zipkinKafkaAddr != "" { - logger := log.With(logger, "tracer", "ZipkinKafka") - logger.Log("addr", *zipkinKafkaAddr) - - collector, err := zipkin.NewKafkaCollector( - strings.Split(*zipkinKafkaAddr, ","), - zipkin.KafkaLogger(log.NewNopLogger()), - ) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - defer collector.Close() - - tracer, err = zipkin.NewTracer( - zipkin.NewRecorder(collector, false, "localhost:80", "addsvc"), - ) - if err != nil { - logger.Log("err", err) - os.Exit(1) - } - } else if *appdashAddr != "" { - logger := log.With(logger, "tracer", "Appdash") - logger.Log("addr", *appdashAddr) - tracer = appdashot.NewTracer(appdash.NewRemoteCollector(*appdashAddr)) - } else if *lightstepToken != "" { - logger := log.With(logger, "tracer", "LightStep") - logger.Log() // probably don't want to print out the token :) - tracer = lightstep.NewTracer(lightstep.Options{ - AccessToken: *lightstepToken, - }) - defer lightstep.FlushLightStepTracer(tracer) - } else { - logger := log.With(logger, "tracer", "none") - logger.Log() - tracer = stdopentracing.GlobalTracer() // no-op - } - } - - // Business domain. - var service addsvc.Service - { - service = addsvc.NewBasicService() - service = addsvc.ServiceLoggingMiddleware(logger)(service) - service = addsvc.ServiceInstrumentingMiddleware(ints, chars)(service) - } - - // Endpoint domain. - var sumEndpoint endpoint.Endpoint - { - sumDuration := duration.With("method", "Sum") - sumLogger := log.With(logger, "method", "Sum") - - sumEndpoint = addsvc.MakeSumEndpoint(service) - sumEndpoint = opentracing.TraceServer(tracer, "Sum")(sumEndpoint) - sumEndpoint = addsvc.EndpointInstrumentingMiddleware(sumDuration)(sumEndpoint) - sumEndpoint = addsvc.EndpointLoggingMiddleware(sumLogger)(sumEndpoint) - } - var concatEndpoint endpoint.Endpoint - { - concatDuration := duration.With("method", "Concat") - concatLogger := log.With(logger, "method", "Concat") - - concatEndpoint = addsvc.MakeConcatEndpoint(service) - concatEndpoint = opentracing.TraceServer(tracer, "Concat")(concatEndpoint) - concatEndpoint = addsvc.EndpointInstrumentingMiddleware(concatDuration)(concatEndpoint) - concatEndpoint = addsvc.EndpointLoggingMiddleware(concatLogger)(concatEndpoint) - } - endpoints := addsvc.Endpoints{ - SumEndpoint: sumEndpoint, - ConcatEndpoint: concatEndpoint, - } - - // Mechanical domain. - errc := make(chan error) - ctx := context.Background() - - // Interrupt handler. - go func() { - c := make(chan os.Signal, 1) - signal.Notify(c, syscall.SIGINT, syscall.SIGTERM) - errc <- fmt.Errorf("%s", <-c) - }() - - // Debug listener. - go func() { - logger := log.With(logger, "transport", "debug") - - m := http.NewServeMux() - m.Handle("/debug/pprof/", http.HandlerFunc(pprof.Index)) - m.Handle("/debug/pprof/cmdline", http.HandlerFunc(pprof.Cmdline)) - m.Handle("/debug/pprof/profile", http.HandlerFunc(pprof.Profile)) - m.Handle("/debug/pprof/symbol", http.HandlerFunc(pprof.Symbol)) - m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace)) - m.Handle("/metrics", promhttp.Handler()) - - logger.Log("addr", *debugAddr) - errc <- http.ListenAndServe(*debugAddr, m) - }() - - // HTTP transport. - go func() { - logger := log.With(logger, "transport", "HTTP") - h := addsvc.MakeHTTPHandler(endpoints, tracer, logger) - logger.Log("addr", *httpAddr) - errc <- http.ListenAndServe(*httpAddr, h) - }() - - // gRPC transport. - go func() { - logger := log.With(logger, "transport", "gRPC") - - ln, err := net.Listen("tcp", *grpcAddr) - if err != nil { - errc <- err - return - } - - srv := addsvc.MakeGRPCServer(endpoints, tracer, logger) - s := grpc.NewServer() - pb.RegisterAddServer(s, srv) - - logger.Log("addr", *grpcAddr) - errc <- s.Serve(ln) - }() - - // Thrift transport. - go func() { - logger := log.With(logger, "transport", "Thrift") - - var protocolFactory thrift.TProtocolFactory - switch *thriftProtocol { - case "binary": - protocolFactory = thrift.NewTBinaryProtocolFactoryDefault() - case "compact": - protocolFactory = thrift.NewTCompactProtocolFactory() - case "json": - protocolFactory = thrift.NewTJSONProtocolFactory() - case "simplejson": - protocolFactory = thrift.NewTSimpleJSONProtocolFactory() - default: - errc <- fmt.Errorf("invalid Thrift protocol %q", *thriftProtocol) - return - } - - var transportFactory thrift.TTransportFactory - if *thriftBufferSize > 0 { - transportFactory = thrift.NewTBufferedTransportFactory(*thriftBufferSize) - } else { - transportFactory = thrift.NewTTransportFactory() - } - if *thriftFramed { - transportFactory = thrift.NewTFramedTransportFactory(transportFactory) - } - - transport, err := thrift.NewTServerSocket(*thriftAddr) - if err != nil { - errc <- err - return - } - - logger.Log("addr", *thriftAddr) - errc <- thrift.NewTSimpleServer4( - thriftadd.NewAddServiceProcessor(addsvc.MakeThriftHandler(ctx, endpoints)), - transport, - transportFactory, - protocolFactory, - ).Serve() - }() - - // Run! - logger.Log("exit", <-errc) -} diff --git a/examples/addsvc/cmd/addsvc/pact_test.go b/examples/addsvc/cmd/addsvc/pact_test.go new file mode 100644 index 000000000..2c709b58b --- /dev/null +++ b/examples/addsvc/cmd/addsvc/pact_test.go @@ -0,0 +1,55 @@ +package main + +import ( + "fmt" + "net/http" + "os" + "strings" + "testing" + + "github.com/pact-foundation/pact-go/dsl" +) + +func TestPactStringsvcUppercase(t *testing.T) { + if os.Getenv("WRITE_PACTS") == "" { + t.Skip("skipping Pact contracts; set WRITE_PACTS environment variable to enable") + } + + pact := dsl.Pact{ + Port: 6666, + Consumer: "addsvc", + Provider: "stringsvc", + } + defer pact.Teardown() + + pact.AddInteraction(). + UponReceiving("stringsvc uppercase"). + WithRequest(dsl.Request{ + Headers: map[string]string{"Content-Type": "application/json; charset=utf-8"}, + Method: "POST", + Path: "/uppercase", + Body: `{"s":"foo"}`, + }). + WillRespondWith(dsl.Response{ + Status: 200, + Headers: map[string]string{"Content-Type": "application/json; charset=utf-8"}, + Body: `{"v":"FOO"}`, + }) + + if err := pact.Verify(func() error { + u := fmt.Sprintf("http://localhost:%d/uppercase", pact.Server.Port) + req, err := http.NewRequest("POST", u, strings.NewReader(`{"s":"foo"}`)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json; charset=utf-8") + if _, err = http.DefaultClient.Do(req); err != nil { + return err + } + return nil + }); err != nil { + t.Fatal(err) + } + + pact.WritePact() +} diff --git a/examples/addsvc/cmd/addsvc/wiring_test.go b/examples/addsvc/cmd/addsvc/wiring_test.go new file mode 100644 index 000000000..ca64bac1f --- /dev/null +++ b/examples/addsvc/cmd/addsvc/wiring_test.go @@ -0,0 +1,40 @@ +package main + +import ( + "io/ioutil" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/opentracing/opentracing-go" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics/discard" + + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc/pkg/addtransport" +) + +func TestHTTP(t *testing.T) { + svc := addservice.New(log.NewNopLogger(), discard.NewCounter(), discard.NewCounter()) + eps := addendpoint.New(svc, log.NewNopLogger(), discard.NewHistogram(), opentracing.GlobalTracer()) + mux := addtransport.NewHTTPHandler(eps, opentracing.GlobalTracer(), log.NewNopLogger()) + srv := httptest.NewServer(mux) + defer srv.Close() + + for _, testcase := range []struct { + method, url, body, want string + }{ + {"GET", srv.URL + "/concat", `{"a":"1","b":"2"}`, `{"v":"12"}`}, + {"GET", srv.URL + "/sum", `{"a":1,"b":2}`, `{"v":3}`}, + } { + req, _ := http.NewRequest(testcase.method, testcase.url, strings.NewReader(testcase.body)) + resp, _ := http.DefaultClient.Do(req) + body, _ := ioutil.ReadAll(resp.Body) + if want, have := testcase.want, strings.TrimSpace(string(body)); want != have { + t.Errorf("%s %s %s: want %q, have %q", testcase.method, testcase.url, testcase.body, want, have) + } + } +} diff --git a/examples/addsvc/doc.go b/examples/addsvc/doc.go deleted file mode 100644 index 584d072cb..000000000 --- a/examples/addsvc/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Package addsvc is an example microservice, useful for education. It can sum -// integers and concatenate strings. A client library is available in the client -// subdirectory. A server binary is available in cmd/addsvc. An example client -// binary is available in cmd/addcli. -package addsvc diff --git a/examples/addsvc/endpoints.go b/examples/addsvc/endpoints.go deleted file mode 100644 index 1da33e41c..000000000 --- a/examples/addsvc/endpoints.go +++ /dev/null @@ -1,134 +0,0 @@ -package addsvc - -// This file contains methods to make individual endpoints from services, -// request and response types to serve those endpoints, as well as encoders and -// decoders for those types, for all of our supported transport serialization -// formats. It also includes endpoint middlewares. - -import ( - "context" - "fmt" - "time" - - "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" -) - -// Endpoints collects all of the endpoints that compose an add service. It's -// meant to be used as a helper struct, to collect all of the endpoints into a -// single parameter. -// -// In a server, it's useful for functions that need to operate on a per-endpoint -// basis. For example, you might pass an Endpoints to a function that produces -// an http.Handler, with each method (endpoint) wired up to a specific path. (It -// is probably a mistake in design to invoke the Service methods on the -// Endpoints struct in a server.) -// -// In a client, it's useful to collect individually constructed endpoints into a -// single type that implements the Service interface. For example, you might -// construct individual endpoints using transport/http.NewClient, combine them -// into an Endpoints, and return it to the caller as a Service. -type Endpoints struct { - SumEndpoint endpoint.Endpoint - ConcatEndpoint endpoint.Endpoint -} - -// Sum implements Service. Primarily useful in a client. -func (e Endpoints) Sum(ctx context.Context, a, b int) (int, error) { - request := sumRequest{A: a, B: b} - response, err := e.SumEndpoint(ctx, request) - if err != nil { - return 0, err - } - return response.(sumResponse).V, response.(sumResponse).Err -} - -// Concat implements Service. Primarily useful in a client. -func (e Endpoints) Concat(ctx context.Context, a, b string) (string, error) { - request := concatRequest{A: a, B: b} - response, err := e.ConcatEndpoint(ctx, request) - if err != nil { - return "", err - } - return response.(concatResponse).V, response.(concatResponse).Err -} - -// MakeSumEndpoint returns an endpoint that invokes Sum on the service. -// Primarily useful in a server. -func MakeSumEndpoint(s Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - sumReq := request.(sumRequest) - v, err := s.Sum(ctx, sumReq.A, sumReq.B) - if err == ErrIntOverflow { - return nil, err // special case; see comment on ErrIntOverflow - } - return sumResponse{ - V: v, - Err: err, - }, nil - } -} - -// MakeConcatEndpoint returns an endpoint that invokes Concat on the service. -// Primarily useful in a server. -func MakeConcatEndpoint(s Service) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - concatReq := request.(concatRequest) - v, err := s.Concat(ctx, concatReq.A, concatReq.B) - return concatResponse{ - V: v, - Err: err, - }, nil - } -} - -// EndpointInstrumentingMiddleware returns an endpoint middleware that records -// the duration of each invocation to the passed histogram. The middleware adds -// a single field: "success", which is "true" if no error is returned, and -// "false" otherwise. -func EndpointInstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - - defer func(begin time.Time) { - duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds()) - }(time.Now()) - return next(ctx, request) - - } - } -} - -// EndpointLoggingMiddleware returns an endpoint middleware that logs the -// duration of each invocation, and the resulting error, if any. -func EndpointLoggingMiddleware(logger log.Logger) endpoint.Middleware { - return func(next endpoint.Endpoint) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (response interface{}, err error) { - - defer func(begin time.Time) { - logger.Log("error", err, "took", time.Since(begin)) - }(time.Now()) - return next(ctx, request) - - } - } -} - -// These types are unexported because they only exist to serve the endpoint -// domain, which is totally encapsulated in this package. They are otherwise -// opaque to all callers. - -type sumRequest struct{ A, B int } - -type sumResponse struct { - V int - Err error -} - -type concatRequest struct{ A, B string } - -type concatResponse struct { - V string - Err error -} diff --git a/examples/addsvc/pb/addsvc.pb.go b/examples/addsvc/pb/addsvc.pb.go index a685eef04..781b50b75 100644 --- a/examples/addsvc/pb/addsvc.pb.go +++ b/examples/addsvc/pb/addsvc.pb.go @@ -1,6 +1,5 @@ -// Code generated by protoc-gen-go. +// Code generated by protoc-gen-go. DO NOT EDIT. // source: addsvc.proto -// DO NOT EDIT! /* Package pb is a generated protocol buffer package. @@ -47,6 +46,20 @@ func (m *SumRequest) String() string { return proto.CompactTextString func (*SumRequest) ProtoMessage() {} func (*SumRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} } +func (m *SumRequest) GetA() int64 { + if m != nil { + return m.A + } + return 0 +} + +func (m *SumRequest) GetB() int64 { + if m != nil { + return m.B + } + return 0 +} + // The sum response contains the result of the calculation. type SumReply struct { V int64 `protobuf:"varint,1,opt,name=v" json:"v,omitempty"` @@ -58,6 +71,20 @@ func (m *SumReply) String() string { return proto.CompactTextString(m func (*SumReply) ProtoMessage() {} func (*SumReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} } +func (m *SumReply) GetV() int64 { + if m != nil { + return m.V + } + return 0 +} + +func (m *SumReply) GetErr() string { + if m != nil { + return m.Err + } + return "" +} + // The Concat request contains two parameters. type ConcatRequest struct { A string `protobuf:"bytes,1,opt,name=a" json:"a,omitempty"` @@ -69,6 +96,20 @@ func (m *ConcatRequest) String() string { return proto.CompactTextStr func (*ConcatRequest) ProtoMessage() {} func (*ConcatRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} } +func (m *ConcatRequest) GetA() string { + if m != nil { + return m.A + } + return "" +} + +func (m *ConcatRequest) GetB() string { + if m != nil { + return m.B + } + return "" +} + // The Concat response contains the result of the concatenation. type ConcatReply struct { V string `protobuf:"bytes,1,opt,name=v" json:"v,omitempty"` @@ -80,6 +121,20 @@ func (m *ConcatReply) String() string { return proto.CompactTextStrin func (*ConcatReply) ProtoMessage() {} func (*ConcatReply) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{3} } +func (m *ConcatReply) GetV() string { + if m != nil { + return m.V + } + return "" +} + +func (m *ConcatReply) GetErr() string { + if m != nil { + return m.Err + } + return "" +} + func init() { proto.RegisterType((*SumRequest)(nil), "pb.SumRequest") proto.RegisterType((*SumReply)(nil), "pb.SumReply") @@ -200,7 +255,7 @@ func init() { proto.RegisterFile("addsvc.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ // 189 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4c, 0x49, 0x29, + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0x49, 0x4c, 0x49, 0x29, 0x2e, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2a, 0x48, 0x52, 0xd2, 0xe0, 0xe2, 0x0a, 0x2e, 0xcd, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0xe2, 0xe1, 0x62, 0x4c, 0x94, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x0e, 0x62, 0x4c, 0x04, 0xf1, 0x92, 0x24, 0x98, 0x20, 0xbc, 0x24, diff --git a/examples/addsvc/pkg/addendpoint/middleware.go b/examples/addsvc/pkg/addendpoint/middleware.go new file mode 100644 index 000000000..c83047b76 --- /dev/null +++ b/examples/addsvc/pkg/addendpoint/middleware.go @@ -0,0 +1,43 @@ +package addendpoint + +import ( + "context" + "fmt" + "time" + + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// InstrumentingMiddleware returns an endpoint middleware that records +// the duration of each invocation to the passed histogram. The middleware adds +// a single field: "success", which is "true" if no error is returned, and +// "false" otherwise. +func InstrumentingMiddleware(duration metrics.Histogram) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + + defer func(begin time.Time) { + duration.With("success", fmt.Sprint(err == nil)).Observe(time.Since(begin).Seconds()) + }(time.Now()) + return next(ctx, request) + + } + } +} + +// LoggingMiddleware returns an endpoint middleware that logs the +// duration of each invocation, and the resulting error, if any. +func LoggingMiddleware(logger log.Logger) endpoint.Middleware { + return func(next endpoint.Endpoint) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + + defer func(begin time.Time) { + logger.Log("transport_error", err, "took", time.Since(begin)) + }(time.Now()) + return next(ctx, request) + + } + } +} diff --git a/examples/addsvc/pkg/addendpoint/set.go b/examples/addsvc/pkg/addendpoint/set.go new file mode 100644 index 000000000..3a65b083b --- /dev/null +++ b/examples/addsvc/pkg/addendpoint/set.go @@ -0,0 +1,128 @@ +package addendpoint + +import ( + "context" + + rl "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" +) + +// Set collects all of the endpoints that compose an add service. It's meant to +// be used as a helper struct, to collect all of the endpoints into a single +// parameter. +type Set struct { + SumEndpoint endpoint.Endpoint + ConcatEndpoint endpoint.Endpoint +} + +// New returns a Set that wraps the provided server, and wires in all of the +// expected endpoint middlewares via the various parameters. +func New(svc addservice.Service, logger log.Logger, duration metrics.Histogram, trace stdopentracing.Tracer) Set { + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = MakeSumEndpoint(svc) + sumEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(1, 1))(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(sumEndpoint) + sumEndpoint = opentracing.TraceServer(trace, "Sum")(sumEndpoint) + sumEndpoint = LoggingMiddleware(log.With(logger, "method", "Sum"))(sumEndpoint) + sumEndpoint = InstrumentingMiddleware(duration.With("method", "Sum"))(sumEndpoint) + } + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = MakeConcatEndpoint(svc) + concatEndpoint = ratelimit.NewTokenBucketLimiter(rl.NewBucketWithRate(100, 100))(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{}))(concatEndpoint) + concatEndpoint = opentracing.TraceServer(trace, "Concat")(concatEndpoint) + concatEndpoint = LoggingMiddleware(log.With(logger, "method", "Concat"))(concatEndpoint) + concatEndpoint = InstrumentingMiddleware(duration.With("method", "Concat"))(concatEndpoint) + } + return Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// Sum implements the service interface, so Set may be used as a service. +// This is primarily useful in the context of a client library. +func (s Set) Sum(ctx context.Context, a, b int) (int, error) { + resp, err := s.SumEndpoint(ctx, SumRequest{A: a, B: b}) + if err != nil { + return 0, err + } + response := resp.(SumResponse) + return response.V, response.Err +} + +// Concat implements the service interface, so Set may be used as a +// service. This is primarily useful in the context of a client library. +func (s Set) Concat(ctx context.Context, a, b string) (string, error) { + resp, err := s.ConcatEndpoint(ctx, ConcatRequest{A: a, B: b}) + if err != nil { + return "", err + } + response := resp.(ConcatResponse) + return response.V, response.Err +} + +// MakeSumEndpoint constructs a Sum endpoint wrapping the service. +func MakeSumEndpoint(s addservice.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + req := request.(SumRequest) + v, err := s.Sum(ctx, req.A, req.B) + return SumResponse{V: v, Err: err}, nil + } +} + +// MakeConcatEndpoint constructs a Concat endpoint wrapping the service. +func MakeConcatEndpoint(s addservice.Service) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (response interface{}, err error) { + req := request.(ConcatRequest) + v, err := s.Concat(ctx, req.A, req.B) + return ConcatResponse{V: v, Err: err}, nil + } +} + +// Failer is an interface that should be implemented by response types. +// Response encoders can check if responses are Failer, and if so if they've +// failed, and if so encode them using a separate write path based on the error. +type Failer interface { + Failed() error +} + +// SumRequest collects the request parameters for the Sum method. +type SumRequest struct { + A, B int +} + +// SumResponse collects the response values for the Sum method. +type SumResponse struct { + V int `json:"v"` + Err error `json:"-"` // should be intercepted by Failed/errorEncoder +} + +// Failed implements Failer. +func (r SumResponse) Failed() error { return r.Err } + +// ConcatRequest collects the request parameters for the Concat method. +type ConcatRequest struct { + A, B string +} + +// ConcatResponse collects the response values for the Concat method. +type ConcatResponse struct { + V string `json:"v"` + Err error `json:"-"` +} + +// Failed implements Failer. +func (r ConcatResponse) Failed() error { return r.Err } diff --git a/examples/addsvc/pkg/addservice/middleware.go b/examples/addsvc/pkg/addservice/middleware.go new file mode 100644 index 000000000..5a1d6ee5b --- /dev/null +++ b/examples/addsvc/pkg/addservice/middleware.go @@ -0,0 +1,69 @@ +package addservice + +import ( + "context" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// Middleware describes a service (as opposed to endpoint) middleware. +type Middleware func(Service) Service + +// LoggingMiddleware takes a logger as a dependency +// and returns a ServiceMiddleware. +func LoggingMiddleware(logger log.Logger) Middleware { + return func(next Service) Service { + return loggingMiddleware{logger, next} + } +} + +type loggingMiddleware struct { + logger log.Logger + next Service +} + +func (mw loggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) { + defer func() { + mw.logger.Log("method", "Sum", "a", a, "b", b, "v", v, "err", err) + }() + return mw.next.Sum(ctx, a, b) +} + +func (mw loggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) { + defer func() { + mw.logger.Log("method", "Concat", "a", a, "b", b, "v", v, "err", err) + }() + return mw.next.Concat(ctx, a, b) +} + +// InstrumentingMiddleware returns a service middleware that instruments +// the number of integers summed and characters concatenated over the lifetime of +// the service. +func InstrumentingMiddleware(ints, chars metrics.Counter) Middleware { + return func(next Service) Service { + return instrumentingMiddleware{ + ints: ints, + chars: chars, + next: next, + } + } +} + +type instrumentingMiddleware struct { + ints metrics.Counter + chars metrics.Counter + next Service +} + +func (mw instrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) { + v, err := mw.next.Sum(ctx, a, b) + mw.ints.Add(float64(v)) + return v, err +} + +func (mw instrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) { + v, err := mw.next.Concat(ctx, a, b) + mw.chars.Add(float64(len(v))) + return v, err +} diff --git a/examples/addsvc/pkg/addservice/service.go b/examples/addsvc/pkg/addservice/service.go new file mode 100644 index 000000000..d884373bf --- /dev/null +++ b/examples/addsvc/pkg/addservice/service.go @@ -0,0 +1,71 @@ +package addservice + +import ( + "context" + "errors" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/metrics" +) + +// Service describes a service that adds things together. +type Service interface { + Sum(ctx context.Context, a, b int) (int, error) + Concat(ctx context.Context, a, b string) (string, error) +} + +// New returns a basic Service with all of the expected middlewares wired in. +func New(logger log.Logger, ints, chars metrics.Counter) Service { + var svc Service + { + svc = NewBasicService() + svc = LoggingMiddleware(logger)(svc) + svc = InstrumentingMiddleware(ints, chars)(svc) + } + return svc +} + +var ( + // ErrTwoZeroes is an arbitrary business rule for the Add method. + ErrTwoZeroes = errors.New("can't sum two zeroes") + + // ErrIntOverflow protects the Add method. We've decided that this error + // indicates a misbehaving service and should count against e.g. circuit + // breakers. So, we return it directly in endpoints, to illustrate the + // difference. In a real service, this probably wouldn't be the case. + ErrIntOverflow = errors.New("integer overflow") + + // ErrMaxSizeExceeded protects the Concat method. + ErrMaxSizeExceeded = errors.New("result exceeds maximum size") +) + +// NewBasicService returns a naïve, stateless implementation of Service. +func NewBasicService() Service { + return basicService{} +} + +type basicService struct{} + +const ( + intMax = 1<<31 - 1 + intMin = -(intMax + 1) + maxLen = 10 +) + +func (s basicService) Sum(_ context.Context, a, b int) (int, error) { + if a == 0 && b == 0 { + return 0, ErrTwoZeroes + } + if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) { + return 0, ErrIntOverflow + } + return a + b, nil +} + +// Concat implements Service. +func (s basicService) Concat(_ context.Context, a, b string) (string, error) { + if len(a)+len(b) > maxLen { + return "", ErrMaxSizeExceeded + } + return a + b, nil +} diff --git a/examples/addsvc/pkg/addtransport/grpc.go b/examples/addsvc/pkg/addtransport/grpc.go new file mode 100644 index 000000000..cfb4243b7 --- /dev/null +++ b/examples/addsvc/pkg/addtransport/grpc.go @@ -0,0 +1,210 @@ +package addtransport + +import ( + "context" + "errors" + "time" + + "google.golang.org/grpc" + + jujuratelimit "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + oldcontext "golang.org/x/net/context" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + grpctransport "github.com/go-kit/kit/transport/grpc" + + "github.com/go-kit/kit/examples/addsvc/pb" + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" +) + +type grpcServer struct { + sum grpctransport.Handler + concat grpctransport.Handler +} + +// NewGRPCServer makes a set of endpoints available as a gRPC AddServer. +func NewGRPCServer(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { + options := []grpctransport.ServerOption{ + grpctransport.ServerErrorLogger(logger), + } + return &grpcServer{ + sum: grpctransport.NewServer( + endpoints.SumEndpoint, + decodeGRPCSumRequest, + encodeGRPCSumResponse, + append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))..., + ), + concat: grpctransport.NewServer( + endpoints.ConcatEndpoint, + decodeGRPCConcatRequest, + encodeGRPCConcatResponse, + append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))..., + ), + } +} + +func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) { + _, rep, err := s.sum.ServeGRPC(ctx, req) + if err != nil { + return nil, err + } + return rep.(*pb.SumReply), nil +} + +func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) { + _, rep, err := s.concat.ServeGRPC(ctx, req) + if err != nil { + return nil, err + } + return rep.(*pb.ConcatReply), nil +} + +// NewGRPCClient returns an AddService backed by a gRPC server at the other end +// of the conn. The caller is responsible for constructing the conn, and +// eventually closing the underlying transport. We bake-in certain middlewares, +// implementing the client library pattern. +func NewGRPCClient(conn *grpc.ClientConn, tracer stdopentracing.Tracer, logger log.Logger) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Sum", + encodeGRPCSumRequest, + decodeGRPCSumResponse, + pb.SumReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = grpctransport.NewClient( + conn, + "pb.Add", + "Concat", + encodeGRPCConcatRequest, + decodeGRPCConcatResponse, + pb.ConcatReply{}, + grpctransport.ClientBefore(opentracing.ToGRPCRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// decodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a +// gRPC sum request to a user-domain sum request. Primarily useful in a server. +func decodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(*pb.SumRequest) + return addendpoint.SumRequest{A: int(req.A), B: int(req.B)}, nil +} + +// decodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a +// gRPC concat request to a user-domain concat request. Primarily useful in a +// server. +func decodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { + req := grpcReq.(*pb.ConcatRequest) + return addendpoint.ConcatRequest{A: req.A, B: req.B}, nil +} + +// decodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a +// gRPC sum reply to a user-domain sum response. Primarily useful in a client. +func decodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { + reply := grpcReply.(*pb.SumReply) + return addendpoint.SumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil +} + +// decodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts +// a gRPC concat reply to a user-domain concat response. Primarily useful in a +// client. +func decodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { + reply := grpcReply.(*pb.ConcatReply) + return addendpoint.ConcatResponse{V: reply.V, Err: str2err(reply.Err)}, nil +} + +// encodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a +// user-domain sum response to a gRPC sum reply. Primarily useful in a server. +func encodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { + resp := response.(addendpoint.SumResponse) + return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil +} + +// encodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts +// a user-domain concat response to a gRPC concat reply. Primarily useful in a +// server. +func encodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { + resp := response.(addendpoint.ConcatResponse) + return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil +} + +// encodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a +// user-domain sum request to a gRPC sum request. Primarily useful in a client. +func encodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.SumRequest) + return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil +} + +// encodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a +// user-domain concat request to a gRPC concat request. Primarily useful in a +// client. +func encodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.ConcatRequest) + return &pb.ConcatRequest{A: req.A, B: req.B}, nil +} + +// These annoying helper functions are required to translate Go error types to +// and from strings, which is the type we use in our IDLs to represent errors. +// There is special casing to treat empty strings as nil errors. + +func str2err(s string) error { + if s == "" { + return nil + } + return errors.New(s) +} + +func err2str(err error) string { + if err == nil { + return "" + } + return err.Error() +} diff --git a/examples/addsvc/pkg/addtransport/http.go b/examples/addsvc/pkg/addtransport/http.go new file mode 100644 index 000000000..9759d157b --- /dev/null +++ b/examples/addsvc/pkg/addtransport/http.go @@ -0,0 +1,219 @@ +package addtransport + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "io/ioutil" + "net/http" + "net/url" + "strings" + "time" + + jujuratelimit "github.com/juju/ratelimit" + stdopentracing "github.com/opentracing/opentracing-go" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/ratelimit" + "github.com/go-kit/kit/tracing/opentracing" + httptransport "github.com/go-kit/kit/transport/http" + + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" +) + +// NewHTTPHandler returns an HTTP handler that makes a set of endpoints +// available on predefined paths. +func NewHTTPHandler(endpoints addendpoint.Set, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { + options := []httptransport.ServerOption{ + httptransport.ServerErrorEncoder(errorEncoder), + httptransport.ServerErrorLogger(logger), + } + m := http.NewServeMux() + m.Handle("/sum", httptransport.NewServer( + endpoints.SumEndpoint, + decodeHTTPSumRequest, + encodeHTTPGenericResponse, + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., + )) + m.Handle("/concat", httptransport.NewServer( + endpoints.ConcatEndpoint, + decodeHTTPConcatRequest, + encodeHTTPGenericResponse, + append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., + )) + return m +} + +// NewHTTPClient returns an AddService backed by an HTTP server living at the +// remote instance. We expect instance to come from a service discovery system, +// so likely of the form "host:port". We bake-in certain middlewares, +// implementing the client library pattern. +func NewHTTPClient(instance string, tracer stdopentracing.Tracer, logger log.Logger) (addservice.Service, error) { + // Quickly sanitize the instance string. + if !strings.HasPrefix(instance, "http") { + instance = "http://" + instance + } + u, err := url.Parse(instance) + if err != nil { + return nil, err + } + + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // made your own client library, you'd do this work there, so your server + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/sum"), + encodeHTTPGenericRequest, + decodeHTTPSumResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + sumEndpoint = opentracing.TraceClient(tracer, "Sum")(sumEndpoint) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = httptransport.NewClient( + "POST", + copyURL(u, "/concat"), + encodeHTTPGenericRequest, + decodeHTTPConcatResponse, + httptransport.ClientBefore(opentracing.ToHTTPRequest(tracer, logger)), + ).Endpoint() + concatEndpoint = opentracing.TraceClient(tracer, "Concat")(concatEndpoint) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + }, nil +} + +func copyURL(base *url.URL, path string) *url.URL { + next := *base + next.Path = path + return &next +} + +func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { + w.WriteHeader(err2code(err)) + json.NewEncoder(w).Encode(errorWrapper{Error: err.Error()}) +} + +func err2code(err error) int { + switch err { + case addservice.ErrTwoZeroes, addservice.ErrMaxSizeExceeded, addservice.ErrIntOverflow: + return http.StatusBadRequest + } + return http.StatusInternalServerError +} + +func errorDecoder(r *http.Response) error { + var w errorWrapper + if err := json.NewDecoder(r.Body).Decode(&w); err != nil { + return err + } + return errors.New(w.Error) +} + +type errorWrapper struct { + Error string `json:"error"` +} + +// decodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a +// JSON-encoded sum request from the HTTP request body. Primarily useful in a +// server. +func decodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { + var req addendpoint.SumRequest + err := json.NewDecoder(r.Body).Decode(&req) + return req, err +} + +// decodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a +// JSON-encoded concat request from the HTTP request body. Primarily useful in a +// server. +func decodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { + var req addendpoint.ConcatRequest + err := json.NewDecoder(r.Body).Decode(&req) + return req, err +} + +// decodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a +// JSON-encoded sum response from the HTTP response body. If the response has a +// non-200 status code, we will interpret that as an error and attempt to decode +// the specific error message from the response body. Primarily useful in a +// client. +func decodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { + if r.StatusCode != http.StatusOK { + return nil, errors.New(r.Status) + } + var resp addendpoint.SumResponse + err := json.NewDecoder(r.Body).Decode(&resp) + return resp, err +} + +// decodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes +// a JSON-encoded concat response from the HTTP response body. If the response +// has a non-200 status code, we will interpret that as an error and attempt to +// decode the specific error message from the response body. Primarily useful in +// a client. +func decodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { + if r.StatusCode != http.StatusOK { + return nil, errors.New(r.Status) + } + var resp addendpoint.ConcatResponse + err := json.NewDecoder(r.Body).Decode(&resp) + return resp, err +} + +// encodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that +// JSON-encodes any request to the request body. Primarily useful in a client. +func encodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(request); err != nil { + return err + } + r.Body = ioutil.NopCloser(&buf) + return nil +} + +// encodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes +// the response as JSON to the response writer. Primarily useful in a server. +func encodeHTTPGenericResponse(ctx context.Context, w http.ResponseWriter, response interface{}) error { + if f, ok := response.(addendpoint.Failer); ok && f.Failed() != nil { + errorEncoder(ctx, f.Failed(), w) + return nil + } + w.Header().Set("Content-Type", "application/json; charset=utf-8") + return json.NewEncoder(w).Encode(response) +} diff --git a/examples/addsvc/pkg/addtransport/thrift.go b/examples/addsvc/pkg/addtransport/thrift.go new file mode 100644 index 000000000..cf7151428 --- /dev/null +++ b/examples/addsvc/pkg/addtransport/thrift.go @@ -0,0 +1,120 @@ +package addtransport + +import ( + "context" + "time" + + jujuratelimit "github.com/juju/ratelimit" + "github.com/sony/gobreaker" + + "github.com/go-kit/kit/circuitbreaker" + "github.com/go-kit/kit/endpoint" + "github.com/go-kit/kit/ratelimit" + + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" + addthrift "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" +) + +type thriftServer struct { + ctx context.Context + endpoints addendpoint.Set +} + +// NewThriftServer makes a set of endpoints available as a Thrift service. +func NewThriftServer(ctx context.Context, endpoints addendpoint.Set) addthrift.AddService { + return &thriftServer{ + ctx: ctx, + endpoints: endpoints, + } +} + +func (s *thriftServer) Sum(a int64, b int64) (*addthrift.SumReply, error) { + request := addendpoint.SumRequest{A: int(a), B: int(b)} + response, err := s.endpoints.SumEndpoint(s.ctx, request) + if err != nil { + return nil, err + } + resp := response.(addendpoint.SumResponse) + return &addthrift.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil +} + +func (s *thriftServer) Concat(a string, b string) (*addthrift.ConcatReply, error) { + request := addendpoint.ConcatRequest{A: a, B: b} + response, err := s.endpoints.ConcatEndpoint(s.ctx, request) + if err != nil { + return nil, err + } + resp := response.(addendpoint.ConcatResponse) + return &addthrift.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil +} + +// NewThriftClient returns an AddService backed by a Thrift server described by +// the provided client. The caller is responsible for constructing the client, +// and eventually closing the underlying transport. We bake-in certain middlewares, +// implementing the client library pattern. +func NewThriftClient(client *addthrift.AddServiceClient) addservice.Service { + // We construct a single ratelimiter middleware, to limit the total outgoing + // QPS from this client to all methods on the remote instance. We also + // construct per-endpoint circuitbreaker middlewares to demonstrate how + // that's done, although they could easily be combined into a single breaker + // for the entire remote instance, too. + limiter := ratelimit.NewTokenBucketLimiter(jujuratelimit.NewBucketWithRate(100, 100)) + + // Each individual endpoint is an http/transport.Client (which implements + // endpoint.Endpoint) that gets wrapped with various middlewares. If you + // could rely on a consistent set of client behavior. + var sumEndpoint endpoint.Endpoint + { + sumEndpoint = MakeThriftSumEndpoint(client) + sumEndpoint = limiter(sumEndpoint) + sumEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Sum", + Timeout: 30 * time.Second, + }))(sumEndpoint) + } + + // The Concat endpoint is the same thing, with slightly different + // middlewares to demonstrate how to specialize per-endpoint. + var concatEndpoint endpoint.Endpoint + { + concatEndpoint = MakeThriftConcatEndpoint(client) + concatEndpoint = limiter(concatEndpoint) + concatEndpoint = circuitbreaker.Gobreaker(gobreaker.NewCircuitBreaker(gobreaker.Settings{ + Name: "Concat", + Timeout: 10 * time.Second, + }))(concatEndpoint) + } + + // Returning the endpoint.Set as a service.Service relies on the + // endpoint.Set implementing the Service methods. That's just a simple bit + // of glue code. + return addendpoint.Set{ + SumEndpoint: sumEndpoint, + ConcatEndpoint: concatEndpoint, + } +} + +// MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client. +// Useful only in clients, and only until a proper transport/thrift.Client exists. +func MakeThriftSumEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.SumRequest) + reply, err := client.Sum(int64(req.A), int64(req.B)) + if err == addservice.ErrIntOverflow { + return nil, err // special case; see comment on ErrIntOverflow + } + return addendpoint.SumResponse{V: int(reply.Value), Err: err}, nil + } +} + +// MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift +// client. Useful only in clients, and only until a proper +// transport/thrift.Client exists. +func MakeThriftConcatEndpoint(client *addthrift.AddServiceClient) endpoint.Endpoint { + return func(ctx context.Context, request interface{}) (interface{}, error) { + req := request.(addendpoint.ConcatRequest) + reply, err := client.Concat(req.A, req.B) + return addendpoint.ConcatResponse{V: reply.Value, Err: err}, nil + } +} diff --git a/examples/addsvc/service.go b/examples/addsvc/service.go deleted file mode 100644 index 971590cc0..000000000 --- a/examples/addsvc/service.go +++ /dev/null @@ -1,163 +0,0 @@ -package addsvc - -// This file contains the Service definition, and a basic service -// implementation. It also includes service middlewares. - -import ( - "context" - "errors" - "time" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/metrics" -) - -// Service describes a service that adds things together. -type Service interface { - Sum(ctx context.Context, a, b int) (int, error) - Concat(ctx context.Context, a, b string) (string, error) -} - -// Business-domain errors like these may be served in two ways: returned -// directly by endpoints, or bundled into the response struct. Both methods can -// be made to work, but errors returned directly by endpoints are counted by -// middlewares that check errors, like circuit breakers. -// -// If you don't want that behavior -- and you probably don't -- then it's better -// to bundle errors into the response struct. - -var ( - // ErrTwoZeroes is an arbitrary business rule for the Add method. - ErrTwoZeroes = errors.New("can't sum two zeroes") - - // ErrIntOverflow protects the Add method. We've decided that this error - // indicates a misbehaving service and should count against e.g. circuit - // breakers. So, we return it directly in endpoints, to illustrate the - // difference. In a real service, this probably wouldn't be the case. - ErrIntOverflow = errors.New("integer overflow") - - // ErrMaxSizeExceeded protects the Concat method. - ErrMaxSizeExceeded = errors.New("result exceeds maximum size") -) - -// These annoying helper functions are required to translate Go error types to -// and from strings, which is the type we use in our IDLs to represent errors. -// There is special casing to treat empty strings as nil errors. - -func str2err(s string) error { - if s == "" { - return nil - } - return errors.New(s) -} - -func err2str(err error) string { - if err == nil { - return "" - } - return err.Error() -} - -// NewBasicService returns a naïve, stateless implementation of Service. -func NewBasicService() Service { - return basicService{} -} - -type basicService struct{} - -const ( - intMax = 1<<31 - 1 - intMin = -(intMax + 1) - maxLen = 102400 -) - -// Sum implements Service. -func (s basicService) Sum(_ context.Context, a, b int) (int, error) { - if a == 0 && b == 0 { - return 0, ErrTwoZeroes - } - if (b > 0 && a > (intMax-b)) || (b < 0 && a < (intMin-b)) { - return 0, ErrIntOverflow - } - return a + b, nil -} - -// Concat implements Service. -func (s basicService) Concat(_ context.Context, a, b string) (string, error) { - if len(a)+len(b) > maxLen { - return "", ErrMaxSizeExceeded - } - return a + b, nil -} - -// Middleware describes a service (as opposed to endpoint) middleware. -type Middleware func(Service) Service - -// ServiceLoggingMiddleware returns a service middleware that logs the -// parameters and result of each method invocation. -func ServiceLoggingMiddleware(logger log.Logger) Middleware { - return func(next Service) Service { - return serviceLoggingMiddleware{ - logger: logger, - next: next, - } - } -} - -type serviceLoggingMiddleware struct { - logger log.Logger - next Service -} - -func (mw serviceLoggingMiddleware) Sum(ctx context.Context, a, b int) (v int, err error) { - defer func(begin time.Time) { - mw.logger.Log( - "method", "Sum", - "a", a, "b", b, "result", v, "error", err, - "took", time.Since(begin), - ) - }(time.Now()) - return mw.next.Sum(ctx, a, b) -} - -func (mw serviceLoggingMiddleware) Concat(ctx context.Context, a, b string) (v string, err error) { - defer func(begin time.Time) { - mw.logger.Log( - "method", "Concat", - "a", a, "b", b, "result", v, "error", err, - "took", time.Since(begin), - ) - }(time.Now()) - return mw.next.Concat(ctx, a, b) -} - -// ServiceInstrumentingMiddleware returns a service middleware that instruments -// the number of integers summed and characters concatenated over the lifetime of -// the service. -func ServiceInstrumentingMiddleware(ints, chars metrics.Counter) Middleware { - return func(next Service) Service { - return serviceInstrumentingMiddleware{ - ints: ints, - chars: chars, - next: next, - } - } -} - -type serviceInstrumentingMiddleware struct { - ints metrics.Counter - chars metrics.Counter - next Service -} - -func (mw serviceInstrumentingMiddleware) Sum(ctx context.Context, a, b int) (int, error) { - v, err := mw.next.Sum(ctx, a, b) - mw.ints.Add(float64(v)) - return v, err -} - -func (mw serviceInstrumentingMiddleware) Concat(ctx context.Context, a, b string) (string, error) { - v, err := mw.next.Concat(ctx, a, b) - mw.chars.Add(float64(len(v))) - return v, err -} diff --git a/examples/addsvc/transport_grpc.go b/examples/addsvc/transport_grpc.go deleted file mode 100644 index dcfc03a05..000000000 --- a/examples/addsvc/transport_grpc.go +++ /dev/null @@ -1,118 +0,0 @@ -package addsvc - -// This file provides server-side bindings for the gRPC transport. -// It utilizes the transport/grpc.Server. - -import ( - "context" - - stdopentracing "github.com/opentracing/opentracing-go" - oldcontext "golang.org/x/net/context" - - "github.com/go-kit/kit/examples/addsvc/pb" - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/tracing/opentracing" - grpctransport "github.com/go-kit/kit/transport/grpc" -) - -// MakeGRPCServer makes a set of endpoints available as a gRPC AddServer. -func MakeGRPCServer(endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) pb.AddServer { - options := []grpctransport.ServerOption{ - grpctransport.ServerErrorLogger(logger), - } - return &grpcServer{ - sum: grpctransport.NewServer( - endpoints.SumEndpoint, - DecodeGRPCSumRequest, - EncodeGRPCSumResponse, - append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Sum", logger)))..., - ), - concat: grpctransport.NewServer( - endpoints.ConcatEndpoint, - DecodeGRPCConcatRequest, - EncodeGRPCConcatResponse, - append(options, grpctransport.ServerBefore(opentracing.FromGRPCRequest(tracer, "Concat", logger)))..., - ), - } -} - -type grpcServer struct { - sum grpctransport.Handler - concat grpctransport.Handler -} - -func (s *grpcServer) Sum(ctx oldcontext.Context, req *pb.SumRequest) (*pb.SumReply, error) { - _, rep, err := s.sum.ServeGRPC(ctx, req) - if err != nil { - return nil, err - } - return rep.(*pb.SumReply), nil -} - -func (s *grpcServer) Concat(ctx oldcontext.Context, req *pb.ConcatRequest) (*pb.ConcatReply, error) { - _, rep, err := s.concat.ServeGRPC(ctx, req) - if err != nil { - return nil, err - } - return rep.(*pb.ConcatReply), nil -} - -// DecodeGRPCSumRequest is a transport/grpc.DecodeRequestFunc that converts a -// gRPC sum request to a user-domain sum request. Primarily useful in a server. -func DecodeGRPCSumRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { - req := grpcReq.(*pb.SumRequest) - return sumRequest{A: int(req.A), B: int(req.B)}, nil -} - -// DecodeGRPCConcatRequest is a transport/grpc.DecodeRequestFunc that converts a -// gRPC concat request to a user-domain concat request. Primarily useful in a -// server. -func DecodeGRPCConcatRequest(_ context.Context, grpcReq interface{}) (interface{}, error) { - req := grpcReq.(*pb.ConcatRequest) - return concatRequest{A: req.A, B: req.B}, nil -} - -// DecodeGRPCSumResponse is a transport/grpc.DecodeResponseFunc that converts a -// gRPC sum reply to a user-domain sum response. Primarily useful in a client. -func DecodeGRPCSumResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { - reply := grpcReply.(*pb.SumReply) - return sumResponse{V: int(reply.V), Err: str2err(reply.Err)}, nil -} - -// DecodeGRPCConcatResponse is a transport/grpc.DecodeResponseFunc that converts -// a gRPC concat reply to a user-domain concat response. Primarily useful in a -// client. -func DecodeGRPCConcatResponse(_ context.Context, grpcReply interface{}) (interface{}, error) { - reply := grpcReply.(*pb.ConcatReply) - return concatResponse{V: reply.V, Err: str2err(reply.Err)}, nil -} - -// EncodeGRPCSumResponse is a transport/grpc.EncodeResponseFunc that converts a -// user-domain sum response to a gRPC sum reply. Primarily useful in a server. -func EncodeGRPCSumResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(sumResponse) - return &pb.SumReply{V: int64(resp.V), Err: err2str(resp.Err)}, nil -} - -// EncodeGRPCConcatResponse is a transport/grpc.EncodeResponseFunc that converts -// a user-domain concat response to a gRPC concat reply. Primarily useful in a -// server. -func EncodeGRPCConcatResponse(_ context.Context, response interface{}) (interface{}, error) { - resp := response.(concatResponse) - return &pb.ConcatReply{V: resp.V, Err: err2str(resp.Err)}, nil -} - -// EncodeGRPCSumRequest is a transport/grpc.EncodeRequestFunc that converts a -// user-domain sum request to a gRPC sum request. Primarily useful in a client. -func EncodeGRPCSumRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(sumRequest) - return &pb.SumRequest{A: int64(req.A), B: int64(req.B)}, nil -} - -// EncodeGRPCConcatRequest is a transport/grpc.EncodeRequestFunc that converts a -// user-domain concat request to a gRPC concat request. Primarily useful in a -// client. -func EncodeGRPCConcatRequest(_ context.Context, request interface{}) (interface{}, error) { - req := request.(concatRequest) - return &pb.ConcatRequest{A: req.A, B: req.B}, nil -} diff --git a/examples/addsvc/transport_http.go b/examples/addsvc/transport_http.go deleted file mode 100644 index 75a9b839b..000000000 --- a/examples/addsvc/transport_http.go +++ /dev/null @@ -1,130 +0,0 @@ -package addsvc - -// This file provides server-side bindings for the HTTP transport. -// It utilizes the transport/http.Server. - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "io/ioutil" - "net/http" - - stdopentracing "github.com/opentracing/opentracing-go" - - "github.com/go-kit/kit/log" - "github.com/go-kit/kit/tracing/opentracing" - httptransport "github.com/go-kit/kit/transport/http" -) - -// MakeHTTPHandler returns a handler that makes a set of endpoints available -// on predefined paths. -func MakeHTTPHandler(endpoints Endpoints, tracer stdopentracing.Tracer, logger log.Logger) http.Handler { - options := []httptransport.ServerOption{ - httptransport.ServerErrorEncoder(errorEncoder), - httptransport.ServerErrorLogger(logger), - } - m := http.NewServeMux() - m.Handle("/sum", httptransport.NewServer( - endpoints.SumEndpoint, - DecodeHTTPSumRequest, - EncodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Sum", logger)))..., - )) - m.Handle("/concat", httptransport.NewServer( - endpoints.ConcatEndpoint, - DecodeHTTPConcatRequest, - EncodeHTTPGenericResponse, - append(options, httptransport.ServerBefore(opentracing.FromHTTPRequest(tracer, "Concat", logger)))..., - )) - return m -} - -func errorEncoder(_ context.Context, err error, w http.ResponseWriter) { - code := http.StatusInternalServerError - msg := err.Error() - - switch err { - case ErrTwoZeroes, ErrMaxSizeExceeded, ErrIntOverflow: - code = http.StatusBadRequest - } - - w.WriteHeader(code) - json.NewEncoder(w).Encode(errorWrapper{Error: msg}) -} - -func errorDecoder(r *http.Response) error { - var w errorWrapper - if err := json.NewDecoder(r.Body).Decode(&w); err != nil { - return err - } - return errors.New(w.Error) -} - -type errorWrapper struct { - Error string `json:"error"` -} - -// DecodeHTTPSumRequest is a transport/http.DecodeRequestFunc that decodes a -// JSON-encoded sum request from the HTTP request body. Primarily useful in a -// server. -func DecodeHTTPSumRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req sumRequest - err := json.NewDecoder(r.Body).Decode(&req) - return req, err -} - -// DecodeHTTPConcatRequest is a transport/http.DecodeRequestFunc that decodes a -// JSON-encoded concat request from the HTTP request body. Primarily useful in a -// server. -func DecodeHTTPConcatRequest(_ context.Context, r *http.Request) (interface{}, error) { - var req concatRequest - err := json.NewDecoder(r.Body).Decode(&req) - return req, err -} - -// DecodeHTTPSumResponse is a transport/http.DecodeResponseFunc that decodes a -// JSON-encoded sum response from the HTTP response body. If the response has a -// non-200 status code, we will interpret that as an error and attempt to decode -// the specific error message from the response body. Primarily useful in a -// client. -func DecodeHTTPSumResponse(_ context.Context, r *http.Response) (interface{}, error) { - if r.StatusCode != http.StatusOK { - return nil, errorDecoder(r) - } - var resp sumResponse - err := json.NewDecoder(r.Body).Decode(&resp) - return resp, err -} - -// DecodeHTTPConcatResponse is a transport/http.DecodeResponseFunc that decodes -// a JSON-encoded concat response from the HTTP response body. If the response -// has a non-200 status code, we will interpret that as an error and attempt to -// decode the specific error message from the response body. Primarily useful in -// a client. -func DecodeHTTPConcatResponse(_ context.Context, r *http.Response) (interface{}, error) { - if r.StatusCode != http.StatusOK { - return nil, errorDecoder(r) - } - var resp concatResponse - err := json.NewDecoder(r.Body).Decode(&resp) - return resp, err -} - -// EncodeHTTPGenericRequest is a transport/http.EncodeRequestFunc that -// JSON-encodes any request to the request body. Primarily useful in a client. -func EncodeHTTPGenericRequest(_ context.Context, r *http.Request, request interface{}) error { - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(request); err != nil { - return err - } - r.Body = ioutil.NopCloser(&buf) - return nil -} - -// EncodeHTTPGenericResponse is a transport/http.EncodeResponseFunc that encodes -// the response as JSON to the response writer. Primarily useful in a server. -func EncodeHTTPGenericResponse(_ context.Context, w http.ResponseWriter, response interface{}) error { - return json.NewEncoder(w).Encode(response) -} diff --git a/examples/addsvc/transport_thrift.go b/examples/addsvc/transport_thrift.go deleted file mode 100644 index 593cd31e5..000000000 --- a/examples/addsvc/transport_thrift.go +++ /dev/null @@ -1,73 +0,0 @@ -package addsvc - -// This file provides server-side bindings for the Thrift transport. -// -// This file also provides endpoint constructors that utilize a Thrift client, -// for use in client packages, because package transport/thrift doesn't exist -// yet. See https://github.com/go-kit/kit/issues/184. - -import ( - "context" - - "github.com/go-kit/kit/endpoint" - thriftadd "github.com/go-kit/kit/examples/addsvc/thrift/gen-go/addsvc" -) - -// MakeThriftHandler makes a set of endpoints available as a Thrift service. -func MakeThriftHandler(ctx context.Context, e Endpoints) thriftadd.AddService { - return &thriftServer{ - ctx: ctx, - sum: e.SumEndpoint, - concat: e.ConcatEndpoint, - } -} - -type thriftServer struct { - ctx context.Context - sum endpoint.Endpoint - concat endpoint.Endpoint -} - -func (s *thriftServer) Sum(a int64, b int64) (*thriftadd.SumReply, error) { - request := sumRequest{A: int(a), B: int(b)} - response, err := s.sum(s.ctx, request) - if err != nil { - return nil, err - } - resp := response.(sumResponse) - return &thriftadd.SumReply{Value: int64(resp.V), Err: err2str(resp.Err)}, nil -} - -func (s *thriftServer) Concat(a string, b string) (*thriftadd.ConcatReply, error) { - request := concatRequest{A: a, B: b} - response, err := s.concat(s.ctx, request) - if err != nil { - return nil, err - } - resp := response.(concatResponse) - return &thriftadd.ConcatReply{Value: resp.V, Err: err2str(resp.Err)}, nil -} - -// MakeThriftSumEndpoint returns an endpoint that invokes the passed Thrift client. -// Useful only in clients, and only until a proper transport/thrift.Client exists. -func MakeThriftSumEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(sumRequest) - reply, err := client.Sum(int64(req.A), int64(req.B)) - if err == ErrIntOverflow { - return nil, err // special case; see comment on ErrIntOverflow - } - return sumResponse{V: int(reply.Value), Err: err}, nil - } -} - -// MakeThriftConcatEndpoint returns an endpoint that invokes the passed Thrift -// client. Useful only in clients, and only until a proper -// transport/thrift.Client exists. -func MakeThriftConcatEndpoint(client *thriftadd.AddServiceClient) endpoint.Endpoint { - return func(ctx context.Context, request interface{}) (interface{}, error) { - req := request.(concatRequest) - reply, err := client.Concat(req.A, req.B) - return concatResponse{V: reply.Value, Err: err}, nil - } -} diff --git a/examples/apigateway/main.go b/examples/apigateway/main.go index bba617818..3b4cd675e 100644 --- a/examples/apigateway/main.go +++ b/examples/apigateway/main.go @@ -16,19 +16,21 @@ import ( "syscall" "time" + consulsd "github.com/go-kit/kit/sd/consul" "github.com/gorilla/mux" "github.com/hashicorp/consul/api" stdopentracing "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" "github.com/go-kit/kit/endpoint" - "github.com/go-kit/kit/examples/addsvc" - addsvcgrpcclient "github.com/go-kit/kit/examples/addsvc/client/grpc" "github.com/go-kit/kit/log" "github.com/go-kit/kit/sd" - consulsd "github.com/go-kit/kit/sd/consul" "github.com/go-kit/kit/sd/lb" httptransport "github.com/go-kit/kit/transport/http" - "google.golang.org/grpc" + + "github.com/go-kit/kit/examples/addsvc/pkg/addendpoint" + "github.com/go-kit/kit/examples/addsvc/pkg/addservice" + "github.com/go-kit/kit/examples/addsvc/pkg/addtransport" ) func main() { @@ -79,22 +81,21 @@ func main() { // addsvc client package to construct a complete service. We can then // leverage the addsvc.Make{Sum,Concat}Endpoint constructors to convert // the complete service to specific endpoint. - var ( tags = []string{} passingOnly = true - endpoints = addsvc.Endpoints{} + endpoints = addendpoint.Set{} instancer = consulsd.NewInstancer(client, logger, "addsvc", tags, passingOnly) ) { - factory := addsvcFactory(addsvc.MakeSumEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeSumEndpoint, tracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) endpoints.SumEndpoint = retry } { - factory := addsvcFactory(addsvc.MakeConcatEndpoint, tracer, logger) + factory := addsvcFactory(addendpoint.MakeConcatEndpoint, tracer, logger) endpointer := sd.NewEndpointer(instancer, factory, logger) balancer := lb.NewRoundRobin(endpointer) retry := lb.Retry(*retryMax, *retryTimeout, balancer) @@ -105,7 +106,7 @@ func main() { // HTTP handler, and just install it under a particular path prefix in // our router. - r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addsvc.MakeHTTPHandler(endpoints, tracer, logger))) + r.PathPrefix("/addsvc").Handler(http.StripPrefix("/addsvc", addtransport.NewHTTPHandler(endpoints, tracer, logger))) } // stringsvc routes. @@ -164,7 +165,7 @@ func main() { logger.Log("exit", <-errc) } -func addsvcFactory(makeEndpoint func(addsvc.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory { +func addsvcFactory(makeEndpoint func(addservice.Service) endpoint.Endpoint, tracer stdopentracing.Tracer, logger log.Logger) sd.Factory { return func(instance string) (endpoint.Endpoint, io.Closer, error) { // We could just as easily use the HTTP or Thrift client package to make // the connection to addsvc. We've chosen gRPC arbitrarily. Note that @@ -175,7 +176,7 @@ func addsvcFactory(makeEndpoint func(addsvc.Service) endpoint.Endpoint, tracer s if err != nil { return nil, nil, err } - service := addsvcgrpcclient.New(conn, tracer, logger) + service := addtransport.NewGRPCClient(conn, tracer, logger) endpoint := makeEndpoint(service) // Notice that the addsvc gRPC client converts the connection to a